From 915bb523d68751b2565686476c82cb979f59f6f1 Mon Sep 17 00:00:00 2001 From: jokob-sk Date: Thu, 21 Aug 2025 15:10:47 +1000 Subject: [PATCH] api layer v0.2.5 - /sessions + graphql tests --- server/api_server/api_server_start.py | 133 +++++++++++++++-- server/api_server/events_endpoint.py | 37 ++++- server/api_server/graphql_endpoint.py | 1 + server/api_server/nettools_endpoint.py | 126 +++++++++++++++- server/api_server/prometheus_endpoint.py | 2 +- server/api_server/sessions_endpoint.py | 172 +++++++++++++++++++++ server/helper.py | 79 ++++++---- test/test_events_endpoints.py | 46 ++++-- test/test_graphq_endpoints.py | 81 ++++++++++ test/test_nettools_endpoints.py | 74 +++++++++ test/test_sessions_endpoints.py | 181 +++++++++++++++++++++++ 11 files changed, 875 insertions(+), 57 deletions(-) create mode 100755 server/api_server/sessions_endpoint.py create mode 100755 test/test_graphq_endpoints.py create mode 100755 test/test_sessions_endpoints.py diff --git a/server/api_server/api_server_start.py b/server/api_server/api_server_start.py index 44d970ac..204ae5af 100755 --- a/server/api_server/api_server_start.py +++ b/server/api_server/api_server_start.py @@ -4,10 +4,11 @@ from flask_cors import CORS from .graphql_endpoint import devicesSchema from .device_endpoint import get_device_data, set_device_data, delete_device, delete_device_events, reset_device_props, copy_device, update_device_column from .devices_endpoint import get_all_devices, delete_unknown_devices, delete_all_with_empty_macs, delete_devices, export_devices, import_csv, devices_totals, devices_by_status -from .events_endpoint import delete_events, delete_events_30, get_events +from .events_endpoint import delete_events, delete_events_30, get_events, create_event from .history_endpoint import delete_online_history -from .prometheus_endpoint import getMetricStats -from .nettools_endpoint import wakeonlan, traceroute, speedtest +from .prometheus_endpoint import get_metric_stats +from .sessions_endpoint import get_sessions, delete_session, create_session, get_sessions_calendar +from .nettools_endpoint import wakeonlan, traceroute, speedtest, nslookup, nmap_scan, internet_info from .sync_endpoint import handle_sync_post, handle_sync_get import sys @@ -30,6 +31,7 @@ CORS( r"/devices/*": {"origins": "*"}, r"/history/*": {"origins": "*"}, r"/nettools/*": {"origins": "*"}, + r"/sessions/*": {"origins": "*"}, r"/events/*": {"origins": "*"} }, supports_credentials=True, @@ -212,7 +214,47 @@ def api_speedtest(): if not is_authorized(): return jsonify({"error": "Forbidden"}), 403 return speedtest() + +@app.route("/nettools/nslookup", methods=["POST"]) +def api_nslookup(): + """ + API endpoint to handle nslookup requests. + Expects JSON with 'devLastIP'. + """ + if not is_authorized(): + return jsonify({"success": False, "error": "Forbidden"}), 403 + + data = request.get_json(silent=True) + if not data or "devLastIP" not in data: + return jsonify({"success": False, "error": "Missing 'devLastIP'"}), 400 + + ip = data["devLastIP"] + return nslookup(ip) + +@app.route("/nettools/nmap", methods=["POST"]) +def api_nmap(): + """ + API endpoint to handle nmap scan requests. + Expects JSON with 'scan' (IP address) and 'mode' (scan mode). + """ + if not is_authorized(): + return jsonify({"success": False, "error": "Forbidden"}), 403 + + data = request.get_json(silent=True) + if not data or "scan" not in data or "mode" not in data: + return jsonify({"success": False, "error": "Missing 'scan' or 'mode'"}), 400 + + ip = data["scan"] + mode = data["mode"] + return nmap_scan(ip, mode) + +@app.route("/nettools/internetinfo", methods=["GET"]) +def api_internet_info(): + if not is_authorized(): + return jsonify({"success": False, "error": "Forbidden"}), 403 + return internet_info() + # -------------------------- # Online history # -------------------------- @@ -227,6 +269,25 @@ def api_delete_online_history(): # Device Events # -------------------------- +@app.route("/events/create/", methods=["POST"]) +def api_create_event(mac): + if not is_authorized(): + return jsonify({"error": "Forbidden"}), 403 + + data = request.json or {} + ip = data.get("ip", "0.0.0.0") + event_type = data.get("event_type", "Device Down") + additional_info = data.get("additional_info", "") + pending_alert = data.get("pending_alert", 1) + event_time = data.get("event_time", None) + + # Call the helper to insert into DB + create_event(mac, ip, event_type, additional_info, pending_alert, event_time) + + # Return consistent JSON response + return jsonify({"success": True, "message": f"Event created for {mac}"}) + + @app.route("/events/", methods=["DELETE"]) def api_events_by_mac(mac): if not is_authorized(): @@ -244,8 +305,7 @@ def api_get_events(): if not is_authorized(): return jsonify({"error": "Forbidden"}), 403 - mac = request.json.get("mac") if request.is_json else None - + mac = request.args.get("mac") return get_events(mac) @app.route("/events/30days", methods=["DELETE"]) @@ -254,19 +314,74 @@ def api_delete_old_events(): return jsonify({"error": "Forbidden"}), 403 return delete_events_30() +# -------------------------- +# Sessions +# -------------------------- + +@app.route("/sessions/create", methods=["POST"]) +def api_create_session(): + if not is_authorized(): + return jsonify({"error": "Forbidden"}), 403 + + data = request.json + mac = data.get("mac") + ip = data.get("ip") + start_time = data.get("start_time") + end_time = data.get("end_time") + event_type_conn = data.get("event_type_conn", "Connected") + event_type_disc = data.get("event_type_disc", "Disconnected") + + if not mac or not ip or not start_time: + return jsonify({"success": False, "error": "Missing required parameters"}), 400 + + return create_session(mac, ip, start_time, end_time, event_type_conn, event_type_disc) + + +@app.route("/sessions/delete", methods=["DELETE"]) +def api_delete_session(): + if not is_authorized(): + return jsonify({"error": "Forbidden"}), 403 + + mac = request.json.get("mac") if request.is_json else None + if not mac: + return jsonify({"success": False, "error": "Missing MAC parameter"}), 400 + + return delete_session(mac) + + +@app.route("/sessions/list", methods=["GET"]) +def api_get_sessions(): + if not is_authorized(): + return jsonify({"error": "Forbidden"}), 403 + + mac = request.args.get("mac") + start_date = request.args.get("start_date") + end_date = request.args.get("end_date") + + return get_sessions(mac, start_date, end_date) + +@app.route("/sessions/calendar", methods=["GET"]) +def api_get_sessions_calendar(): + if not is_authorized(): + return jsonify({"error": "Forbidden"}), 403 + + # Query params: /sessions/calendar?start=2025-08-01&end=2025-08-21 + start_date = request.args.get("start") + end_date = request.args.get("end") + + return get_sessions_calendar(start_date, end_date) + + # -------------------------- # Prometheus metrics endpoint # -------------------------- @app.route("/metrics") def metrics(): - - # Check for API token in headers if not is_authorized(): return jsonify({"error": "Forbidden"}), 403 - # Return Prometheus metrics as plain text - return Response(getMetricStats(), mimetype="text/plain") + return Response(get_metric_stats(), mimetype="text/plain") # -------------------------- # SYNC endpoint diff --git a/server/api_server/events_endpoint.py b/server/api_server/events_endpoint.py index 0731218d..d884e226 100755 --- a/server/api_server/events_endpoint.py +++ b/server/api_server/events_endpoint.py @@ -14,7 +14,7 @@ INSTALL_PATH="/app" sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"]) from database import get_temp_db_connection -from helper import is_random_mac, format_date, get_setting_value +from helper import is_random_mac, format_date, get_setting_value, format_date_iso, format_event_date, timeNowTZ, mylog, ensure_datetime from db.db_helper import row_to_json @@ -22,6 +22,38 @@ from db.db_helper import row_to_json # Events Endpoints Functions # -------------------------- + +def create_event( + mac: str, + ip: str, + event_type: str = "Device Down", + additional_info: str = "", + pending_alert: int = 1, + event_time: datetime | None = None +): + """ + Insert a single event into the Events table and return a standardized JSON response. + Exceptions will propagate to the caller. + """ + conn = get_temp_db_connection() + cur = conn.cursor() + if isinstance(event_time, str): + start_time = ensure_datetime(event_time) + + start_time = ensure_datetime(event_time) + + cur.execute(""" + INSERT INTO Events (eve_MAC, eve_IP, eve_DateTime, eve_EventType, eve_AdditionalInfo, eve_PendingAlertEmail) + VALUES (?, ?, ?, ?, ?, ?) + """, (mac, ip, start_time, event_type, additional_info, pending_alert)) + + conn.commit() + conn.close() + + mylog("debug", f"[Events] Created event for {mac} ({event_type})") + return jsonify({"success": True, "message": f"Created event for {mac}"}) + + def get_events(mac=None): """ Fetch all events, or events for a specific MAC if provided. @@ -49,7 +81,7 @@ def delete_events_30(): conn = get_temp_db_connection() cur = conn.cursor() - sql = "DELETE FROM Events WHERE eve_DateTime <= date('now', '-30 day')" + sql = "DELETE FROM Events WHERE eve_DateTime <= date('now', '-30 days')" cur.execute(sql) conn.commit() conn.close() @@ -70,3 +102,4 @@ def delete_events(): return jsonify({"success": True, "message": "Deleted all events"}) + diff --git a/server/api_server/graphql_endpoint.py b/server/api_server/graphql_endpoint.py index ff6c20db..45da39f0 100755 --- a/server/api_server/graphql_endpoint.py +++ b/server/api_server/graphql_endpoint.py @@ -113,6 +113,7 @@ class Query(ObjectType): try: with open(folder + 'table_devices.json', 'r') as f: devices_data = json.load(f)["data"] + total_count = len(devices_data) except (FileNotFoundError, json.JSONDecodeError) as e: mylog('none', f'[graphql_schema] Error loading devices data: {e}') return DeviceResult(devices=[], count=0) diff --git a/server/api_server/nettools_endpoint.py b/server/api_server/nettools_endpoint.py index 00f3b6cd..d44e04eb 100755 --- a/server/api_server/nettools_endpoint.py +++ b/server/api_server/nettools_endpoint.py @@ -95,4 +95,128 @@ def speedtest(): "success": False, "error": "Speedtest failed", "details": e.stderr.strip() - }), 500 \ No newline at end of file + }), 500 + + +def nslookup(ip): + """ + Run an nslookup on the given IP address. + Returns JSON with the lookup output or error. + """ + # Validate IP + try: + ipaddress.ip_address(ip) + except ValueError: + return jsonify({ + "success": False, + "error": "Invalid IP address" + }), 400 + + try: + # Run nslookup command + result = subprocess.run( + ["nslookup", ip], + capture_output=True, + text=True, + check=True + ) + + output_lines = result.stdout.strip().split("\n") + return jsonify({"success": True, "output": output_lines}) + + except subprocess.CalledProcessError as e: + return jsonify({ + "success": False, + "error": "nslookup failed", + "details": e.stderr.strip() + }), 500 + + +def nmap_scan(ip, mode): + """ + Run an nmap scan on the given IP address with the requested mode. + Modes supported: + - "fast" → nmap -F + - "normal" → nmap + - "detail" → nmap -A + - "skipdiscovery" → nmap -Pn + Returns JSON with the scan output or error. + """ + # Validate IP + try: + ipaddress.ip_address(ip) + except ValueError: + return jsonify({ + "success": False, + "error": "Invalid IP address" + }), 400 + + # Map scan modes to nmap arguments + mode_args = { + "fast": ["-F"], + "normal": [], + "detail": ["-A"], + "skipdiscovery": ["-Pn"] + } + + if mode not in mode_args: + return jsonify({ + "success": False, + "error": f"Invalid scan mode '{mode}'" + }), 400 + + try: + # Build and run nmap command + cmd = ["nmap"] + mode_args[mode] + [ip] + result = subprocess.run( + cmd, + capture_output=True, + text=True, + check=True + ) + + output_lines = result.stdout.strip().split("\n") + return jsonify({ + "success": True, + "mode": mode, + "ip": ip, + "output": output_lines + }) + + except subprocess.CalledProcessError as e: + return jsonify({ + "success": False, + "error": "nmap scan failed", + "details": e.stderr.strip() + }), 500 + + +def internet_info(): + """ + API endpoint to fetch internet info using ipinfo.io. + Returns JSON with the info or error. + """ + try: + # Perform the request via curl + result = subprocess.run( + ["curl", "-s", "https://ipinfo.io"], + capture_output=True, + text=True, + check=True + ) + + output = result.stdout.strip() + if not output: + raise ValueError("Empty response from ipinfo.io") + + # Clean up the JSON-like string by removing { } , and " + cleaned_output = output.replace("{", "").replace("}", "").replace(",", "").replace('"', "") + + return jsonify({"success": True, "output": cleaned_output}) + + except (subprocess.CalledProcessError, ValueError) as e: + return jsonify({ + "success": False, + "error": "Failed to fetch internet info", + "details": str(e) + }), 500 diff --git a/server/api_server/prometheus_endpoint.py b/server/api_server/prometheus_endpoint.py index de9d8fcf..8204636e 100755 --- a/server/api_server/prometheus_endpoint.py +++ b/server/api_server/prometheus_endpoint.py @@ -18,7 +18,7 @@ def escape_label_value(val): # Define a base URL with the user's home directory folder = apiPath -def getMetricStats(): +def get_metric_stats(): output = [] # 1. Dashboard totals diff --git a/server/api_server/sessions_endpoint.py b/server/api_server/sessions_endpoint.py new file mode 100755 index 00000000..43442917 --- /dev/null +++ b/server/api_server/sessions_endpoint.py @@ -0,0 +1,172 @@ +#!/usr/bin/env python + +import json +import subprocess +import argparse +import os +import pathlib +import sys +from datetime import datetime +from flask import jsonify, request + +# Register NetAlertX directories +INSTALL_PATH="/app" +sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"]) + +from database import get_temp_db_connection +from helper import is_random_mac, format_date, get_setting_value, format_date_iso, format_event_date, mylog, timeNowTZ +from db.db_helper import row_to_json + + +# -------------------------- +# Sessions Endpoints Functions +# -------------------------- +# ------------------------------------------------------------------------------------------- +def create_session(mac, ip, start_time, end_time=None, event_type_conn="Connected", event_type_disc="Disconnected"): + """Insert a new session into Sessions table""" + conn = get_temp_db_connection() + cur = conn.cursor() + + cur.execute(""" + INSERT INTO Sessions (ses_MAC, ses_IP, ses_DateTimeConnection, ses_DateTimeDisconnection, + ses_EventTypeConnection, ses_EventTypeDisconnection) + VALUES (?, ?, ?, ?, ?, ?) + """, (mac, ip, start_time, end_time, event_type_conn, event_type_disc)) + + conn.commit() + conn.close() + + return jsonify({"success": True, "message": f"Session created for MAC {mac}"}) + + +# ------------------------------------------------------------------------------------------- +def delete_session(mac): + """Delete all sessions for a given MAC""" + conn = get_temp_db_connection() + cur = conn.cursor() + + cur.execute("DELETE FROM Sessions WHERE ses_MAC = ?", (mac,)) + conn.commit() + conn.close() + + return jsonify({"success": True, "message": f"Deleted sessions for MAC {mac}"}) + + +# ------------------------------------------------------------------------------------------- +def get_sessions(mac=None, start_date=None, end_date=None): + """Retrieve sessions optionally filtered by MAC and date range""" + conn = get_temp_db_connection() + cur = conn.cursor() + + sql = "SELECT * FROM Sessions WHERE 1=1" + params = [] + + if mac: + sql += " AND ses_MAC = ?" + params.append(mac) + if start_date: + sql += " AND ses_DateTimeConnection >= ?" + params.append(start_date) + if end_date: + sql += " AND ses_DateTimeDisconnection <= ?" + params.append(end_date) + + cur.execute(sql, tuple(params)) + rows = cur.fetchall() + conn.close() + + # Convert rows to list of dicts + table_data = [dict(r) for r in rows] + + return jsonify({"success": True, "sessions": table_data}) + + + +def get_sessions_calendar(start_date, end_date): + """ + Fetch sessions between a start and end date for calendar display. + Returns JSON list of calendar sessions. + """ + + if not start_date or not end_date: + return jsonify({"success": False, "error": "Missing start or end date"}), 400 + + conn = get_temp_db_connection() + cur = conn.cursor() + + sql = """ + -- Correct missing connection/disconnection sessions: + -- If ses_EventTypeConnection is missing, backfill from last disconnection + -- If ses_EventTypeDisconnection is missing, forward-fill from next connection + + SELECT + SES1.ses_MAC, SES1.ses_EventTypeConnection, SES1.ses_DateTimeConnection, + SES1.ses_EventTypeDisconnection, SES1.ses_DateTimeDisconnection, SES1.ses_IP, + SES1.ses_AdditionalInfo, SES1.ses_StillConnected, + + CASE + WHEN SES1.ses_EventTypeConnection = '' THEN + IFNULL( + (SELECT MAX(SES2.ses_DateTimeDisconnection) + FROM Sessions AS SES2 + WHERE SES2.ses_MAC = SES1.ses_MAC + AND SES2.ses_DateTimeDisconnection < SES1.ses_DateTimeDisconnection + AND SES2.ses_DateTimeDisconnection BETWEEN Date(?) AND Date(?) + ), + DATETIME(SES1.ses_DateTimeDisconnection, '-1 hour') + ) + ELSE SES1.ses_DateTimeConnection + END AS ses_DateTimeConnectionCorrected, + + CASE + WHEN SES1.ses_EventTypeDisconnection = '' THEN + (SELECT MIN(SES2.ses_DateTimeConnection) + FROM Sessions AS SES2 + WHERE SES2.ses_MAC = SES1.ses_MAC + AND SES2.ses_DateTimeConnection > SES1.ses_DateTimeConnection + AND SES2.ses_DateTimeConnection BETWEEN Date(?) AND Date(?) + ) + ELSE SES1.ses_DateTimeDisconnection + END AS ses_DateTimeDisconnectionCorrected + + FROM Sessions AS SES1 + WHERE (SES1.ses_DateTimeConnection BETWEEN Date(?) AND Date(?)) + OR (SES1.ses_DateTimeDisconnection BETWEEN Date(?) AND Date(?)) + OR SES1.ses_StillConnected = 1 + """ + + cur.execute(sql, (start_date, end_date, start_date, end_date, start_date, end_date, start_date, end_date)) + rows = cur.fetchall() + + table_data = [] + for r in rows: + row = dict(r) + + # Determine color + if row["ses_EventTypeConnection"] == "" or row["ses_EventTypeDisconnection"] == "": + color = "#f39c12" + elif row["ses_StillConnected"] == 1: + color = "#00a659" + else: + color = "#0073b7" + + # Tooltip + tooltip = ( + f"Connection: {format_event_date(row['ses_DateTimeConnection'], row['ses_EventTypeConnection'])}\n" + f"Disconnection: {format_event_date(row['ses_DateTimeDisconnection'], row['ses_EventTypeDisconnection'])}\n" + f"IP: {row['ses_IP']}" + ) + + # Append calendar entry + table_data.append({ + "resourceId": row["ses_MAC"], + "title": "", + "start": format_date_iso(row["ses_DateTimeConnectionCorrected"]), + "end": format_date_iso(row["ses_DateTimeDisconnectionCorrected"]), + "color": color, + "tooltip": tooltip, + "className": "no-border" + }) + + conn.close() + return jsonify({"success": True, "sessions": table_data}) \ No newline at end of file diff --git a/server/helper.py b/server/helper.py index 7eb3fcc3..630e6fc9 100755 --- a/server/helper.py +++ b/server/helper.py @@ -7,6 +7,7 @@ import os import re import unicodedata import subprocess +from typing import Union import pytz from pytz import timezone import json @@ -52,6 +53,55 @@ def get_timezone_offset(): return offset_formatted +#------------------------------------------------------------------------------- +# Date and time methods +#------------------------------------------------------------------------------- + +# ------------------------------------------------------------------------------------------- +def format_date(date_str: str) -> str: + """Format a date string as 'YYYY-MM-DD HH:MM'""" + dt = datetime.datetime.fromisoformat(date_str) if isinstance(date_str, str) else date_str + return dt.strftime('%Y-%m-%d %H:%M') + +# ------------------------------------------------------------------------------------------- +def format_date_diff(date1: str, date2: str) -> str: + """Return difference between two dates formatted as 'Xd HH:MM'""" + dt1 = datetime.datetime.fromisoformat(date1) if isinstance(date1, str) else date1 + dt2 = datetime.datetime.fromisoformat(date2) if isinstance(date2, str) else date2 + delta = dt2 - dt1 + + days = delta.days + hours, remainder = divmod(delta.seconds, 3600) + minutes = remainder // 60 + + return f"{days}d {hours:02}:{minutes:02}" + +# ------------------------------------------------------------------------------------------- +def format_date_iso(date1: str) -> str: + """Return ISO 8601 string for a date or None if empty""" + if date1 is None: + return None + dt = datetime.datetime.fromisoformat(date1) if isinstance(date1, str) else date1 + return dt.isoformat() + +# ------------------------------------------------------------------------------------------- +def format_event_date(date_str: str, event_type: str) -> str: + """Format event date with fallback rules.""" + if date_str: + return format_date(date_str) + elif event_type == "": + return "" + else: + return "" + +# ------------------------------------------------------------------------------------------- +def ensure_datetime(dt: Union[str, datetime, None]) -> datetime: + if dt is None: + return timeNowTZ() + if isinstance(dt, str): + return datetime.datetime.fromisoformat(dt) + return dt + #------------------------------------------------------------------------------- # File system permission handling #------------------------------------------------------------------------------- @@ -592,35 +642,6 @@ def collect_lang_strings(json, pref, stringSqlParams): return stringSqlParams -#------------------------------------------------------------------------------- -# Date and time methods -#------------------------------------------------------------------------------- - -# ------------------------------------------------------------------------------------------- -def format_date(date_str: str) -> str: - """Format a date string as 'YYYY-MM-DD HH:MM'""" - dt = datetime.datetime.fromisoformat(date_str) if isinstance(date_str, str) else date_str - return dt.strftime('%Y-%m-%d %H:%M') - -# ------------------------------------------------------------------------------------------- -def format_date_diff(date1: str, date2: str) -> str: - """Return difference between two dates formatted as 'Xd HH:MM'""" - dt1 = datetime.datetime.fromisoformat(date1) if isinstance(date1, str) else date1 - dt2 = datetime.datetime.fromisoformat(date2) if isinstance(date2, str) else date2 - delta = dt2 - dt1 - - days = delta.days - hours, remainder = divmod(delta.seconds, 3600) - minutes = remainder // 60 - - return f"{days}d {hours:02}:{minutes:02}" - -# ------------------------------------------------------------------------------------------- -def format_date_iso(date1: str) -> str: - """Return ISO 8601 string for a date""" - dt = datetime.datetime.fromisoformat(date1) if isinstance(date1, str) else date1 - return dt.isoformat() - #------------------------------------------------------------------------------- def checkNewVersion(): mylog('debug', [f"[Version check] Checking if new version available"]) diff --git a/test/test_events_endpoints.py b/test/test_events_endpoints.py index c0076f7f..d3d7ffa9 100755 --- a/test/test_events_endpoints.py +++ b/test/test_events_endpoints.py @@ -5,6 +5,7 @@ import random import string import uuid import pytest +from datetime import datetime, timedelta INSTALL_PATH = "/app" sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"]) @@ -30,21 +31,33 @@ def auth_headers(token): return {"Authorization": f"Bearer {token}"} def create_event(client, api_token, mac, event="UnitTest Event", days_old=None): - """ - Create event using API (POST /event/). - If days_old is set, adds it to payload for backdating support. - """ - payload = { - "event": event, - } - if days_old: - payload["days_old"] = days_old - return client.post(f"/event/{mac}", json=payload, headers=auth_headers(api_token)) + payload = {"ip": "0.0.0.0", "event_type": event} + + # Calculate the event_time if days_old is given + if days_old is not None: + event_time = timeNowTZ() - timedelta(days=days_old) + # ISO 8601 string + payload["event_time"] = event_time.isoformat() + + return client.post(f"/events/create/{mac}", json=payload, headers=auth_headers(api_token)) def list_events(client, api_token, mac=None): - url = "/events" if mac is None else f"/events/{mac}" + url = "/events" if mac is None else f"/events?mac={mac}" return client.get(url, headers=auth_headers(api_token)) +def test_create_event(client, api_token, test_mac): + # create event + resp = create_event(client, api_token, test_mac) + assert resp.status_code == 200 + data = resp.get_json() + assert data.get("success") is True + + # confirm event exists + resp = list_events(client, api_token, test_mac) + assert resp.status_code == 200 + events = resp.get_json().get("events", []) + assert any(ev.get("eve_MAC") == test_mac for ev in events) + def test_delete_events_for_mac(client, api_token, test_mac): # create event @@ -54,7 +67,8 @@ def test_delete_events_for_mac(client, api_token, test_mac): # confirm exists resp = list_events(client, api_token, test_mac) assert resp.status_code == 200 - assert any(ev["eve_MAC"] == test_mac for ev in resp.json) + events = resp.json.get("events", []) + assert any(ev["eve_MAC"] == test_mac for ev in events) # delete resp = client.delete(f"/events/{test_mac}", headers=auth_headers(api_token)) @@ -64,7 +78,7 @@ def test_delete_events_for_mac(client, api_token, test_mac): # confirm deleted resp = list_events(client, api_token, test_mac) assert resp.status_code == 200 - assert len(resp.json) == 0 + assert len(resp.json.get("events", [])) == 0 def test_delete_all_events(client, api_token, test_mac): @@ -82,7 +96,7 @@ def test_delete_all_events(client, api_token, test_mac): # confirm no events resp = list_events(client, api_token) - assert len(resp.json) == 0 + assert len(resp.json.get("events", [])) == 0 def test_delete_events_30days(client, api_token, test_mac): @@ -100,5 +114,7 @@ def test_delete_events_30days(client, api_token, test_mac): # confirm only recent remains resp = list_events(client, api_token, test_mac) - mac_events = [ev for ev in resp.json if ev["eve_MAC"] == test_mac] + events = resp.get_json().get("events", []) + mac_events = [ev for ev in events if ev.get("eve_MAC") == test_mac] assert len(mac_events) == 1 + diff --git a/test/test_graphq_endpoints.py b/test/test_graphq_endpoints.py new file mode 100755 index 00000000..222deb3a --- /dev/null +++ b/test/test_graphq_endpoints.py @@ -0,0 +1,81 @@ +import sys +import pathlib +import sqlite3 +import random +import string +import uuid +import pytest +from datetime import datetime, timedelta + +INSTALL_PATH = "/app" +sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"]) + +from helper import timeNowTZ, get_setting_value +from api_server.api_server_start import app + +@pytest.fixture(scope="session") +def api_token(): + return get_setting_value("API_TOKEN") + +@pytest.fixture +def client(): + with app.test_client() as client: + yield client + +@pytest.fixture +def test_mac(): + # Generate a unique MAC for each test run + return "AA:BB:CC:" + ":".join(f"{random.randint(0,255):02X}" for _ in range(3)) + +def auth_headers(token): + return {"Authorization": f"Bearer {token}"} + + +def test_graphql_debug_get(client): + """GET /graphql should return the debug string""" + resp = client.get("/graphql") + assert resp.status_code == 200 + assert resp.data.decode() == "NetAlertX GraphQL server running." + +def test_graphql_post_unauthorized(client): + """POST /graphql without token should return 401""" + query = {"query": "{ devices { devName devMac } }"} + resp = client.post("/graphql", json=query) + assert resp.status_code == 401 + assert "Unauthorized access attempt" in resp.json.get("error", "") + +def test_graphql_post_devices(client, api_token): + """POST /graphql with a valid token should return device data""" + query = { + "query": """ + { + devices { + devices { devName devMac devIsRandomMac devParentChildrenCount } + count + } + } + """ + } + resp = client.post("/graphql", json=query, headers=auth_headers(api_token)) + assert resp.status_code == 200 + data = resp.json.get("data", {}) + assert "devices" in data + assert isinstance(data["devices"]["devices"], list) + +def test_graphql_post_settings(client, api_token): + """POST /graphql should return settings data""" + query = { + "query": """ + { + settings { + settings { setKey setValue setGroup } + count + } + } + """ + } + resp = client.post("/graphql", json=query, headers=auth_headers(api_token)) + assert resp.status_code == 200 + data = resp.json.get("data", {}) + assert "settings" in data + assert isinstance(data["settings"]["settings"], list) diff --git a/test/test_nettools_endpoints.py b/test/test_nettools_endpoints.py index 1ed50928..1cde5e55 100755 --- a/test/test_nettools_endpoints.py +++ b/test/test_nettools_endpoints.py @@ -126,4 +126,78 @@ def test_traceroute_device(client, api_token, test_mac): assert "output" in data assert isinstance(data["output"], str) +@pytest.mark.parametrize("ip,expected_status", [ + ("8.8.8.8", 200), + ("256.256.256.256", 400), # Invalid IP + ("", 400), # Missing IP +]) +def test_nslookup_endpoint(client, api_token, ip, expected_status): + payload = {"devLastIP": ip} if ip else {} + resp = client.post("/nettools/nslookup", json=payload, headers=auth_headers(api_token)) + assert resp.status_code == expected_status + data = resp.json + + if expected_status == 200: + assert data.get("success") is True + assert isinstance(data["output"], list) + assert all(isinstance(line, str) for line in data["output"]) + else: + assert data.get("success") is False + assert "error" in data + +@pytest.mark.parametrize("ip,mode,expected_status", [ + ("127.0.0.1", "fast", 200), + ("127.0.0.1", "normal", 200), + ("127.0.0.1", "detail", 200), + ("127.0.0.1", "skipdiscovery", 200), + ("127.0.0.1", "invalidmode", 400), + ("999.999.999.999", "fast", 400), +]) +def test_nmap_endpoint(client, api_token, ip, mode, expected_status): + payload = {"scan": ip, "mode": mode} + resp = client.post("/nettools/nmap", json=payload, headers=auth_headers(api_token)) + + assert resp.status_code == expected_status + data = resp.json + + if expected_status == 200: + assert data.get("success") is True + assert data.get("mode") == mode + assert data.get("ip") == ip + assert isinstance(data["output"], list) + assert all(isinstance(line, str) for line in data["output"]) + else: + assert data.get("success") is False + assert "error" in data + +def test_nslookup_unauthorized(client): + # No auth headers + resp = client.post("/nettools/nslookup", json={"devLastIP": "8.8.8.8"}) + assert resp.status_code == 403 + data = resp.json + assert data.get("success") is False + assert data.get("error") == "Forbidden" + +def test_nmap_unauthorized(client): + # No auth headers + resp = client.post("/nettools/nmap", json={"scan": "127.0.0.1", "mode": "fast"}) + assert resp.status_code == 403 + data = resp.json + assert data.get("success") is False + assert data.get("error") == "Forbidden" + + +def test_internet_info_endpoint(client, api_token): + resp = client.get("/nettools/internetinfo", headers=auth_headers(api_token)) + data = resp.json + + if resp.status_code == 200: + assert data.get("success") is True + assert isinstance(data.get("output"), str) + assert len(data["output"]) > 0 # ensure output is not empty + else: + # Handle errors, e.g., curl failure + assert data.get("success") is False + assert "error" in data + assert "details" in data \ No newline at end of file diff --git a/test/test_sessions_endpoints.py b/test/test_sessions_endpoints.py new file mode 100755 index 00000000..bcf40a21 --- /dev/null +++ b/test/test_sessions_endpoints.py @@ -0,0 +1,181 @@ +import sys +import pathlib +import sqlite3 +import random +import string +import uuid +import pytest +from datetime import datetime, timedelta + +INSTALL_PATH = "/app" +sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"]) + +from helper import timeNowTZ, get_setting_value +from api_server.api_server_start import app + +@pytest.fixture(scope="session") +def api_token(): + return get_setting_value("API_TOKEN") + +@pytest.fixture +def client(): + with app.test_client() as client: + yield client + +@pytest.fixture +def test_mac(): + # Generate a unique MAC for each test run + return "AA:BB:CC:" + ":".join(f"{random.randint(0,255):02X}" for _ in range(3)) + +def auth_headers(token): + return {"Authorization": f"Bearer {token}"} + +def test_create_device(client, api_token, test_mac): + payload = { + "createNew": True, + "devType": "Test Device", + "devOwner": "Unit Test", + "devType": "Router", + "devVendor": "TestVendor", + } + resp = client.post(f"/device/{test_mac}", json=payload, headers=auth_headers(api_token)) + assert resp.status_code == 200 + assert resp.json.get("success") is True + + +# ----------------------------- +# CREATE SESSION +# ----------------------------- +def test_create_session(client, api_token, test_mac): + payload = { + "mac": test_mac, + "ip": "192.168.1.100", + "start_time": timeNowTZ(), + "event_type_conn": "Connected", + "event_type_disc": "Disconnected" + } + resp = client.post("/sessions/create", json=payload, headers=auth_headers(api_token)) + assert resp.status_code == 200 + assert resp.json.get("success") is True + + +# ----------------------------- +# LIST SESSIONS +# ----------------------------- +def test_list_sessions(client, api_token, test_mac): + # Ensure at least one session exists + payload = { + "mac": test_mac, + "ip": "192.168.1.100", + "start_time": timeNowTZ() + } + client.post("/sessions/create", json=payload, headers=auth_headers(api_token)) + + # List sessions for MAC + resp = client.get(f"/sessions/list?mac={test_mac}", headers=auth_headers(api_token)) + assert resp.status_code == 200 + assert resp.json.get("success") is True + sessions = resp.json.get("sessions") + assert any(ses["ses_MAC"] == test_mac for ses in sessions) + + +# ----------------------------- +# DELETE SESSION +# ----------------------------- +def test_delete_session(client, api_token, test_mac): + # First create session + payload = { + "mac": test_mac, + "ip": "192.168.1.100", + "start_time": timeNowTZ() + } + client.post("/sessions/create", json=payload, headers=auth_headers(api_token)) + + # Delete session + resp = client.delete("/sessions/delete", json={"mac": test_mac}, headers=auth_headers(api_token)) + assert resp.status_code == 200 + assert resp.json.get("success") is True + + # Confirm deletion + resp = client.get(f"/sessions/list?mac={test_mac}", headers=auth_headers(api_token)) + sessions = resp.json.get("sessions") + assert not any(ev["ses_MAC"] == test_mac for ses in sessions) + + + +def test_get_sessions_calendar(client, api_token, test_mac): + """ + Test the /sessions/calendar endpoint. + Creates session and ensures the calendar output is correct. + Cleans up test sessions after test. + """ + + + # --- Setup: create two sessions for the test MAC --- + now = timeNowTZ() + start1 = (now - timedelta(days=2)).isoformat(timespec="seconds") + end1 = (now - timedelta(days=1, hours=20)).isoformat(timespec="seconds") + + start2 = (now - timedelta(days=1)).isoformat(timespec="seconds") + end2 = (now - timedelta(hours=20)).isoformat(timespec="seconds") + + # Create sessions using your endpoint + client.post("/sessions/create", json={ + "mac": test_mac, + "ip": "192.168.1.100", + "start_time": start1, + "end_time": end1, + "event_type_conn": "connect", + "event_type_disc": "disconnect" + }, headers=auth_headers(api_token)) + + client.post("/sessions/create", json={ + "mac": test_mac, + "ip": "192.168.1.100", + "start_time": start2, + "end_time": end2, + "event_type_conn": "connect", + "event_type_disc": "disconnect" + }, headers=auth_headers(api_token)) + + # --- Call the /sessions/calendar API --- + start_date = (now - timedelta(days=3)).strftime("%Y-%m-%d") + end_date = (now + timedelta(days=1)).strftime("%Y-%m-%d") + + resp = client.get( + f"/sessions/calendar?start={start_date}&end={end_date}", + headers=auth_headers(api_token) + ) + + assert resp.status_code == 200 + data = resp.json + assert data.get("success") is True + assert "sessions" in data + sessions = data["sessions"] + + # --- Verify calendar sessions --- + assert len(sessions) >= 2 # at least our two sessions + + # Check expected keys + expected_keys = {"resourceId", "title", "start", "end", "color", "tooltip", "className"} + for ses in sessions: + assert set(ses.keys()) == expected_keys + + # Check that all sessions belong to the test MAC + mac_sessions = [ses for ses in sessions if ses["resourceId"] == test_mac] + assert len(mac_sessions) == 2 # or exact number if you know it + + # Check ISO date formatting for start/end + for ses in mac_sessions: + # start must always be present + assert ses["start"] is not None, f"Session start is None: {ses}" + datetime.fromisoformat(ses["start"]) + + # end can be None only if tooltip mentions "" + if ses["end"] is not None: + datetime.fromisoformat(ses["end"]) + else: + assert "" in ses["tooltip"], f"End is None but session not marked as still connected: {ses}" + + # --- Cleanup: delete all test sessions for this MAC --- + client.delete(f"/sessions/delete?mac={test_mac}", headers=auth_headers(api_token)) \ No newline at end of file