api layer v0.2.5 - /sessions + graphql tests

This commit is contained in:
jokob-sk
2025-08-21 15:10:47 +10:00
parent 3dc87d2adb
commit 915bb523d6
11 changed files with 875 additions and 57 deletions

View File

@@ -5,6 +5,7 @@ import random
import string
import uuid
import pytest
from datetime import datetime, timedelta
INSTALL_PATH = "/app"
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
@@ -30,21 +31,33 @@ def auth_headers(token):
return {"Authorization": f"Bearer {token}"}
def create_event(client, api_token, mac, event="UnitTest Event", days_old=None):
"""
Create event using API (POST /event/<mac>).
If days_old is set, adds it to payload for backdating support.
"""
payload = {
"event": event,
}
if days_old:
payload["days_old"] = days_old
return client.post(f"/event/{mac}", json=payload, headers=auth_headers(api_token))
payload = {"ip": "0.0.0.0", "event_type": event}
# Calculate the event_time if days_old is given
if days_old is not None:
event_time = timeNowTZ() - timedelta(days=days_old)
# ISO 8601 string
payload["event_time"] = event_time.isoformat()
return client.post(f"/events/create/{mac}", json=payload, headers=auth_headers(api_token))
def list_events(client, api_token, mac=None):
url = "/events" if mac is None else f"/events/{mac}"
url = "/events" if mac is None else f"/events?mac={mac}"
return client.get(url, headers=auth_headers(api_token))
def test_create_event(client, api_token, test_mac):
# create event
resp = create_event(client, api_token, test_mac)
assert resp.status_code == 200
data = resp.get_json()
assert data.get("success") is True
# confirm event exists
resp = list_events(client, api_token, test_mac)
assert resp.status_code == 200
events = resp.get_json().get("events", [])
assert any(ev.get("eve_MAC") == test_mac for ev in events)
def test_delete_events_for_mac(client, api_token, test_mac):
# create event
@@ -54,7 +67,8 @@ def test_delete_events_for_mac(client, api_token, test_mac):
# confirm exists
resp = list_events(client, api_token, test_mac)
assert resp.status_code == 200
assert any(ev["eve_MAC"] == test_mac for ev in resp.json)
events = resp.json.get("events", [])
assert any(ev["eve_MAC"] == test_mac for ev in events)
# delete
resp = client.delete(f"/events/{test_mac}", headers=auth_headers(api_token))
@@ -64,7 +78,7 @@ def test_delete_events_for_mac(client, api_token, test_mac):
# confirm deleted
resp = list_events(client, api_token, test_mac)
assert resp.status_code == 200
assert len(resp.json) == 0
assert len(resp.json.get("events", [])) == 0
def test_delete_all_events(client, api_token, test_mac):
@@ -82,7 +96,7 @@ def test_delete_all_events(client, api_token, test_mac):
# confirm no events
resp = list_events(client, api_token)
assert len(resp.json) == 0
assert len(resp.json.get("events", [])) == 0
def test_delete_events_30days(client, api_token, test_mac):
@@ -100,5 +114,7 @@ def test_delete_events_30days(client, api_token, test_mac):
# confirm only recent remains
resp = list_events(client, api_token, test_mac)
mac_events = [ev for ev in resp.json if ev["eve_MAC"] == test_mac]
events = resp.get_json().get("events", [])
mac_events = [ev for ev in events if ev.get("eve_MAC") == test_mac]
assert len(mac_events) == 1

81
test/test_graphq_endpoints.py Executable file
View File

@@ -0,0 +1,81 @@
import sys
import pathlib
import sqlite3
import random
import string
import uuid
import pytest
from datetime import datetime, timedelta
INSTALL_PATH = "/app"
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
from helper import timeNowTZ, get_setting_value
from api_server.api_server_start import app
@pytest.fixture(scope="session")
def api_token():
return get_setting_value("API_TOKEN")
@pytest.fixture
def client():
with app.test_client() as client:
yield client
@pytest.fixture
def test_mac():
# Generate a unique MAC for each test run
return "AA:BB:CC:" + ":".join(f"{random.randint(0,255):02X}" for _ in range(3))
def auth_headers(token):
return {"Authorization": f"Bearer {token}"}
def test_graphql_debug_get(client):
"""GET /graphql should return the debug string"""
resp = client.get("/graphql")
assert resp.status_code == 200
assert resp.data.decode() == "NetAlertX GraphQL server running."
def test_graphql_post_unauthorized(client):
"""POST /graphql without token should return 401"""
query = {"query": "{ devices { devName devMac } }"}
resp = client.post("/graphql", json=query)
assert resp.status_code == 401
assert "Unauthorized access attempt" in resp.json.get("error", "")
def test_graphql_post_devices(client, api_token):
"""POST /graphql with a valid token should return device data"""
query = {
"query": """
{
devices {
devices { devName devMac devIsRandomMac devParentChildrenCount }
count
}
}
"""
}
resp = client.post("/graphql", json=query, headers=auth_headers(api_token))
assert resp.status_code == 200
data = resp.json.get("data", {})
assert "devices" in data
assert isinstance(data["devices"]["devices"], list)
def test_graphql_post_settings(client, api_token):
"""POST /graphql should return settings data"""
query = {
"query": """
{
settings {
settings { setKey setValue setGroup }
count
}
}
"""
}
resp = client.post("/graphql", json=query, headers=auth_headers(api_token))
assert resp.status_code == 200
data = resp.json.get("data", {})
assert "settings" in data
assert isinstance(data["settings"]["settings"], list)

View File

@@ -126,4 +126,78 @@ def test_traceroute_device(client, api_token, test_mac):
assert "output" in data
assert isinstance(data["output"], str)
@pytest.mark.parametrize("ip,expected_status", [
("8.8.8.8", 200),
("256.256.256.256", 400), # Invalid IP
("", 400), # Missing IP
])
def test_nslookup_endpoint(client, api_token, ip, expected_status):
payload = {"devLastIP": ip} if ip else {}
resp = client.post("/nettools/nslookup", json=payload, headers=auth_headers(api_token))
assert resp.status_code == expected_status
data = resp.json
if expected_status == 200:
assert data.get("success") is True
assert isinstance(data["output"], list)
assert all(isinstance(line, str) for line in data["output"])
else:
assert data.get("success") is False
assert "error" in data
@pytest.mark.parametrize("ip,mode,expected_status", [
("127.0.0.1", "fast", 200),
("127.0.0.1", "normal", 200),
("127.0.0.1", "detail", 200),
("127.0.0.1", "skipdiscovery", 200),
("127.0.0.1", "invalidmode", 400),
("999.999.999.999", "fast", 400),
])
def test_nmap_endpoint(client, api_token, ip, mode, expected_status):
payload = {"scan": ip, "mode": mode}
resp = client.post("/nettools/nmap", json=payload, headers=auth_headers(api_token))
assert resp.status_code == expected_status
data = resp.json
if expected_status == 200:
assert data.get("success") is True
assert data.get("mode") == mode
assert data.get("ip") == ip
assert isinstance(data["output"], list)
assert all(isinstance(line, str) for line in data["output"])
else:
assert data.get("success") is False
assert "error" in data
def test_nslookup_unauthorized(client):
# No auth headers
resp = client.post("/nettools/nslookup", json={"devLastIP": "8.8.8.8"})
assert resp.status_code == 403
data = resp.json
assert data.get("success") is False
assert data.get("error") == "Forbidden"
def test_nmap_unauthorized(client):
# No auth headers
resp = client.post("/nettools/nmap", json={"scan": "127.0.0.1", "mode": "fast"})
assert resp.status_code == 403
data = resp.json
assert data.get("success") is False
assert data.get("error") == "Forbidden"
def test_internet_info_endpoint(client, api_token):
resp = client.get("/nettools/internetinfo", headers=auth_headers(api_token))
data = resp.json
if resp.status_code == 200:
assert data.get("success") is True
assert isinstance(data.get("output"), str)
assert len(data["output"]) > 0 # ensure output is not empty
else:
# Handle errors, e.g., curl failure
assert data.get("success") is False
assert "error" in data
assert "details" in data

181
test/test_sessions_endpoints.py Executable file
View File

@@ -0,0 +1,181 @@
import sys
import pathlib
import sqlite3
import random
import string
import uuid
import pytest
from datetime import datetime, timedelta
INSTALL_PATH = "/app"
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
from helper import timeNowTZ, get_setting_value
from api_server.api_server_start import app
@pytest.fixture(scope="session")
def api_token():
return get_setting_value("API_TOKEN")
@pytest.fixture
def client():
with app.test_client() as client:
yield client
@pytest.fixture
def test_mac():
# Generate a unique MAC for each test run
return "AA:BB:CC:" + ":".join(f"{random.randint(0,255):02X}" for _ in range(3))
def auth_headers(token):
return {"Authorization": f"Bearer {token}"}
def test_create_device(client, api_token, test_mac):
payload = {
"createNew": True,
"devType": "Test Device",
"devOwner": "Unit Test",
"devType": "Router",
"devVendor": "TestVendor",
}
resp = client.post(f"/device/{test_mac}", json=payload, headers=auth_headers(api_token))
assert resp.status_code == 200
assert resp.json.get("success") is True
# -----------------------------
# CREATE SESSION
# -----------------------------
def test_create_session(client, api_token, test_mac):
payload = {
"mac": test_mac,
"ip": "192.168.1.100",
"start_time": timeNowTZ(),
"event_type_conn": "Connected",
"event_type_disc": "Disconnected"
}
resp = client.post("/sessions/create", json=payload, headers=auth_headers(api_token))
assert resp.status_code == 200
assert resp.json.get("success") is True
# -----------------------------
# LIST SESSIONS
# -----------------------------
def test_list_sessions(client, api_token, test_mac):
# Ensure at least one session exists
payload = {
"mac": test_mac,
"ip": "192.168.1.100",
"start_time": timeNowTZ()
}
client.post("/sessions/create", json=payload, headers=auth_headers(api_token))
# List sessions for MAC
resp = client.get(f"/sessions/list?mac={test_mac}", headers=auth_headers(api_token))
assert resp.status_code == 200
assert resp.json.get("success") is True
sessions = resp.json.get("sessions")
assert any(ses["ses_MAC"] == test_mac for ses in sessions)
# -----------------------------
# DELETE SESSION
# -----------------------------
def test_delete_session(client, api_token, test_mac):
# First create session
payload = {
"mac": test_mac,
"ip": "192.168.1.100",
"start_time": timeNowTZ()
}
client.post("/sessions/create", json=payload, headers=auth_headers(api_token))
# Delete session
resp = client.delete("/sessions/delete", json={"mac": test_mac}, headers=auth_headers(api_token))
assert resp.status_code == 200
assert resp.json.get("success") is True
# Confirm deletion
resp = client.get(f"/sessions/list?mac={test_mac}", headers=auth_headers(api_token))
sessions = resp.json.get("sessions")
assert not any(ev["ses_MAC"] == test_mac for ses in sessions)
def test_get_sessions_calendar(client, api_token, test_mac):
"""
Test the /sessions/calendar endpoint.
Creates session and ensures the calendar output is correct.
Cleans up test sessions after test.
"""
# --- Setup: create two sessions for the test MAC ---
now = timeNowTZ()
start1 = (now - timedelta(days=2)).isoformat(timespec="seconds")
end1 = (now - timedelta(days=1, hours=20)).isoformat(timespec="seconds")
start2 = (now - timedelta(days=1)).isoformat(timespec="seconds")
end2 = (now - timedelta(hours=20)).isoformat(timespec="seconds")
# Create sessions using your endpoint
client.post("/sessions/create", json={
"mac": test_mac,
"ip": "192.168.1.100",
"start_time": start1,
"end_time": end1,
"event_type_conn": "connect",
"event_type_disc": "disconnect"
}, headers=auth_headers(api_token))
client.post("/sessions/create", json={
"mac": test_mac,
"ip": "192.168.1.100",
"start_time": start2,
"end_time": end2,
"event_type_conn": "connect",
"event_type_disc": "disconnect"
}, headers=auth_headers(api_token))
# --- Call the /sessions/calendar API ---
start_date = (now - timedelta(days=3)).strftime("%Y-%m-%d")
end_date = (now + timedelta(days=1)).strftime("%Y-%m-%d")
resp = client.get(
f"/sessions/calendar?start={start_date}&end={end_date}",
headers=auth_headers(api_token)
)
assert resp.status_code == 200
data = resp.json
assert data.get("success") is True
assert "sessions" in data
sessions = data["sessions"]
# --- Verify calendar sessions ---
assert len(sessions) >= 2 # at least our two sessions
# Check expected keys
expected_keys = {"resourceId", "title", "start", "end", "color", "tooltip", "className"}
for ses in sessions:
assert set(ses.keys()) == expected_keys
# Check that all sessions belong to the test MAC
mac_sessions = [ses for ses in sessions if ses["resourceId"] == test_mac]
assert len(mac_sessions) == 2 # or exact number if you know it
# Check ISO date formatting for start/end
for ses in mac_sessions:
# start must always be present
assert ses["start"] is not None, f"Session start is None: {ses}"
datetime.fromisoformat(ses["start"])
# end can be None only if tooltip mentions "<still connected>"
if ses["end"] is not None:
datetime.fromisoformat(ses["end"])
else:
assert "<still connected>" in ses["tooltip"], f"End is None but session not marked as still connected: {ses}"
# --- Cleanup: delete all test sessions for this MAC ---
client.delete(f"/sessions/delete?mac={test_mac}", headers=auth_headers(api_token))