feat(api): Enhance session events API with pagination, sorting, and filtering

- Added support for pagination (page and limit) in the session events endpoint.
- Implemented sorting functionality based on specified columns and directions.
- Introduced free-text search capability for session events.
- Updated SQL queries to retrieve all events and added a new SQL constant for events.
- Refactored GraphQL types and helpers to support new plugin and event queries.
- Created new GraphQL resolvers for plugins and events with pagination and filtering.
- Added comprehensive tests for new GraphQL endpoints and session events functionality.
This commit is contained in:
Jokob @NetAlertX
2026-03-26 20:57:10 +00:00
parent 250e533655
commit ec3e4c8988
15 changed files with 1312 additions and 352 deletions

View File

@@ -192,3 +192,242 @@ def test_graphql_langstrings_excludes_languages_json(client, api_token):
f"languages.json leaked into langStrings as {len(polluted)} entries; "
"graphql_endpoint.py must exclude it from the directory scan"
)
# --- PLUGINS_OBJECTS TESTS ---
def test_graphql_plugins_objects_no_options(client, api_token):
"""pluginsObjects without options returns valid schema (entries list + count fields)"""
query = {
"query": """
{
pluginsObjects {
dbCount
count
entries {
index
plugin
objectPrimaryId
status
}
}
}
"""
}
resp = client.post("/graphql", json=query, headers=auth_headers(api_token))
assert resp.status_code == 200
body = resp.get_json()
assert "errors" not in body
result = body["data"]["pluginsObjects"]
assert isinstance(result["entries"], list)
assert isinstance(result["dbCount"], int)
assert isinstance(result["count"], int)
assert result["dbCount"] >= result["count"]
def test_graphql_plugins_objects_pagination(client, api_token):
"""pluginsObjects with limit=5 returns at most 5 entries and count reflects filter total"""
query = {
"query": """
query PluginsObjectsPaged($options: PluginQueryOptionsInput) {
pluginsObjects(options: $options) {
dbCount
count
entries { index plugin }
}
}
""",
"variables": {"options": {"page": 1, "limit": 5}}
}
resp = client.post("/graphql", json=query, headers=auth_headers(api_token))
assert resp.status_code == 200
body = resp.get_json()
assert "errors" not in body
result = body["data"]["pluginsObjects"]
assert len(result["entries"]) <= 5
assert result["count"] >= len(result["entries"])
def test_graphql_plugins_events_no_options(client, api_token):
"""pluginsEvents without options returns valid schema"""
query = {
"query": """
{
pluginsEvents {
dbCount
count
entries { index plugin objectPrimaryId dateTimeCreated }
}
}
"""
}
resp = client.post("/graphql", json=query, headers=auth_headers(api_token))
assert resp.status_code == 200
body = resp.get_json()
assert "errors" not in body
result = body["data"]["pluginsEvents"]
assert isinstance(result["entries"], list)
assert isinstance(result["count"], int)
def test_graphql_plugins_history_no_options(client, api_token):
"""pluginsHistory without options returns valid schema"""
query = {
"query": """
{
pluginsHistory {
dbCount
count
entries { index plugin watchedValue1 }
}
}
"""
}
resp = client.post("/graphql", json=query, headers=auth_headers(api_token))
assert resp.status_code == 200
body = resp.get_json()
assert "errors" not in body
result = body["data"]["pluginsHistory"]
assert isinstance(result["entries"], list)
assert isinstance(result["count"], int)
def test_graphql_plugins_hard_cap(client, api_token):
"""limit=99999 is clamped server-side to at most 1000 entries"""
query = {
"query": """
query PluginsHardCap($options: PluginQueryOptionsInput) {
pluginsObjects(options: $options) {
count
entries { index }
}
}
""",
"variables": {"options": {"page": 1, "limit": 99999}}
}
resp = client.post("/graphql", json=query, headers=auth_headers(api_token))
assert resp.status_code == 200
body = resp.get_json()
assert "errors" not in body
entries = body["data"]["pluginsObjects"]["entries"]
assert len(entries) <= 1000, f"Hard cap violated: got {len(entries)} entries"
# --- EVENTS TESTS ---
def test_graphql_events_no_options(client, api_token):
"""events without options returns valid schema (entries list + count fields)"""
query = {
"query": """
{
events {
dbCount
count
entries {
eveMac
eveIp
eveDateTime
eveEventType
eveAdditionalInfo
}
}
}
"""
}
resp = client.post("/graphql", json=query, headers=auth_headers(api_token))
assert resp.status_code == 200
body = resp.get_json()
assert "errors" not in body
result = body["data"]["events"]
assert isinstance(result["entries"], list)
assert isinstance(result["count"], int)
assert isinstance(result["dbCount"], int)
def test_graphql_events_filter_by_mac(client, api_token):
"""events filtered by eveMac='00:00:00:00:00:00' returns only that MAC (or empty)"""
query = {
"query": """
query EventsByMac($options: EventQueryOptionsInput) {
events(options: $options) {
count
entries { eveMac eveEventType eveDateTime }
}
}
""",
"variables": {"options": {"eveMac": "00:00:00:00:00:00", "limit": 50}}
}
resp = client.post("/graphql", json=query, headers=auth_headers(api_token))
assert resp.status_code == 200
body = resp.get_json()
assert "errors" not in body
result = body["data"]["events"]
for entry in result["entries"]:
assert entry["eveMac"].upper() == "00:00:00:00:00:00", (
f"MAC filter leaked a non-matching row: {entry['eveMac']}"
)
# --- PLUGIN FILTER SCOPING TESTS ---
def test_graphql_plugins_objects_dbcount_scoped_to_plugin(client, api_token):
"""dbCount should reflect only the rows for the requested plugin, not the entire table."""
# First, get the unscoped total
query_all = {
"query": "{ pluginsObjects { dbCount count } }"
}
resp_all = client.post("/graphql", json=query_all, headers=auth_headers(api_token))
assert resp_all.status_code == 200
total_all = resp_all.get_json()["data"]["pluginsObjects"]["dbCount"]
# Now request a non-existent plugin — dbCount must be 0
query_fake = {
"query": """
query Scoped($options: PluginQueryOptionsInput) {
pluginsObjects(options: $options) { dbCount count entries { plugin } }
}
""",
"variables": {"options": {"plugin": "NONEXISTENT_PLUGIN_XYZ"}}
}
resp_fake = client.post("/graphql", json=query_fake, headers=auth_headers(api_token))
assert resp_fake.status_code == 200
body_fake = resp_fake.get_json()
assert "errors" not in body_fake
result_fake = body_fake["data"]["pluginsObjects"]
assert result_fake["dbCount"] == 0, (
f"dbCount should be 0 for non-existent plugin, got {result_fake['dbCount']}"
)
assert result_fake["count"] == 0
assert result_fake["entries"] == []
def test_graphql_plugins_objects_scoped_entries_match_plugin(client, api_token):
"""When filtering by plugin, all returned entries must belong to that plugin."""
# Get first available plugin prefix from the unscoped query
query_sample = {
"query": "{ pluginsObjects(options: {page: 1, limit: 1}) { entries { plugin } } }"
}
resp = client.post("/graphql", json=query_sample, headers=auth_headers(api_token))
assert resp.status_code == 200
entries = resp.get_json()["data"]["pluginsObjects"]["entries"]
if not entries:
pytest.skip("No plugin objects in database")
target = entries[0]["plugin"]
# Query scoped to that plugin
query_scoped = {
"query": """
query Scoped($options: PluginQueryOptionsInput) {
pluginsObjects(options: $options) { dbCount count entries { plugin } }
}
""",
"variables": {"options": {"plugin": target, "page": 1, "limit": 100}}
}
resp2 = client.post("/graphql", json=query_scoped, headers=auth_headers(api_token))
assert resp2.status_code == 200
result = resp2.get_json()["data"]["pluginsObjects"]
assert result["dbCount"] > 0
for entry in result["entries"]:
assert entry["plugin"].upper() == target.upper(), (
f"Plugin filter leaked: expected {target}, got {entry['plugin']}"
)