mirror of
https://github.com/jokob-sk/NetAlertX.git
synced 2026-04-12 05:01:27 -07:00
Compare commits
10 Commits
5fd30fe3c8
...
f78c84d9a8
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f78c84d9a8 | ||
|
|
2d11d3dd3e | ||
|
|
39c556576c | ||
|
|
73fd094cfc | ||
|
|
cbf2cd0ee8 | ||
|
|
915bb523d6 | ||
|
|
3dc87d2adb | ||
|
|
9155303674 | ||
|
|
0777824d96 | ||
|
|
b170ca3e18 |
@@ -683,7 +683,9 @@ function initializeDatatable (status) {
|
||||
|
||||
return JSON.stringify(query); // Send the JSON request
|
||||
},
|
||||
"dataSrc": function (json) {
|
||||
"dataSrc": function (res) {
|
||||
|
||||
json = res["data"];
|
||||
// Set the total number of records for pagination
|
||||
json.recordsTotal = json.devices.count || 0;
|
||||
json.recordsFiltered = json.devices.count || 0;
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
<?php
|
||||
require dirname(__FILE__).'/../server/init.php';
|
||||
|
||||
// EQUIVALENT: /nettools/speedtest
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
// check if authenticated
|
||||
require_once $_SERVER['DOCUMENT_ROOT'] . '/php/templates/security.php';
|
||||
|
||||
@@ -19,6 +19,8 @@ require dirname(__FILE__).'/../server/init.php';
|
||||
// check if authenticated
|
||||
require_once $_SERVER['DOCUMENT_ROOT'] . '/php/templates/security.php';
|
||||
|
||||
// NEW ENDPOINT EQUIVALENT: /nettools/traceroute
|
||||
|
||||
// Get IP
|
||||
$ip = $_GET['ip'];
|
||||
|
||||
|
||||
@@ -199,8 +199,8 @@ $settingsJSON_DB = json_encode($settings, JSON_HEX_TAG | JSON_HEX_AMP | JSON_HEX
|
||||
console.log("Response:", response);
|
||||
|
||||
// Handle the successful response
|
||||
if (response && response.settings) {
|
||||
const settingsData = response.settings.settings;
|
||||
if (response && response.data && response.data.settings && response.data.settings.settings) {
|
||||
const settingsData = response.data.settings.settings;
|
||||
console.log("Settings:", settingsData);
|
||||
|
||||
// Wrong number of settings processing
|
||||
|
||||
@@ -249,7 +249,7 @@ function fetchUsedIps(callback) {
|
||||
|
||||
console.log(response);
|
||||
|
||||
const usedIps = (response?.devices?.devices || [])
|
||||
const usedIps = (response?.data?.devices?.devices || [])
|
||||
.map(d => d.devLastIP)
|
||||
.filter(ip => ip && ip.includes('.'));
|
||||
callback(usedIps);
|
||||
|
||||
@@ -4,10 +4,11 @@ from flask_cors import CORS
|
||||
from .graphql_endpoint import devicesSchema
|
||||
from .device_endpoint import get_device_data, set_device_data, delete_device, delete_device_events, reset_device_props, copy_device, update_device_column
|
||||
from .devices_endpoint import get_all_devices, delete_unknown_devices, delete_all_with_empty_macs, delete_devices, export_devices, import_csv, devices_totals, devices_by_status
|
||||
from .events_endpoint import delete_events, delete_events_30, get_events
|
||||
from .events_endpoint import delete_events, delete_events_older_than, get_events, create_event, get_events_totals
|
||||
from .history_endpoint import delete_online_history
|
||||
from .prometheus_endpoint import getMetricStats
|
||||
from .nettools_endpoint import wakeonlan
|
||||
from .prometheus_endpoint import get_metric_stats
|
||||
from .sessions_endpoint import get_sessions, delete_session, create_session, get_sessions_calendar, get_device_sessions, get_session_events
|
||||
from .nettools_endpoint import wakeonlan, traceroute, speedtest, nslookup, nmap_scan, internet_info
|
||||
from .sync_endpoint import handle_sync_post, handle_sync_get
|
||||
import sys
|
||||
|
||||
@@ -17,6 +18,7 @@ sys.path.extend([f"{INSTALL_PATH}/server"])
|
||||
|
||||
from logger import mylog
|
||||
from helper import get_setting_value, timeNowTZ
|
||||
from db.db_helper import get_date_from_period
|
||||
from app_state import updateState
|
||||
from messaging.in_app import write_notification
|
||||
|
||||
@@ -30,6 +32,7 @@ CORS(
|
||||
r"/devices/*": {"origins": "*"},
|
||||
r"/history/*": {"origins": "*"},
|
||||
r"/nettools/*": {"origins": "*"},
|
||||
r"/sessions/*": {"origins": "*"},
|
||||
r"/events/*": {"origins": "*"}
|
||||
},
|
||||
supports_credentials=True,
|
||||
@@ -62,8 +65,15 @@ def graphql_endpoint():
|
||||
# Execute the GraphQL query
|
||||
result = devicesSchema.execute(data.get("query"), variables=data.get("variables"))
|
||||
|
||||
# Return the result as JSON
|
||||
return jsonify(result.data)
|
||||
# Initialize response
|
||||
response = {}
|
||||
|
||||
if result.errors:
|
||||
response["errors"] = [str(e) for e in result.errors]
|
||||
if result.data:
|
||||
response["data"] = result.data
|
||||
|
||||
return jsonify(response)
|
||||
|
||||
# --------------------------
|
||||
# Device Endpoints
|
||||
@@ -200,6 +210,59 @@ def api_wakeonlan():
|
||||
mac = request.json.get("devMac")
|
||||
return wakeonlan(mac)
|
||||
|
||||
@app.route("/nettools/traceroute", methods=["POST"])
|
||||
def api_traceroute():
|
||||
if not is_authorized():
|
||||
return jsonify({"error": "Forbidden"}), 403
|
||||
ip = request.json.get("devLastIP")
|
||||
return traceroute(ip)
|
||||
|
||||
@app.route("/nettools/speedtest", methods=["GET"])
|
||||
def api_speedtest():
|
||||
if not is_authorized():
|
||||
return jsonify({"error": "Forbidden"}), 403
|
||||
return speedtest()
|
||||
|
||||
@app.route("/nettools/nslookup", methods=["POST"])
|
||||
def api_nslookup():
|
||||
"""
|
||||
API endpoint to handle nslookup requests.
|
||||
Expects JSON with 'devLastIP'.
|
||||
"""
|
||||
if not is_authorized():
|
||||
return jsonify({"success": False, "error": "Forbidden"}), 403
|
||||
|
||||
data = request.get_json(silent=True)
|
||||
if not data or "devLastIP" not in data:
|
||||
return jsonify({"success": False, "error": "Missing 'devLastIP'"}), 400
|
||||
|
||||
ip = data["devLastIP"]
|
||||
return nslookup(ip)
|
||||
|
||||
@app.route("/nettools/nmap", methods=["POST"])
|
||||
def api_nmap():
|
||||
"""
|
||||
API endpoint to handle nmap scan requests.
|
||||
Expects JSON with 'scan' (IP address) and 'mode' (scan mode).
|
||||
"""
|
||||
if not is_authorized():
|
||||
return jsonify({"success": False, "error": "Forbidden"}), 403
|
||||
|
||||
data = request.get_json(silent=True)
|
||||
if not data or "scan" not in data or "mode" not in data:
|
||||
return jsonify({"success": False, "error": "Missing 'scan' or 'mode'"}), 400
|
||||
|
||||
ip = data["scan"]
|
||||
mode = data["mode"]
|
||||
return nmap_scan(ip, mode)
|
||||
|
||||
|
||||
@app.route("/nettools/internetinfo", methods=["GET"])
|
||||
def api_internet_info():
|
||||
if not is_authorized():
|
||||
return jsonify({"success": False, "error": "Forbidden"}), 403
|
||||
return internet_info()
|
||||
|
||||
# --------------------------
|
||||
# Online history
|
||||
# --------------------------
|
||||
@@ -214,6 +277,25 @@ def api_delete_online_history():
|
||||
# Device Events
|
||||
# --------------------------
|
||||
|
||||
@app.route("/events/create/<mac>", methods=["POST"])
|
||||
def api_create_event(mac):
|
||||
if not is_authorized():
|
||||
return jsonify({"error": "Forbidden"}), 403
|
||||
|
||||
data = request.json or {}
|
||||
ip = data.get("ip", "0.0.0.0")
|
||||
event_type = data.get("event_type", "Device Down")
|
||||
additional_info = data.get("additional_info", "")
|
||||
pending_alert = data.get("pending_alert", 1)
|
||||
event_time = data.get("event_time", None)
|
||||
|
||||
# Call the helper to insert into DB
|
||||
create_event(mac, ip, event_type, additional_info, pending_alert, event_time)
|
||||
|
||||
# Return consistent JSON response
|
||||
return jsonify({"success": True, "message": f"Event created for {mac}"})
|
||||
|
||||
|
||||
@app.route("/events/<mac>", methods=["DELETE"])
|
||||
def api_events_by_mac(mac):
|
||||
if not is_authorized():
|
||||
@@ -231,29 +313,112 @@ def api_get_events():
|
||||
if not is_authorized():
|
||||
return jsonify({"error": "Forbidden"}), 403
|
||||
|
||||
mac = request.json.get("mac") if request.is_json else None
|
||||
|
||||
mac = request.args.get("mac")
|
||||
return get_events(mac)
|
||||
|
||||
@app.route("/events/30days", methods=["DELETE"])
|
||||
def api_delete_old_events():
|
||||
@app.route("/events/<int:days>", methods=["DELETE"])
|
||||
def api_delete_old_events(days: int):
|
||||
"""
|
||||
Delete events older than <days> days.
|
||||
Example: DELETE /events/30
|
||||
"""
|
||||
if not is_authorized():
|
||||
return jsonify({"error": "Forbidden"}), 403
|
||||
return delete_events_30()
|
||||
|
||||
return delete_events_older_than(days)
|
||||
|
||||
@app.route("/sessions/totals", methods=["GET"])
|
||||
def api_get_events_totals():
|
||||
if not is_authorized():
|
||||
return jsonify({"error": "Forbidden"}), 403
|
||||
|
||||
period = get_date_from_period(request.args.get("period", "7 days"))
|
||||
return get_events_totals(period)
|
||||
|
||||
# --------------------------
|
||||
# Sessions
|
||||
# --------------------------
|
||||
|
||||
@app.route("/sessions/create", methods=["POST"])
|
||||
def api_create_session():
|
||||
if not is_authorized():
|
||||
return jsonify({"error": "Forbidden"}), 403
|
||||
|
||||
data = request.json
|
||||
mac = data.get("mac")
|
||||
ip = data.get("ip")
|
||||
start_time = data.get("start_time")
|
||||
end_time = data.get("end_time")
|
||||
event_type_conn = data.get("event_type_conn", "Connected")
|
||||
event_type_disc = data.get("event_type_disc", "Disconnected")
|
||||
|
||||
if not mac or not ip or not start_time:
|
||||
return jsonify({"success": False, "error": "Missing required parameters"}), 400
|
||||
|
||||
return create_session(mac, ip, start_time, end_time, event_type_conn, event_type_disc)
|
||||
|
||||
|
||||
@app.route("/sessions/delete", methods=["DELETE"])
|
||||
def api_delete_session():
|
||||
if not is_authorized():
|
||||
return jsonify({"error": "Forbidden"}), 403
|
||||
|
||||
mac = request.json.get("mac") if request.is_json else None
|
||||
if not mac:
|
||||
return jsonify({"success": False, "error": "Missing MAC parameter"}), 400
|
||||
|
||||
return delete_session(mac)
|
||||
|
||||
|
||||
@app.route("/sessions/list", methods=["GET"])
|
||||
def api_get_sessions():
|
||||
if not is_authorized():
|
||||
return jsonify({"error": "Forbidden"}), 403
|
||||
|
||||
mac = request.args.get("mac")
|
||||
start_date = request.args.get("start_date")
|
||||
end_date = request.args.get("end_date")
|
||||
|
||||
return get_sessions(mac, start_date, end_date)
|
||||
|
||||
@app.route("/sessions/calendar", methods=["GET"])
|
||||
def api_get_sessions_calendar():
|
||||
if not is_authorized():
|
||||
return jsonify({"error": "Forbidden"}), 403
|
||||
|
||||
# Query params: /sessions/calendar?start=2025-08-01&end=2025-08-21
|
||||
start_date = request.args.get("start")
|
||||
end_date = request.args.get("end")
|
||||
|
||||
return get_sessions_calendar(start_date, end_date)
|
||||
|
||||
@app.route("/sessions/<mac>", methods=["GET"])
|
||||
def api_device_sessions(mac):
|
||||
if not is_authorized():
|
||||
return jsonify({"error": "Forbidden"}), 403
|
||||
|
||||
period = request.args.get("period", "1 day")
|
||||
return get_device_sessions(mac, period)
|
||||
|
||||
@app.route("/sessions/session-events", methods=["GET"])
|
||||
def api_get_session_events():
|
||||
if not is_authorized():
|
||||
return jsonify({"error": "Forbidden"}), 403
|
||||
|
||||
session_event_type = request.args.get("type", "all")
|
||||
period = get_date_from_period(request.args.get("period", "7 days"))
|
||||
return get_session_events(session_event_type, period)
|
||||
|
||||
# --------------------------
|
||||
# Prometheus metrics endpoint
|
||||
# --------------------------
|
||||
@app.route("/metrics")
|
||||
def metrics():
|
||||
|
||||
# Check for API token in headers
|
||||
if not is_authorized():
|
||||
return jsonify({"error": "Forbidden"}), 403
|
||||
|
||||
|
||||
# Return Prometheus metrics as plain text
|
||||
return Response(getMetricStats(), mimetype="text/plain")
|
||||
return Response(get_metric_stats(), mimetype="text/plain")
|
||||
|
||||
# --------------------------
|
||||
# SYNC endpoint
|
||||
|
||||
@@ -14,14 +14,46 @@ INSTALL_PATH="/app"
|
||||
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
|
||||
|
||||
from database import get_temp_db_connection
|
||||
from helper import is_random_mac, format_date, get_setting_value
|
||||
from db.db_helper import row_to_json
|
||||
from helper import is_random_mac, format_date, get_setting_value, format_date_iso, format_event_date, timeNowTZ, mylog, ensure_datetime
|
||||
from db.db_helper import row_to_json, get_date_from_period
|
||||
|
||||
|
||||
# --------------------------
|
||||
# Events Endpoints Functions
|
||||
# --------------------------
|
||||
|
||||
|
||||
def create_event(
|
||||
mac: str,
|
||||
ip: str,
|
||||
event_type: str = "Device Down",
|
||||
additional_info: str = "",
|
||||
pending_alert: int = 1,
|
||||
event_time: datetime | None = None
|
||||
):
|
||||
"""
|
||||
Insert a single event into the Events table and return a standardized JSON response.
|
||||
Exceptions will propagate to the caller.
|
||||
"""
|
||||
conn = get_temp_db_connection()
|
||||
cur = conn.cursor()
|
||||
if isinstance(event_time, str):
|
||||
start_time = ensure_datetime(event_time)
|
||||
|
||||
start_time = ensure_datetime(event_time)
|
||||
|
||||
cur.execute("""
|
||||
INSERT INTO Events (eve_MAC, eve_IP, eve_DateTime, eve_EventType, eve_AdditionalInfo, eve_PendingAlertEmail)
|
||||
VALUES (?, ?, ?, ?, ?, ?)
|
||||
""", (mac, ip, start_time, event_type, additional_info, pending_alert))
|
||||
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
mylog("debug", f"[Events] Created event for {mac} ({event_type})")
|
||||
return jsonify({"success": True, "message": f"Created event for {mac}"})
|
||||
|
||||
|
||||
def get_events(mac=None):
|
||||
"""
|
||||
Fetch all events, or events for a specific MAC if provided.
|
||||
@@ -43,18 +75,23 @@ def get_events(mac=None):
|
||||
conn.close()
|
||||
return jsonify({"success": True, "events": events})
|
||||
|
||||
def delete_events_30():
|
||||
"""Delete all events older than 30 days"""
|
||||
def delete_events_older_than(days):
|
||||
"""Delete all events older than a specified number of days"""
|
||||
|
||||
conn = get_temp_db_connection()
|
||||
cur = conn.cursor()
|
||||
|
||||
sql = "DELETE FROM Events WHERE eve_DateTime <= date('now', '-30 day')"
|
||||
cur.execute(sql)
|
||||
# Use a parameterized query with sqlite date function
|
||||
sql = "DELETE FROM Events WHERE eve_DateTime <= date('now', ?)"
|
||||
cur.execute(sql, [f'-{days} days'])
|
||||
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
return jsonify({"success": True, "message": "Deleted events older than 30 days"})
|
||||
return jsonify({
|
||||
"success": True,
|
||||
"message": f"Deleted events older than {days} days"
|
||||
})
|
||||
|
||||
def delete_events():
|
||||
"""Delete all events"""
|
||||
@@ -70,3 +107,40 @@ def delete_events():
|
||||
return jsonify({"success": True, "message": "Deleted all events"})
|
||||
|
||||
|
||||
|
||||
def get_events_totals(period: str = "7 days"):
|
||||
"""
|
||||
Return counts for events and sessions totals over a given period.
|
||||
period: "7 days", "1 month", "1 year", "100 years"
|
||||
"""
|
||||
# Convert period to SQLite date expression
|
||||
period_date_sql = get_date_from_period(period)
|
||||
|
||||
conn = get_temp_db_connection()
|
||||
cur = conn.cursor()
|
||||
|
||||
sql = f"""
|
||||
SELECT
|
||||
(SELECT COUNT(*) FROM Events WHERE eve_DateTime >= {period_date_sql}) AS all_events,
|
||||
(SELECT COUNT(*) FROM Sessions WHERE
|
||||
ses_DateTimeConnection >= {period_date_sql}
|
||||
OR ses_DateTimeDisconnection >= {period_date_sql}
|
||||
OR ses_StillConnected = 1
|
||||
) AS sessions,
|
||||
(SELECT COUNT(*) FROM Sessions WHERE
|
||||
(ses_DateTimeConnection IS NULL AND ses_DateTimeDisconnection >= {period_date_sql})
|
||||
OR (ses_DateTimeDisconnection IS NULL AND ses_StillConnected = 0 AND ses_DateTimeConnection >= {period_date_sql})
|
||||
) AS missing,
|
||||
(SELECT COUNT(*) FROM Events WHERE eve_DateTime >= {period_date_sql} AND eve_EventType LIKE 'VOIDED%') AS voided,
|
||||
(SELECT COUNT(*) FROM Events WHERE eve_DateTime >= {period_date_sql} AND eve_EventType LIKE 'New Device') AS new,
|
||||
(SELECT COUNT(*) FROM Events WHERE eve_DateTime >= {period_date_sql} AND eve_EventType LIKE 'Device Down') AS down
|
||||
"""
|
||||
|
||||
cur.execute(sql)
|
||||
row = cur.fetchone()
|
||||
conn.close()
|
||||
|
||||
# Return as JSON array
|
||||
result_json = [row[0], row[1], row[2], row[3], row[4], row[5]]
|
||||
return jsonify(result_json)
|
||||
|
||||
|
||||
@@ -113,6 +113,7 @@ class Query(ObjectType):
|
||||
try:
|
||||
with open(folder + 'table_devices.json', 'r') as f:
|
||||
devices_data = json.load(f)["data"]
|
||||
total_count = len(devices_data)
|
||||
except (FileNotFoundError, json.JSONDecodeError) as e:
|
||||
mylog('none', f'[graphql_schema] Error loading devices data: {e}')
|
||||
return DeviceResult(devices=[], count=0)
|
||||
@@ -124,7 +125,7 @@ class Query(ObjectType):
|
||||
device["devParentChildrenCount"] = get_number_of_children(device["devMac"], devices_data)
|
||||
device["devIpLong"] = format_ip_long(device.get("devLastIP", ""))
|
||||
|
||||
mylog('verbose', f'[graphql_schema] devices_data: {devices_data}')
|
||||
mylog('trace', f'[graphql_schema] devices_data: {devices_data}')
|
||||
|
||||
|
||||
# Apply sorting if options are provided
|
||||
@@ -133,16 +134,16 @@ class Query(ObjectType):
|
||||
# Define status-specific filtering
|
||||
if options.status:
|
||||
status = options.status
|
||||
mylog('verbose', f'[graphql_schema] Applying status filter: {status}')
|
||||
mylog('trace', f'[graphql_schema] Applying status filter: {status}')
|
||||
|
||||
# Include devices matching criteria in UI_MY_DEVICES
|
||||
allowed_statuses = get_setting_value("UI_MY_DEVICES")
|
||||
hidden_relationships = get_setting_value("UI_hide_rel_types")
|
||||
network_dev_types = get_setting_value("NETWORK_DEVICE_TYPES")
|
||||
|
||||
mylog('verbose', f'[graphql_schema] allowed_statuses: {allowed_statuses}')
|
||||
mylog('verbose', f'[graphql_schema] hidden_relationships: {hidden_relationships}')
|
||||
mylog('verbose', f'[graphql_schema] network_dev_types: {network_dev_types}')
|
||||
mylog('trace', f'[graphql_schema] allowed_statuses: {allowed_statuses}')
|
||||
mylog('trace', f'[graphql_schema] hidden_relationships: {hidden_relationships}')
|
||||
mylog('trace', f'[graphql_schema] network_dev_types: {network_dev_types}')
|
||||
|
||||
# Filtering based on the "status"
|
||||
if status == "my_devices":
|
||||
@@ -248,7 +249,7 @@ class Query(ObjectType):
|
||||
return SettingResult(settings=[], count=0)
|
||||
|
||||
|
||||
mylog('verbose', f'[graphql_schema] settings_data: {settings_data}')
|
||||
mylog('trace', f'[graphql_schema] settings_data: {settings_data}')
|
||||
|
||||
# Convert to Setting objects
|
||||
settings = [Setting(**setting) for setting in settings_data]
|
||||
|
||||
@@ -1,7 +1,13 @@
|
||||
import subprocess
|
||||
import re
|
||||
import sys
|
||||
import ipaddress
|
||||
from flask import jsonify
|
||||
|
||||
# Register NetAlertX directories
|
||||
INSTALL_PATH = "/app"
|
||||
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
|
||||
|
||||
def wakeonlan(mac):
|
||||
|
||||
# Validate MAC
|
||||
@@ -19,3 +25,198 @@ def wakeonlan(mac):
|
||||
except subprocess.CalledProcessError as e:
|
||||
return jsonify({"success": False, "error": "Failed to send WOL packet", "details": e.stderr.strip()}), 500
|
||||
|
||||
def traceroute(ip):
|
||||
"""
|
||||
Executes a traceroute to the given IP address.
|
||||
|
||||
Parameters:
|
||||
ip (str): The target IP address to trace.
|
||||
|
||||
Returns:
|
||||
JSON response with:
|
||||
- success (bool)
|
||||
- output (str) if successful
|
||||
- error (str) and details (str) if failed
|
||||
"""
|
||||
# --------------------------
|
||||
# Step 1: Validate IP address
|
||||
# --------------------------
|
||||
try:
|
||||
ipaddress.ip_address(ip)
|
||||
except ValueError:
|
||||
# Return 400 if IP is invalid
|
||||
return jsonify({"success": False, "error": f"Invalid IP: {ip}"}), 400
|
||||
|
||||
# --------------------------
|
||||
# Step 2: Execute traceroute
|
||||
# --------------------------
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["traceroute", ip], # Command and argument
|
||||
capture_output=True, # Capture stdout/stderr
|
||||
text=True, # Return output as string
|
||||
check=True # Raise CalledProcessError on non-zero exit
|
||||
)
|
||||
# Return success response with traceroute output
|
||||
return jsonify({"success": True, "output": result.stdout.strip()})
|
||||
|
||||
# --------------------------
|
||||
# Step 3: Handle command errors
|
||||
# --------------------------
|
||||
except subprocess.CalledProcessError as e:
|
||||
# Return 500 if traceroute fails
|
||||
return jsonify({
|
||||
"success": False,
|
||||
"error": "Traceroute failed",
|
||||
"details": e.stderr.strip()
|
||||
}), 500
|
||||
|
||||
|
||||
def speedtest():
|
||||
"""
|
||||
API endpoint to run a speedtest using speedtest-cli.
|
||||
Returns JSON with the test output or error.
|
||||
"""
|
||||
try:
|
||||
# Run speedtest-cli command
|
||||
result = subprocess.run(
|
||||
[f"{INSTALL_PATH}/back/speedtest-cli", "--secure", "--simple"],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
check=True
|
||||
)
|
||||
|
||||
# Return each line as a list
|
||||
output_lines = result.stdout.strip().split("\n")
|
||||
return jsonify({"success": True, "output": output_lines})
|
||||
|
||||
except subprocess.CalledProcessError as e:
|
||||
return jsonify({
|
||||
"success": False,
|
||||
"error": "Speedtest failed",
|
||||
"details": e.stderr.strip()
|
||||
}), 500
|
||||
|
||||
|
||||
def nslookup(ip):
|
||||
"""
|
||||
Run an nslookup on the given IP address.
|
||||
Returns JSON with the lookup output or error.
|
||||
"""
|
||||
# Validate IP
|
||||
try:
|
||||
ipaddress.ip_address(ip)
|
||||
except ValueError:
|
||||
return jsonify({
|
||||
"success": False,
|
||||
"error": "Invalid IP address"
|
||||
}), 400
|
||||
|
||||
try:
|
||||
# Run nslookup command
|
||||
result = subprocess.run(
|
||||
["nslookup", ip],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
check=True
|
||||
)
|
||||
|
||||
output_lines = result.stdout.strip().split("\n")
|
||||
return jsonify({"success": True, "output": output_lines})
|
||||
|
||||
except subprocess.CalledProcessError as e:
|
||||
return jsonify({
|
||||
"success": False,
|
||||
"error": "nslookup failed",
|
||||
"details": e.stderr.strip()
|
||||
}), 500
|
||||
|
||||
|
||||
def nmap_scan(ip, mode):
|
||||
"""
|
||||
Run an nmap scan on the given IP address with the requested mode.
|
||||
Modes supported:
|
||||
- "fast" → nmap -F <ip>
|
||||
- "normal" → nmap <ip>
|
||||
- "detail" → nmap -A <ip>
|
||||
- "skipdiscovery" → nmap -Pn <ip>
|
||||
Returns JSON with the scan output or error.
|
||||
"""
|
||||
# Validate IP
|
||||
try:
|
||||
ipaddress.ip_address(ip)
|
||||
except ValueError:
|
||||
return jsonify({
|
||||
"success": False,
|
||||
"error": "Invalid IP address"
|
||||
}), 400
|
||||
|
||||
# Map scan modes to nmap arguments
|
||||
mode_args = {
|
||||
"fast": ["-F"],
|
||||
"normal": [],
|
||||
"detail": ["-A"],
|
||||
"skipdiscovery": ["-Pn"]
|
||||
}
|
||||
|
||||
if mode not in mode_args:
|
||||
return jsonify({
|
||||
"success": False,
|
||||
"error": f"Invalid scan mode '{mode}'"
|
||||
}), 400
|
||||
|
||||
try:
|
||||
# Build and run nmap command
|
||||
cmd = ["nmap"] + mode_args[mode] + [ip]
|
||||
result = subprocess.run(
|
||||
cmd,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
check=True
|
||||
)
|
||||
|
||||
output_lines = result.stdout.strip().split("\n")
|
||||
return jsonify({
|
||||
"success": True,
|
||||
"mode": mode,
|
||||
"ip": ip,
|
||||
"output": output_lines
|
||||
})
|
||||
|
||||
except subprocess.CalledProcessError as e:
|
||||
return jsonify({
|
||||
"success": False,
|
||||
"error": "nmap scan failed",
|
||||
"details": e.stderr.strip()
|
||||
}), 500
|
||||
|
||||
|
||||
def internet_info():
|
||||
"""
|
||||
API endpoint to fetch internet info using ipinfo.io.
|
||||
Returns JSON with the info or error.
|
||||
"""
|
||||
try:
|
||||
# Perform the request via curl
|
||||
result = subprocess.run(
|
||||
["curl", "-s", "https://ipinfo.io"],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
check=True
|
||||
)
|
||||
|
||||
output = result.stdout.strip()
|
||||
if not output:
|
||||
raise ValueError("Empty response from ipinfo.io")
|
||||
|
||||
# Clean up the JSON-like string by removing { } , and "
|
||||
cleaned_output = output.replace("{", "").replace("}", "").replace(",", "").replace('"', "")
|
||||
|
||||
return jsonify({"success": True, "output": cleaned_output})
|
||||
|
||||
except (subprocess.CalledProcessError, ValueError) as e:
|
||||
return jsonify({
|
||||
"success": False,
|
||||
"error": "Failed to fetch internet info",
|
||||
"details": str(e)
|
||||
}), 500
|
||||
|
||||
@@ -18,7 +18,7 @@ def escape_label_value(val):
|
||||
# Define a base URL with the user's home directory
|
||||
folder = apiPath
|
||||
|
||||
def getMetricStats():
|
||||
def get_metric_stats():
|
||||
output = []
|
||||
|
||||
# 1. Dashboard totals
|
||||
|
||||
383
server/api_server/sessions_endpoint.py
Executable file
383
server/api_server/sessions_endpoint.py
Executable file
@@ -0,0 +1,383 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
import json
|
||||
import subprocess
|
||||
import argparse
|
||||
import os
|
||||
import pathlib
|
||||
import sqlite3
|
||||
import time
|
||||
import sys
|
||||
from datetime import datetime
|
||||
from flask import jsonify, request
|
||||
|
||||
# Register NetAlertX directories
|
||||
INSTALL_PATH="/app"
|
||||
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
|
||||
|
||||
from database import get_temp_db_connection
|
||||
from helper import is_random_mac, format_date, get_setting_value, format_date_iso, format_event_date, mylog, timeNowTZ, format_date_diff, format_ip_long, parse_datetime
|
||||
from db.db_helper import row_to_json, get_date_from_period
|
||||
|
||||
|
||||
# --------------------------
|
||||
# Sessions Endpoints Functions
|
||||
# --------------------------
|
||||
# -------------------------------------------------------------------------------------------
|
||||
def create_session(mac, ip, start_time, end_time=None, event_type_conn="Connected", event_type_disc="Disconnected"):
|
||||
"""Insert a new session into Sessions table"""
|
||||
conn = get_temp_db_connection()
|
||||
cur = conn.cursor()
|
||||
|
||||
cur.execute("""
|
||||
INSERT INTO Sessions (ses_MAC, ses_IP, ses_DateTimeConnection, ses_DateTimeDisconnection,
|
||||
ses_EventTypeConnection, ses_EventTypeDisconnection)
|
||||
VALUES (?, ?, ?, ?, ?, ?)
|
||||
""", (mac, ip, start_time, end_time, event_type_conn, event_type_disc))
|
||||
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
return jsonify({"success": True, "message": f"Session created for MAC {mac}"})
|
||||
|
||||
|
||||
# -------------------------------------------------------------------------------------------
|
||||
def delete_session(mac):
|
||||
"""Delete all sessions for a given MAC"""
|
||||
conn = get_temp_db_connection()
|
||||
cur = conn.cursor()
|
||||
|
||||
cur.execute("DELETE FROM Sessions WHERE ses_MAC = ?", (mac,))
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
return jsonify({"success": True, "message": f"Deleted sessions for MAC {mac}"})
|
||||
|
||||
|
||||
# -------------------------------------------------------------------------------------------
|
||||
def get_sessions(mac=None, start_date=None, end_date=None):
|
||||
"""Retrieve sessions optionally filtered by MAC and date range"""
|
||||
conn = get_temp_db_connection()
|
||||
cur = conn.cursor()
|
||||
|
||||
sql = "SELECT * FROM Sessions WHERE 1=1"
|
||||
params = []
|
||||
|
||||
if mac:
|
||||
sql += " AND ses_MAC = ?"
|
||||
params.append(mac)
|
||||
if start_date:
|
||||
sql += " AND ses_DateTimeConnection >= ?"
|
||||
params.append(start_date)
|
||||
if end_date:
|
||||
sql += " AND ses_DateTimeDisconnection <= ?"
|
||||
params.append(end_date)
|
||||
|
||||
cur.execute(sql, tuple(params))
|
||||
rows = cur.fetchall()
|
||||
conn.close()
|
||||
|
||||
# Convert rows to list of dicts
|
||||
table_data = [dict(r) for r in rows]
|
||||
|
||||
return jsonify({"success": True, "sessions": table_data})
|
||||
|
||||
|
||||
|
||||
def get_sessions_calendar(start_date, end_date):
|
||||
"""
|
||||
Fetch sessions between a start and end date for calendar display.
|
||||
Returns JSON list of calendar sessions.
|
||||
"""
|
||||
|
||||
if not start_date or not end_date:
|
||||
return jsonify({"success": False, "error": "Missing start or end date"}), 400
|
||||
|
||||
conn = get_temp_db_connection()
|
||||
cur = conn.cursor()
|
||||
|
||||
sql = """
|
||||
-- Correct missing connection/disconnection sessions:
|
||||
-- If ses_EventTypeConnection is missing, backfill from last disconnection
|
||||
-- If ses_EventTypeDisconnection is missing, forward-fill from next connection
|
||||
|
||||
SELECT
|
||||
SES1.ses_MAC, SES1.ses_EventTypeConnection, SES1.ses_DateTimeConnection,
|
||||
SES1.ses_EventTypeDisconnection, SES1.ses_DateTimeDisconnection, SES1.ses_IP,
|
||||
SES1.ses_AdditionalInfo, SES1.ses_StillConnected,
|
||||
|
||||
CASE
|
||||
WHEN SES1.ses_EventTypeConnection = '<missing event>' THEN
|
||||
IFNULL(
|
||||
(SELECT MAX(SES2.ses_DateTimeDisconnection)
|
||||
FROM Sessions AS SES2
|
||||
WHERE SES2.ses_MAC = SES1.ses_MAC
|
||||
AND SES2.ses_DateTimeDisconnection < SES1.ses_DateTimeDisconnection
|
||||
AND SES2.ses_DateTimeDisconnection BETWEEN Date(?) AND Date(?)
|
||||
),
|
||||
DATETIME(SES1.ses_DateTimeDisconnection, '-1 hour')
|
||||
)
|
||||
ELSE SES1.ses_DateTimeConnection
|
||||
END AS ses_DateTimeConnectionCorrected,
|
||||
|
||||
CASE
|
||||
WHEN SES1.ses_EventTypeDisconnection = '<missing event>' THEN
|
||||
(SELECT MIN(SES2.ses_DateTimeConnection)
|
||||
FROM Sessions AS SES2
|
||||
WHERE SES2.ses_MAC = SES1.ses_MAC
|
||||
AND SES2.ses_DateTimeConnection > SES1.ses_DateTimeConnection
|
||||
AND SES2.ses_DateTimeConnection BETWEEN Date(?) AND Date(?)
|
||||
)
|
||||
ELSE SES1.ses_DateTimeDisconnection
|
||||
END AS ses_DateTimeDisconnectionCorrected
|
||||
|
||||
FROM Sessions AS SES1
|
||||
WHERE (SES1.ses_DateTimeConnection BETWEEN Date(?) AND Date(?))
|
||||
OR (SES1.ses_DateTimeDisconnection BETWEEN Date(?) AND Date(?))
|
||||
OR SES1.ses_StillConnected = 1
|
||||
"""
|
||||
|
||||
cur.execute(sql, (start_date, end_date, start_date, end_date, start_date, end_date, start_date, end_date))
|
||||
rows = cur.fetchall()
|
||||
|
||||
table_data = []
|
||||
for r in rows:
|
||||
row = dict(r)
|
||||
|
||||
# Determine color
|
||||
if row["ses_EventTypeConnection"] == "<missing event>" or row["ses_EventTypeDisconnection"] == "<missing event>":
|
||||
color = "#f39c12"
|
||||
elif row["ses_StillConnected"] == 1:
|
||||
color = "#00a659"
|
||||
else:
|
||||
color = "#0073b7"
|
||||
|
||||
# Tooltip
|
||||
tooltip = (
|
||||
f"Connection: {format_event_date(row['ses_DateTimeConnection'], row['ses_EventTypeConnection'])}\n"
|
||||
f"Disconnection: {format_event_date(row['ses_DateTimeDisconnection'], row['ses_EventTypeDisconnection'])}\n"
|
||||
f"IP: {row['ses_IP']}"
|
||||
)
|
||||
|
||||
# Append calendar entry
|
||||
table_data.append({
|
||||
"resourceId": row["ses_MAC"],
|
||||
"title": "",
|
||||
"start": format_date_iso(row["ses_DateTimeConnectionCorrected"]),
|
||||
"end": format_date_iso(row["ses_DateTimeDisconnectionCorrected"]),
|
||||
"color": color,
|
||||
"tooltip": tooltip,
|
||||
"className": "no-border"
|
||||
})
|
||||
|
||||
conn.close()
|
||||
return jsonify({"success": True, "sessions": table_data})
|
||||
|
||||
|
||||
|
||||
def get_device_sessions(mac, period):
|
||||
"""
|
||||
Fetch device sessions for a given MAC address and period.
|
||||
"""
|
||||
period_date = get_date_from_period(period)
|
||||
|
||||
conn = get_temp_db_connection()
|
||||
cur = conn.cursor()
|
||||
|
||||
sql = f"""
|
||||
SELECT
|
||||
IFNULL(ses_DateTimeConnection, ses_DateTimeDisconnection) AS ses_DateTimeOrder,
|
||||
ses_EventTypeConnection,
|
||||
ses_DateTimeConnection,
|
||||
ses_EventTypeDisconnection,
|
||||
ses_DateTimeDisconnection,
|
||||
ses_StillConnected,
|
||||
ses_IP,
|
||||
ses_AdditionalInfo
|
||||
FROM Sessions
|
||||
WHERE ses_MAC = ?
|
||||
AND (
|
||||
ses_DateTimeConnection >= {period_date}
|
||||
OR ses_DateTimeDisconnection >= {period_date}
|
||||
OR ses_StillConnected = 1
|
||||
)
|
||||
"""
|
||||
|
||||
|
||||
cur.execute(sql, (mac,))
|
||||
rows = cur.fetchall()
|
||||
conn.close()
|
||||
|
||||
table_data = {"data": []}
|
||||
|
||||
for row in rows:
|
||||
# Connection DateTime
|
||||
if row["ses_EventTypeConnection"] == "<missing event>":
|
||||
ini = row["ses_EventTypeConnection"]
|
||||
else:
|
||||
ini = format_date(row["ses_DateTimeConnection"])
|
||||
|
||||
# Disconnection DateTime
|
||||
if row["ses_StillConnected"]:
|
||||
end = "..."
|
||||
elif row["ses_EventTypeDisconnection"] == "<missing event>":
|
||||
end = row["ses_EventTypeDisconnection"]
|
||||
else:
|
||||
end = format_date(row["ses_DateTimeDisconnection"])
|
||||
|
||||
# Duration
|
||||
if row["ses_EventTypeConnection"] in ("<missing event>", None) or row["ses_EventTypeDisconnection"] in ("<missing event>", None):
|
||||
dur = "..."
|
||||
elif row["ses_StillConnected"]:
|
||||
dur = format_date_diff(row["ses_DateTimeConnection"], None)["text"]
|
||||
else:
|
||||
dur = format_date_diff(row["ses_DateTimeConnection"], row["ses_DateTimeDisconnection"])["text"]
|
||||
|
||||
# Additional Info
|
||||
info = row["ses_AdditionalInfo"]
|
||||
if row["ses_EventTypeConnection"] == "New Device":
|
||||
info = f"{row['ses_EventTypeConnection']}: {info}"
|
||||
|
||||
# Push row data
|
||||
table_data["data"].append({
|
||||
"ses_MAC": mac,
|
||||
"ses_DateTimeOrder": row["ses_DateTimeOrder"],
|
||||
"ses_Connection": ini,
|
||||
"ses_Disconnection": end,
|
||||
"ses_Duration": dur,
|
||||
"ses_IP": row["ses_IP"],
|
||||
"ses_Info": info,
|
||||
})
|
||||
|
||||
# Control no rows
|
||||
if not table_data["data"]:
|
||||
table_data["data"] = []
|
||||
|
||||
sessions = table_data["data"]
|
||||
|
||||
return jsonify({
|
||||
"success": True,
|
||||
"sessions": sessions
|
||||
})
|
||||
|
||||
|
||||
def get_session_events(event_type, period_date):
|
||||
"""
|
||||
Fetch events or sessions based on type and period.
|
||||
"""
|
||||
conn = get_temp_db_connection()
|
||||
conn.row_factory = sqlite3.Row
|
||||
cur = conn.cursor()
|
||||
|
||||
# Base SQLs
|
||||
sql_events = f"""
|
||||
SELECT
|
||||
eve_DateTime AS eve_DateTimeOrder,
|
||||
devName,
|
||||
devOwner,
|
||||
eve_DateTime,
|
||||
eve_EventType,
|
||||
NULL,
|
||||
NULL,
|
||||
NULL,
|
||||
NULL,
|
||||
eve_IP,
|
||||
NULL,
|
||||
eve_AdditionalInfo,
|
||||
NULL,
|
||||
devMac,
|
||||
eve_PendingAlertEmail
|
||||
FROM Events_Devices
|
||||
WHERE eve_DateTime >= {period_date}
|
||||
"""
|
||||
|
||||
sql_sessions = f"""
|
||||
SELECT
|
||||
IFNULL(ses_DateTimeConnection, ses_DateTimeDisconnection) AS ses_DateTimeOrder,
|
||||
devName,
|
||||
devOwner,
|
||||
NULL,
|
||||
NULL,
|
||||
ses_DateTimeConnection,
|
||||
ses_DateTimeDisconnection,
|
||||
NULL,
|
||||
NULL,
|
||||
ses_IP,
|
||||
NULL,
|
||||
ses_AdditionalInfo,
|
||||
ses_StillConnected,
|
||||
devMac
|
||||
FROM Sessions_Devices
|
||||
"""
|
||||
|
||||
# Build SQL based on type
|
||||
if event_type == "all":
|
||||
sql = sql_events
|
||||
elif event_type == "sessions":
|
||||
sql = sql_sessions + f"""
|
||||
WHERE (
|
||||
ses_DateTimeConnection >= {period_date}
|
||||
OR ses_DateTimeDisconnection >= {period_date}
|
||||
OR ses_StillConnected = 1
|
||||
)
|
||||
"""
|
||||
elif event_type == "missing":
|
||||
sql = sql_sessions + f"""
|
||||
WHERE (
|
||||
(ses_DateTimeConnection IS NULL AND ses_DateTimeDisconnection >= {period_date})
|
||||
OR (ses_DateTimeDisconnection IS NULL AND ses_StillConnected = 0 AND ses_DateTimeConnection >= {period_date})
|
||||
)
|
||||
"""
|
||||
elif event_type == "voided":
|
||||
sql = sql_events + ' AND eve_EventType LIKE "VOIDED%"'
|
||||
elif event_type == "new":
|
||||
sql = sql_events + ' AND eve_EventType = "New Device"'
|
||||
elif event_type == "down":
|
||||
sql = sql_events + ' AND eve_EventType = "Device Down"'
|
||||
else:
|
||||
sql = sql_events + ' AND 1=0'
|
||||
|
||||
cur.execute(sql)
|
||||
rows = cur.fetchall()
|
||||
conn.close()
|
||||
|
||||
table_data = {"data": []}
|
||||
|
||||
for row in rows:
|
||||
row = list(row) # make mutable
|
||||
|
||||
if event_type in ("sessions", "missing"):
|
||||
# Duration
|
||||
if row[5] and row[6]:
|
||||
delta = format_date_diff(row[5], row[6])
|
||||
row[7] = delta["text"]
|
||||
row[8] = int(delta["total_minutes"] * 60) # seconds
|
||||
elif row[12] == 1:
|
||||
delta = format_date_diff(row[5], None)
|
||||
row[7] = delta["text"]
|
||||
row[8] = int(delta["total_minutes"] * 60) # seconds
|
||||
else:
|
||||
row[7] = "..."
|
||||
row[8] = 0
|
||||
|
||||
# Connection
|
||||
row[5] = format_date(row[5]) if row[5] else "<missing event>"
|
||||
|
||||
# Disconnection
|
||||
if row[6]:
|
||||
row[6] = format_date(row[6])
|
||||
elif row[12] == 0:
|
||||
row[6] = "<missing event>"
|
||||
else:
|
||||
row[6] = "..."
|
||||
|
||||
else:
|
||||
# Event Date
|
||||
row[3] = format_date(row[3])
|
||||
|
||||
# IP Order
|
||||
row[10] = format_ip_long(row[9])
|
||||
|
||||
table_data["data"].append(row)
|
||||
|
||||
return jsonify(table_data)
|
||||
141
server/helper.py
141
server/helper.py
@@ -7,6 +7,7 @@ import os
|
||||
import re
|
||||
import unicodedata
|
||||
import subprocess
|
||||
from typing import Union
|
||||
import pytz
|
||||
from pytz import timezone
|
||||
import json
|
||||
@@ -16,6 +17,7 @@ import requests
|
||||
import base64
|
||||
import hashlib
|
||||
import random
|
||||
import email
|
||||
import string
|
||||
import ipaddress
|
||||
|
||||
@@ -52,6 +54,116 @@ def get_timezone_offset():
|
||||
return offset_formatted
|
||||
|
||||
|
||||
#-------------------------------------------------------------------------------
|
||||
# Date and time methods
|
||||
#-------------------------------------------------------------------------------
|
||||
|
||||
# # -------------------------------------------------------------------------------------------
|
||||
# def format_date(date_str: str) -> str:
|
||||
# """Format a date string as 'YYYY-MM-DD HH:MM'"""
|
||||
# dt = datetime.datetime.fromisoformat(date_str) if isinstance(date_str, str) else date_str
|
||||
# return dt.strftime('%Y-%m-%d %H:%M')
|
||||
|
||||
# # -------------------------------------------------------------------------------------------
|
||||
# def format_date_diff(date1: str, date2: str) -> str:
|
||||
# """Return difference between two dates formatted as 'Xd HH:MM'"""
|
||||
# dt1 = datetime.datetime.fromisoformat(date1) if isinstance(date1, str) else date1
|
||||
# dt2 = datetime.datetime.fromisoformat(date2) if isinstance(date2, str) else date2
|
||||
# delta = dt2 - dt1
|
||||
|
||||
# days = delta.days
|
||||
# hours, remainder = divmod(delta.seconds, 3600)
|
||||
# minutes = remainder // 60
|
||||
|
||||
# return f"{days}d {hours:02}:{minutes:02}"
|
||||
|
||||
# -------------------------------------------------------------------------------------------
|
||||
def format_date_iso(date1: str) -> str:
|
||||
"""Return ISO 8601 string for a date or None if empty"""
|
||||
if date1 is None:
|
||||
return None
|
||||
dt = datetime.datetime.fromisoformat(date1) if isinstance(date1, str) else date1
|
||||
return dt.isoformat()
|
||||
|
||||
# -------------------------------------------------------------------------------------------
|
||||
def format_event_date(date_str: str, event_type: str) -> str:
|
||||
"""Format event date with fallback rules."""
|
||||
if date_str:
|
||||
return format_date(date_str)
|
||||
elif event_type == "<missing event>":
|
||||
return "<missing event>"
|
||||
else:
|
||||
return "<still connected>"
|
||||
|
||||
# -------------------------------------------------------------------------------------------
|
||||
def ensure_datetime(dt: Union[str, datetime, None]) -> datetime:
|
||||
if dt is None:
|
||||
return timeNowTZ()
|
||||
if isinstance(dt, str):
|
||||
return datetime.datetime.fromisoformat(dt)
|
||||
return dt
|
||||
|
||||
|
||||
def parse_datetime(dt_str):
|
||||
if not dt_str:
|
||||
return None
|
||||
try:
|
||||
# Try ISO8601 first
|
||||
return datetime.datetime.fromisoformat(dt_str)
|
||||
except ValueError:
|
||||
# Try RFC1123 / HTTP format
|
||||
try:
|
||||
return datetime.datetime.strptime(dt_str, '%a, %d %b %Y %H:%M:%S GMT')
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
def format_date(date_str: str) -> str:
|
||||
dt = parse_datetime(date_str)
|
||||
return dt.strftime('%Y-%m-%d %H:%M') if dt else "invalid"
|
||||
|
||||
def format_date_diff(date1, date2):
|
||||
"""
|
||||
Return difference between two datetimes as 'Xd HH:MM'.
|
||||
Uses app timezone if datetime is naive.
|
||||
date2 can be None (uses now).
|
||||
"""
|
||||
# Get timezone from settings
|
||||
tz_name = get_setting_value("TIMEZONE") or "UTC"
|
||||
tz = pytz.timezone(tz_name)
|
||||
|
||||
def parse_dt(dt):
|
||||
if dt is None:
|
||||
return datetime.datetime.now(tz)
|
||||
if isinstance(dt, str):
|
||||
try:
|
||||
dt_parsed = email.utils.parsedate_to_datetime(dt)
|
||||
except Exception:
|
||||
# fallback: parse ISO string
|
||||
dt_parsed = datetime.datetime.fromisoformat(dt)
|
||||
# convert naive GMT/UTC to app timezone
|
||||
if dt_parsed.tzinfo is None:
|
||||
dt_parsed = tz.localize(dt_parsed)
|
||||
else:
|
||||
dt_parsed = dt_parsed.astimezone(tz)
|
||||
return dt_parsed
|
||||
return dt if dt.tzinfo else tz.localize(dt)
|
||||
|
||||
dt1 = parse_dt(date1)
|
||||
dt2 = parse_dt(date2)
|
||||
|
||||
delta = dt2 - dt1
|
||||
total_minutes = int(delta.total_seconds() // 60)
|
||||
days, rem_minutes = divmod(total_minutes, 1440) # 1440 mins in a day
|
||||
hours, minutes = divmod(rem_minutes, 60)
|
||||
|
||||
return {
|
||||
"text": f"{days}d {hours:02}:{minutes:02}",
|
||||
"days": days,
|
||||
"hours": hours,
|
||||
"minutes": minutes,
|
||||
"total_minutes": total_minutes
|
||||
}
|
||||
|
||||
#-------------------------------------------------------------------------------
|
||||
# File system permission handling
|
||||
#-------------------------------------------------------------------------------
|
||||
@@ -592,35 +704,6 @@ def collect_lang_strings(json, pref, stringSqlParams):
|
||||
|
||||
return stringSqlParams
|
||||
|
||||
#-------------------------------------------------------------------------------
|
||||
# Date and time methods
|
||||
#-------------------------------------------------------------------------------
|
||||
|
||||
# -------------------------------------------------------------------------------------------
|
||||
def format_date(date_str: str) -> str:
|
||||
"""Format a date string as 'YYYY-MM-DD HH:MM'"""
|
||||
dt = datetime.datetime.fromisoformat(date_str) if isinstance(date_str, str) else date_str
|
||||
return dt.strftime('%Y-%m-%d %H:%M')
|
||||
|
||||
# -------------------------------------------------------------------------------------------
|
||||
def format_date_diff(date1: str, date2: str) -> str:
|
||||
"""Return difference between two dates formatted as 'Xd HH:MM'"""
|
||||
dt1 = datetime.datetime.fromisoformat(date1) if isinstance(date1, str) else date1
|
||||
dt2 = datetime.datetime.fromisoformat(date2) if isinstance(date2, str) else date2
|
||||
delta = dt2 - dt1
|
||||
|
||||
days = delta.days
|
||||
hours, remainder = divmod(delta.seconds, 3600)
|
||||
minutes = remainder // 60
|
||||
|
||||
return f"{days}d {hours:02}:{minutes:02}"
|
||||
|
||||
# -------------------------------------------------------------------------------------------
|
||||
def format_date_iso(date1: str) -> str:
|
||||
"""Return ISO 8601 string for a date"""
|
||||
dt = datetime.datetime.fromisoformat(date1) if isinstance(date1, str) else date1
|
||||
return dt.isoformat()
|
||||
|
||||
#-------------------------------------------------------------------------------
|
||||
def checkNewVersion():
|
||||
mylog('debug', [f"[Version check] Checking if new version available"])
|
||||
|
||||
@@ -5,6 +5,7 @@ import random
|
||||
import string
|
||||
import uuid
|
||||
import pytest
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
INSTALL_PATH = "/app"
|
||||
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
|
||||
@@ -30,21 +31,33 @@ def auth_headers(token):
|
||||
return {"Authorization": f"Bearer {token}"}
|
||||
|
||||
def create_event(client, api_token, mac, event="UnitTest Event", days_old=None):
|
||||
"""
|
||||
Create event using API (POST /event/<mac>).
|
||||
If days_old is set, adds it to payload for backdating support.
|
||||
"""
|
||||
payload = {
|
||||
"event": event,
|
||||
}
|
||||
if days_old:
|
||||
payload["days_old"] = days_old
|
||||
return client.post(f"/event/{mac}", json=payload, headers=auth_headers(api_token))
|
||||
payload = {"ip": "0.0.0.0", "event_type": event}
|
||||
|
||||
# Calculate the event_time if days_old is given
|
||||
if days_old is not None:
|
||||
event_time = timeNowTZ() - timedelta(days=days_old)
|
||||
# ISO 8601 string
|
||||
payload["event_time"] = event_time.isoformat()
|
||||
|
||||
return client.post(f"/events/create/{mac}", json=payload, headers=auth_headers(api_token))
|
||||
|
||||
def list_events(client, api_token, mac=None):
|
||||
url = "/events" if mac is None else f"/events/{mac}"
|
||||
url = "/events" if mac is None else f"/events?mac={mac}"
|
||||
return client.get(url, headers=auth_headers(api_token))
|
||||
|
||||
def test_create_event(client, api_token, test_mac):
|
||||
# create event
|
||||
resp = create_event(client, api_token, test_mac)
|
||||
assert resp.status_code == 200
|
||||
data = resp.get_json()
|
||||
assert data.get("success") is True
|
||||
|
||||
# confirm event exists
|
||||
resp = list_events(client, api_token, test_mac)
|
||||
assert resp.status_code == 200
|
||||
events = resp.get_json().get("events", [])
|
||||
assert any(ev.get("eve_MAC") == test_mac for ev in events)
|
||||
|
||||
|
||||
def test_delete_events_for_mac(client, api_token, test_mac):
|
||||
# create event
|
||||
@@ -54,7 +67,8 @@ def test_delete_events_for_mac(client, api_token, test_mac):
|
||||
# confirm exists
|
||||
resp = list_events(client, api_token, test_mac)
|
||||
assert resp.status_code == 200
|
||||
assert any(ev["eve_MAC"] == test_mac for ev in resp.json)
|
||||
events = resp.json.get("events", [])
|
||||
assert any(ev["eve_MAC"] == test_mac for ev in events)
|
||||
|
||||
# delete
|
||||
resp = client.delete(f"/events/{test_mac}", headers=auth_headers(api_token))
|
||||
@@ -64,7 +78,33 @@ def test_delete_events_for_mac(client, api_token, test_mac):
|
||||
# confirm deleted
|
||||
resp = list_events(client, api_token, test_mac)
|
||||
assert resp.status_code == 200
|
||||
assert len(resp.json) == 0
|
||||
assert len(resp.json.get("events", [])) == 0
|
||||
|
||||
def test_get_events_totals(client, api_token):
|
||||
# 1. Request totals with default period
|
||||
resp = client.get(
|
||||
"/sessions/totals",
|
||||
headers=auth_headers(api_token)
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
|
||||
data = resp.json
|
||||
assert isinstance(data, list)
|
||||
# Expecting 6 counts: all_events, sessions, missing, voided, new, down
|
||||
assert len(data) == 6
|
||||
for count in data:
|
||||
assert isinstance(count, int) # each should be a number
|
||||
|
||||
# 2. Request totals with custom period
|
||||
resp_month = client.get(
|
||||
"/sessions/totals?period=1 month",
|
||||
headers=auth_headers(api_token)
|
||||
)
|
||||
assert resp_month.status_code == 200
|
||||
data_month = resp_month.json
|
||||
assert isinstance(data_month, list)
|
||||
assert len(data_month) == 6
|
||||
|
||||
|
||||
|
||||
def test_delete_all_events(client, api_token, test_mac):
|
||||
@@ -82,10 +122,10 @@ def test_delete_all_events(client, api_token, test_mac):
|
||||
|
||||
# confirm no events
|
||||
resp = list_events(client, api_token)
|
||||
assert len(resp.json) == 0
|
||||
assert len(resp.json.get("events", [])) == 0
|
||||
|
||||
|
||||
def test_delete_events_30days(client, api_token, test_mac):
|
||||
def test_delete_events_dynamic_days(client, api_token, test_mac):
|
||||
# create old + new events
|
||||
create_event(client, api_token, test_mac, days_old=40) # should be deleted
|
||||
create_event(client, api_token, test_mac, days_old=5) # should remain
|
||||
@@ -94,11 +134,15 @@ def test_delete_events_30days(client, api_token, test_mac):
|
||||
assert len(resp.json) == 2
|
||||
|
||||
# delete events older than 30 days
|
||||
resp = client.delete("/events/30days", headers=auth_headers(api_token))
|
||||
resp = client.delete("/events/30", headers=auth_headers(api_token))
|
||||
assert resp.status_code == 200
|
||||
assert resp.json.get("success") is True
|
||||
assert "Deleted events older than 30 days" in resp.json.get("message", "")
|
||||
|
||||
# confirm only recent remains
|
||||
resp = list_events(client, api_token, test_mac)
|
||||
mac_events = [ev for ev in resp.json if ev["eve_MAC"] == test_mac]
|
||||
events = resp.get_json().get("events", [])
|
||||
mac_events = [ev for ev in events if ev.get("eve_MAC") == test_mac]
|
||||
assert len(mac_events) == 1
|
||||
|
||||
|
||||
|
||||
93
test/test_graphq_endpoints.py
Executable file
93
test/test_graphq_endpoints.py
Executable file
@@ -0,0 +1,93 @@
|
||||
import sys
|
||||
import pathlib
|
||||
import sqlite3
|
||||
import random
|
||||
import string
|
||||
import uuid
|
||||
import pytest
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
INSTALL_PATH = "/app"
|
||||
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
|
||||
|
||||
from helper import timeNowTZ, get_setting_value
|
||||
from api_server.api_server_start import app
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def api_token():
|
||||
return get_setting_value("API_TOKEN")
|
||||
|
||||
@pytest.fixture
|
||||
def client():
|
||||
with app.test_client() as client:
|
||||
yield client
|
||||
|
||||
@pytest.fixture
|
||||
def test_mac():
|
||||
# Generate a unique MAC for each test run
|
||||
return "AA:BB:CC:" + ":".join(f"{random.randint(0,255):02X}" for _ in range(3))
|
||||
|
||||
def auth_headers(token):
|
||||
return {"Authorization": f"Bearer {token}"}
|
||||
|
||||
|
||||
def test_graphql_debug_get(client):
|
||||
"""GET /graphql should return the debug string"""
|
||||
resp = client.get("/graphql")
|
||||
assert resp.status_code == 200
|
||||
assert resp.data.decode() == "NetAlertX GraphQL server running."
|
||||
|
||||
def test_graphql_post_unauthorized(client):
|
||||
"""POST /graphql without token should return 401"""
|
||||
query = {"query": "{ devices { devName devMac } }"}
|
||||
resp = client.post("/graphql", json=query)
|
||||
assert resp.status_code == 401
|
||||
assert "Unauthorized access attempt" in resp.json.get("error", "")
|
||||
|
||||
def test_graphql_post_devices(client, api_token):
|
||||
"""POST /graphql with a valid token should return device data"""
|
||||
query = {
|
||||
"query": """
|
||||
{
|
||||
devices {
|
||||
devices {
|
||||
devGUID
|
||||
devGroup
|
||||
devIsRandomMac
|
||||
devParentChildrenCount
|
||||
}
|
||||
count
|
||||
}
|
||||
}
|
||||
"""
|
||||
}
|
||||
resp = client.post("/graphql", json=query, headers=auth_headers(api_token))
|
||||
assert resp.status_code == 200
|
||||
|
||||
body = resp.get_json()
|
||||
|
||||
# GraphQL spec: response always under "data"
|
||||
assert "data" in body
|
||||
data = body["data"]
|
||||
|
||||
assert "devices" in data
|
||||
assert isinstance(data["devices"]["devices"], list)
|
||||
assert isinstance(data["devices"]["count"], int)
|
||||
|
||||
def test_graphql_post_settings(client, api_token):
|
||||
"""POST /graphql should return settings data"""
|
||||
query = {
|
||||
"query": """
|
||||
{
|
||||
settings {
|
||||
settings { setKey setValue setGroup }
|
||||
count
|
||||
}
|
||||
}
|
||||
"""
|
||||
}
|
||||
resp = client.post("/graphql", json=query, headers=auth_headers(api_token))
|
||||
assert resp.status_code == 200
|
||||
data = resp.json.get("data", {})
|
||||
assert "settings" in data
|
||||
assert isinstance(data["settings"]["settings"], list)
|
||||
@@ -71,6 +71,133 @@ def test_wakeonlan_device(client, api_token, test_mac):
|
||||
data = resp.json
|
||||
assert data.get("success") is True
|
||||
assert "WOL packet sent" in data.get("message", "")
|
||||
|
||||
def test_speedtest_endpoint(client, api_token):
|
||||
# 1. Call the speedtest endpoint
|
||||
resp = client.get("/nettools/speedtest", headers=auth_headers(api_token))
|
||||
|
||||
# 2. Assertions
|
||||
if resp.status_code == 403:
|
||||
# Unauthorized access
|
||||
data = resp.json
|
||||
assert "error" in data
|
||||
assert data["error"] == "Forbidden"
|
||||
else:
|
||||
# Expect success
|
||||
assert resp.status_code == 200
|
||||
data = resp.json
|
||||
assert data.get("success") is True
|
||||
assert "output" in data
|
||||
assert isinstance(data["output"], list)
|
||||
# Optionally check that output lines are strings
|
||||
assert all(isinstance(line, str) for line in data["output"])
|
||||
|
||||
|
||||
|
||||
def test_traceroute_device(client, api_token, test_mac):
|
||||
# 1. Ensure at least one device exists
|
||||
create_dummy(client, api_token, test_mac)
|
||||
|
||||
# 2. Fetch all devices
|
||||
resp = client.get("/devices", headers=auth_headers(api_token))
|
||||
assert resp.status_code == 200
|
||||
devices = resp.json.get("devices", [])
|
||||
assert len(devices) > 0
|
||||
|
||||
# 3. Pick the first device
|
||||
device_ip = devices[0].get("devLastIP", "192.168.1.1") # fallback if dummy has no IP
|
||||
|
||||
# 4. Call the traceroute endpoint
|
||||
resp = client.post(
|
||||
"/nettools/traceroute",
|
||||
json={"devLastIP": device_ip},
|
||||
headers=auth_headers(api_token)
|
||||
)
|
||||
|
||||
# 5. Assertions
|
||||
if not device_ip or device_ip.lower() == 'invalid':
|
||||
# Expect 400 if IP is missing or invalid
|
||||
assert resp.status_code == 400
|
||||
data = resp.json
|
||||
assert data.get("success") is False
|
||||
else:
|
||||
# Expect 200 and valid traceroute output
|
||||
assert resp.status_code == 200
|
||||
data = resp.json
|
||||
assert data.get("success") is True
|
||||
assert "output" in data
|
||||
assert isinstance(data["output"], str)
|
||||
|
||||
@pytest.mark.parametrize("ip,expected_status", [
|
||||
("8.8.8.8", 200),
|
||||
("256.256.256.256", 400), # Invalid IP
|
||||
("", 400), # Missing IP
|
||||
])
|
||||
def test_nslookup_endpoint(client, api_token, ip, expected_status):
|
||||
payload = {"devLastIP": ip} if ip else {}
|
||||
resp = client.post("/nettools/nslookup", json=payload, headers=auth_headers(api_token))
|
||||
|
||||
assert resp.status_code == expected_status
|
||||
data = resp.json
|
||||
|
||||
if expected_status == 200:
|
||||
assert data.get("success") is True
|
||||
assert isinstance(data["output"], list)
|
||||
assert all(isinstance(line, str) for line in data["output"])
|
||||
else:
|
||||
assert data.get("success") is False
|
||||
assert "error" in data
|
||||
|
||||
@pytest.mark.parametrize("ip,mode,expected_status", [
|
||||
("127.0.0.1", "fast", 200),
|
||||
("127.0.0.1", "normal", 200),
|
||||
("127.0.0.1", "detail", 200),
|
||||
("127.0.0.1", "skipdiscovery", 200),
|
||||
("127.0.0.1", "invalidmode", 400),
|
||||
("999.999.999.999", "fast", 400),
|
||||
])
|
||||
def test_nmap_endpoint(client, api_token, ip, mode, expected_status):
|
||||
payload = {"scan": ip, "mode": mode}
|
||||
resp = client.post("/nettools/nmap", json=payload, headers=auth_headers(api_token))
|
||||
|
||||
assert resp.status_code == expected_status
|
||||
data = resp.json
|
||||
|
||||
if expected_status == 200:
|
||||
assert data.get("success") is True
|
||||
assert data.get("mode") == mode
|
||||
assert data.get("ip") == ip
|
||||
assert isinstance(data["output"], list)
|
||||
assert all(isinstance(line, str) for line in data["output"])
|
||||
else:
|
||||
assert data.get("success") is False
|
||||
assert "error" in data
|
||||
|
||||
def test_nslookup_unauthorized(client):
|
||||
# No auth headers
|
||||
resp = client.post("/nettools/nslookup", json={"devLastIP": "8.8.8.8"})
|
||||
assert resp.status_code == 403
|
||||
data = resp.json
|
||||
assert data.get("success") is False
|
||||
assert data.get("error") == "Forbidden"
|
||||
|
||||
def test_nmap_unauthorized(client):
|
||||
# No auth headers
|
||||
resp = client.post("/nettools/nmap", json={"scan": "127.0.0.1", "mode": "fast"})
|
||||
assert resp.status_code == 403
|
||||
data = resp.json
|
||||
assert data.get("success") is False
|
||||
assert data.get("error") == "Forbidden"
|
||||
|
||||
|
||||
def test_internet_info_endpoint(client, api_token):
|
||||
resp = client.get("/nettools/internetinfo", headers=auth_headers(api_token))
|
||||
data = resp.json
|
||||
|
||||
if resp.status_code == 200:
|
||||
assert data.get("success") is True
|
||||
assert isinstance(data.get("output"), str)
|
||||
assert len(data["output"]) > 0 # ensure output is not empty
|
||||
else:
|
||||
# Handle errors, e.g., curl failure
|
||||
assert data.get("success") is False
|
||||
assert "error" in data
|
||||
assert "details" in data
|
||||
257
test/test_sessions_endpoints.py
Executable file
257
test/test_sessions_endpoints.py
Executable file
@@ -0,0 +1,257 @@
|
||||
import sys
|
||||
import pathlib
|
||||
import sqlite3
|
||||
import random
|
||||
import string
|
||||
import uuid
|
||||
import pytest
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
INSTALL_PATH = "/app"
|
||||
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
|
||||
|
||||
from helper import timeNowTZ, get_setting_value
|
||||
from api_server.api_server_start import app
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def api_token():
|
||||
return get_setting_value("API_TOKEN")
|
||||
|
||||
@pytest.fixture
|
||||
def client():
|
||||
with app.test_client() as client:
|
||||
yield client
|
||||
|
||||
@pytest.fixture
|
||||
def test_mac():
|
||||
# Generate a unique MAC for each test run
|
||||
return "AA:BB:CC:" + ":".join(f"{random.randint(0,255):02X}" for _ in range(3))
|
||||
|
||||
def auth_headers(token):
|
||||
return {"Authorization": f"Bearer {token}"}
|
||||
|
||||
def test_create_device(client, api_token, test_mac):
|
||||
payload = {
|
||||
"createNew": True,
|
||||
"devType": "Test Device",
|
||||
"devOwner": "Unit Test",
|
||||
"devType": "Router",
|
||||
"devVendor": "TestVendor",
|
||||
}
|
||||
resp = client.post(f"/device/{test_mac}", json=payload, headers=auth_headers(api_token))
|
||||
assert resp.status_code == 200
|
||||
assert resp.json.get("success") is True
|
||||
|
||||
|
||||
# -----------------------------
|
||||
def test_create_session(client, api_token, test_mac):
|
||||
payload = {
|
||||
"mac": test_mac,
|
||||
"ip": "192.168.1.100",
|
||||
"start_time": timeNowTZ(),
|
||||
"event_type_conn": "Connected",
|
||||
"event_type_disc": "Disconnected"
|
||||
}
|
||||
resp = client.post("/sessions/create", json=payload, headers=auth_headers(api_token))
|
||||
assert resp.status_code == 200
|
||||
assert resp.json.get("success") is True
|
||||
|
||||
|
||||
# -----------------------------
|
||||
def test_list_sessions(client, api_token, test_mac):
|
||||
# Ensure at least one session exists
|
||||
payload = {
|
||||
"mac": test_mac,
|
||||
"ip": "192.168.1.100",
|
||||
"start_time": timeNowTZ()
|
||||
}
|
||||
client.post("/sessions/create", json=payload, headers=auth_headers(api_token))
|
||||
|
||||
# List sessions for MAC
|
||||
resp = client.get(f"/sessions/list?mac={test_mac}", headers=auth_headers(api_token))
|
||||
assert resp.status_code == 200
|
||||
assert resp.json.get("success") is True
|
||||
sessions = resp.json.get("sessions")
|
||||
assert any(ses["ses_MAC"] == test_mac for ses in sessions)
|
||||
|
||||
|
||||
def test_device_sessions_by_period(client, api_token, test_mac):
|
||||
# 1. Create a dummy session so we have data
|
||||
payload = {
|
||||
"mac": test_mac,
|
||||
"ip": "192.168.1.200",
|
||||
"start_time": timeNowTZ()
|
||||
}
|
||||
resp_create = client.post("/sessions/create", json=payload, headers=auth_headers(api_token))
|
||||
assert resp_create.status_code == 200
|
||||
assert resp_create.json.get("success") is True
|
||||
|
||||
# 2. Query sessions for the device with a valid period
|
||||
resp = client.get(
|
||||
f"/sessions/{test_mac}?period=7 days",
|
||||
headers=auth_headers(api_token)
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
|
||||
data = resp.json
|
||||
assert data.get("success") is True
|
||||
assert "sessions" in data
|
||||
|
||||
sessions = data["sessions"]
|
||||
|
||||
print(sessions)
|
||||
print(test_mac)
|
||||
|
||||
assert isinstance(sessions, list)
|
||||
assert any(s["ses_MAC"] == test_mac for s in sessions)
|
||||
|
||||
|
||||
def test_device_session_events(client, api_token, test_mac):
|
||||
"""
|
||||
Test fetching session/events from the /sessions/session-events endpoint.
|
||||
"""
|
||||
|
||||
# 1. Create a dummy session to ensure we have data
|
||||
payload = {
|
||||
"mac": test_mac,
|
||||
"ip": "192.168.1.250",
|
||||
"start_time": timeNowTZ()
|
||||
}
|
||||
resp_create = client.post(
|
||||
"/sessions/create",
|
||||
json=payload,
|
||||
headers=auth_headers(api_token)
|
||||
)
|
||||
assert resp_create.status_code == 200
|
||||
assert resp_create.json.get("success") is True
|
||||
|
||||
# 2. Fetch session events with default type ('all') and period ('7 days')
|
||||
resp = client.get(
|
||||
f"/sessions/session-events?type=all&period=7 days",
|
||||
headers=auth_headers(api_token)
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
|
||||
data = resp.json
|
||||
assert "data" in data # table data key
|
||||
events = data["data"]
|
||||
|
||||
# 3. Validate the response structure
|
||||
assert isinstance(events, list)
|
||||
|
||||
# If there is at least one row, check fields for sessions
|
||||
if events:
|
||||
row = events[0]
|
||||
# Expecting row as list with at least expected columns
|
||||
assert isinstance(row, list)
|
||||
# IP and datetime fields should exist
|
||||
assert row[9] # IP column
|
||||
assert row[3] # Event datetime column
|
||||
|
||||
# 4. Optionally, test filtering by session type
|
||||
resp_sessions = client.get(
|
||||
"/sessions/session-events?type=sessions&period=7 days",
|
||||
headers=auth_headers(api_token)
|
||||
)
|
||||
assert resp_sessions.status_code == 200
|
||||
sessions = resp_sessions.json["data"]
|
||||
assert isinstance(sessions, list)
|
||||
|
||||
# -----------------------------
|
||||
def test_delete_session(client, api_token, test_mac):
|
||||
# First create session
|
||||
payload = {
|
||||
"mac": test_mac,
|
||||
"ip": "192.168.1.100",
|
||||
"start_time": timeNowTZ()
|
||||
}
|
||||
client.post("/sessions/create", json=payload, headers=auth_headers(api_token))
|
||||
|
||||
# Delete session
|
||||
resp = client.delete("/sessions/delete", json={"mac": test_mac}, headers=auth_headers(api_token))
|
||||
assert resp.status_code == 200
|
||||
assert resp.json.get("success") is True
|
||||
|
||||
# Confirm deletion
|
||||
resp = client.get(f"/sessions/list?mac={test_mac}", headers=auth_headers(api_token))
|
||||
sessions = resp.json.get("sessions")
|
||||
assert not any(ev["ses_MAC"] == test_mac for ses in sessions)
|
||||
|
||||
|
||||
|
||||
def test_get_sessions_calendar(client, api_token, test_mac):
|
||||
"""
|
||||
Test the /sessions/calendar endpoint.
|
||||
Creates session and ensures the calendar output is correct.
|
||||
Cleans up test sessions after test.
|
||||
"""
|
||||
|
||||
|
||||
# --- Setup: create two sessions for the test MAC ---
|
||||
now = timeNowTZ()
|
||||
start1 = (now - timedelta(days=2)).isoformat(timespec="seconds")
|
||||
end1 = (now - timedelta(days=1, hours=20)).isoformat(timespec="seconds")
|
||||
|
||||
start2 = (now - timedelta(days=1)).isoformat(timespec="seconds")
|
||||
end2 = (now - timedelta(hours=20)).isoformat(timespec="seconds")
|
||||
|
||||
# Create sessions using your endpoint
|
||||
client.post("/sessions/create", json={
|
||||
"mac": test_mac,
|
||||
"ip": "192.168.1.100",
|
||||
"start_time": start1,
|
||||
"end_time": end1,
|
||||
"event_type_conn": "connect",
|
||||
"event_type_disc": "disconnect"
|
||||
}, headers=auth_headers(api_token))
|
||||
|
||||
client.post("/sessions/create", json={
|
||||
"mac": test_mac,
|
||||
"ip": "192.168.1.100",
|
||||
"start_time": start2,
|
||||
"end_time": end2,
|
||||
"event_type_conn": "connect",
|
||||
"event_type_disc": "disconnect"
|
||||
}, headers=auth_headers(api_token))
|
||||
|
||||
# --- Call the /sessions/calendar API ---
|
||||
start_date = (now - timedelta(days=3)).strftime("%Y-%m-%d")
|
||||
end_date = (now + timedelta(days=1)).strftime("%Y-%m-%d")
|
||||
|
||||
resp = client.get(
|
||||
f"/sessions/calendar?start={start_date}&end={end_date}",
|
||||
headers=auth_headers(api_token)
|
||||
)
|
||||
|
||||
assert resp.status_code == 200
|
||||
data = resp.json
|
||||
assert data.get("success") is True
|
||||
assert "sessions" in data
|
||||
sessions = data["sessions"]
|
||||
|
||||
# --- Verify calendar sessions ---
|
||||
assert len(sessions) >= 2 # at least our two sessions
|
||||
|
||||
# Check expected keys
|
||||
expected_keys = {"resourceId", "title", "start", "end", "color", "tooltip", "className"}
|
||||
for ses in sessions:
|
||||
assert set(ses.keys()) == expected_keys
|
||||
|
||||
# Check that all sessions belong to the test MAC
|
||||
mac_sessions = [ses for ses in sessions if ses["resourceId"] == test_mac]
|
||||
assert len(mac_sessions) == 2 # or exact number if you know it
|
||||
|
||||
# Check ISO date formatting for start/end
|
||||
for ses in mac_sessions:
|
||||
# start must always be present
|
||||
assert ses["start"] is not None, f"Session start is None: {ses}"
|
||||
datetime.fromisoformat(ses["start"])
|
||||
|
||||
# end can be None only if tooltip mentions "<still connected>"
|
||||
if ses["end"] is not None:
|
||||
datetime.fromisoformat(ses["end"])
|
||||
else:
|
||||
assert "<still connected>" in ses["tooltip"], f"End is None but session not marked as still connected: {ses}"
|
||||
|
||||
# --- Cleanup: delete all test sessions for this MAC ---
|
||||
client.delete(f"/sessions/delete?mac={test_mac}", headers=auth_headers(api_token))
|
||||
Reference in New Issue
Block a user