Files
NetAlertX/server/api_server/api_server_start.py
jokob-sk fbb4a2f8b4
Some checks failed
Code checks / check-url-paths (push) Has been cancelled
Code checks / lint (push) Has been cancelled
Code checks / docker-tests (push) Has been cancelled
docker / docker_dev (push) Has been cancelled
Deploy MkDocs / deploy (push) Has been cancelled
BE: added /auth endpoint
Signed-off-by: jokob-sk <jokob.sk@gmail.com>
2025-12-01 09:24:44 +11:00

796 lines
26 KiB
Python
Executable File

import threading
import sys
import os
from flask import Flask, request, jsonify, Response
from flask_cors import CORS
# Register NetAlertX directories
INSTALL_PATH = os.getenv("NETALERTX_APP", "/app")
sys.path.extend([f"{INSTALL_PATH}/server"])
from logger import mylog # noqa: E402 [flake8 lint suppression]
from helper import get_setting_value # noqa: E402 [flake8 lint suppression]
from db.db_helper import get_date_from_period # noqa: E402 [flake8 lint suppression]
from app_state import updateState # noqa: E402 [flake8 lint suppression]
from .graphql_endpoint import devicesSchema # noqa: E402 [flake8 lint suppression]
from .device_endpoint import ( # noqa: E402 [flake8 lint suppression]
get_device_data,
set_device_data,
delete_device,
delete_device_events,
reset_device_props,
copy_device,
update_device_column
)
from .devices_endpoint import ( # noqa: E402 [flake8 lint suppression]
get_all_devices,
delete_unknown_devices,
delete_all_with_empty_macs,
delete_devices,
export_devices,
import_csv,
devices_totals,
devices_by_status
)
from .events_endpoint import ( # noqa: E402 [flake8 lint suppression]
delete_events,
delete_events_older_than,
get_events,
create_event,
get_events_totals
)
from .history_endpoint import delete_online_history # noqa: E402 [flake8 lint suppression]
from .prometheus_endpoint import get_metric_stats # noqa: E402 [flake8 lint suppression]
from .sessions_endpoint import ( # noqa: E402 [flake8 lint suppression]
get_sessions,
delete_session,
create_session,
get_sessions_calendar,
get_device_sessions,
get_session_events
)
from .nettools_endpoint import ( # noqa: E402 [flake8 lint suppression]
wakeonlan,
traceroute,
speedtest,
nslookup,
nmap_scan,
internet_info
)
from .dbquery_endpoint import read_query, write_query, update_query, delete_query # noqa: E402 [flake8 lint suppression]
from .sync_endpoint import handle_sync_post, handle_sync_get # noqa: E402 [flake8 lint suppression]
from .logs_endpoint import clean_log # noqa: E402 [flake8 lint suppression]
from models.user_events_queue_instance import UserEventsQueueInstance # noqa: E402 [flake8 lint suppression]
from messaging.in_app import ( # noqa: E402 [flake8 lint suppression]
write_notification,
mark_all_notifications_read,
delete_notifications,
get_unread_notifications,
delete_notification,
mark_notification_as_read
)
# Flask application
app = Flask(__name__)
CORS(
app,
resources={
r"/metrics": {"origins": "*"},
r"/device/*": {"origins": "*"},
r"/devices/*": {"origins": "*"},
r"/history/*": {"origins": "*"},
r"/nettools/*": {"origins": "*"},
r"/sessions/*": {"origins": "*"},
r"/settings/*": {"origins": "*"},
r"/dbquery/*": {"origins": "*"},
r"/messaging/*": {"origins": "*"},
r"/events/*": {"origins": "*"},
r"/logs/*": {"origins": "*"},
r"/auth/*": {"origins": "*"}
},
supports_credentials=True,
allow_headers=["Authorization", "Content-Type"],
)
# -------------------------------------------------------------------
# Custom handler for 404 - Route not found
# -------------------------------------------------------------------
@app.errorhandler(404)
def not_found(error):
response = {
"success": False,
"error": "API route not found",
"message": f"The requested URL {error.description if hasattr(error, 'description') else ''} was not found on the server.",
}
return jsonify(response), 404
# --------------------------
# GraphQL Endpoints
# --------------------------
# Endpoint used when accessed via browser
@app.route("/graphql", methods=["GET"])
def graphql_debug():
# Handles GET requests
return "NetAlertX GraphQL server running."
# Endpoint for GraphQL queries
@app.route("/graphql", methods=["POST"])
def graphql_endpoint():
# Check for API token in headers
if not is_authorized():
msg = '[graphql_server] Unauthorized access attempt - make sure your GRAPHQL_PORT and API_TOKEN settings are correct.'
mylog('verbose', [msg])
return jsonify({"success": False, "message": msg, "error": "Forbidden"}), 401
# Retrieve and log request data
data = request.get_json()
mylog("verbose", [f"[graphql_server] data: {data}"])
# Execute the GraphQL query
result = devicesSchema.execute(data.get("query"), variables=data.get("variables"))
# Initialize response
response = {}
if result.errors:
response["errors"] = [str(e) for e in result.errors]
if result.data:
response["data"] = result.data
return jsonify(response)
# --------------------------
# Settings Endpoints
# --------------------------
@app.route("/settings/<setKey>", methods=["GET"])
def api_get_setting(setKey):
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
value = get_setting_value(setKey)
return jsonify({"success": True, "value": value})
# --------------------------
# Device Endpoints
# --------------------------
@app.route("/device/<mac>", methods=["GET"])
def api_get_device(mac):
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
return get_device_data(mac)
@app.route("/device/<mac>", methods=["POST"])
def api_set_device(mac):
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
return set_device_data(mac, request.json)
@app.route("/device/<mac>/delete", methods=["DELETE"])
def api_delete_device(mac):
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
return delete_device(mac)
@app.route("/device/<mac>/events/delete", methods=["DELETE"])
def api_delete_device_events(mac):
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
return delete_device_events(mac)
@app.route("/device/<mac>/reset-props", methods=["POST"])
def api_reset_device_props(mac):
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
return reset_device_props(mac, request.json)
@app.route("/device/copy", methods=["POST"])
def api_copy_device():
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
data = request.get_json() or {}
mac_from = data.get("macFrom")
mac_to = data.get("macTo")
if not mac_from or not mac_to:
return jsonify({"success": False, "message": "ERROR: Missing parameters", "error": "macFrom and macTo are required"}), 400
return copy_device(mac_from, mac_to)
@app.route("/device/<mac>/update-column", methods=["POST"])
def api_update_device_column(mac):
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
data = request.get_json() or {}
column_name = data.get("columnName")
column_value = data.get("columnValue")
if not column_name or not column_value:
return jsonify({"success": False, "message": "ERROR: Missing parameters", "error": "columnName and columnValue are required"}), 400
return update_device_column(mac, column_name, column_value)
# --------------------------
# Devices Collections
# --------------------------
@app.route("/devices", methods=["GET"])
def api_get_devices():
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
return get_all_devices()
@app.route("/devices", methods=["DELETE"])
def api_delete_devices():
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
macs = request.json.get("macs") if request.is_json else None
return delete_devices(macs)
@app.route("/devices/empty-macs", methods=["DELETE"])
def api_delete_all_empty_macs():
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
return delete_all_with_empty_macs()
@app.route("/devices/unknown", methods=["DELETE"])
def api_delete_unknown_devices():
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
return delete_unknown_devices()
@app.route("/devices/export", methods=["GET"])
@app.route("/devices/export/<format>", methods=["GET"])
def api_export_devices(format=None):
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
export_format = (format or request.args.get("format", "csv")).lower()
return export_devices(export_format)
@app.route("/devices/import", methods=["POST"])
def api_import_csv():
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
return import_csv(request.files.get("file"))
@app.route("/devices/totals", methods=["GET"])
def api_devices_totals():
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
return devices_totals()
@app.route("/devices/by-status", methods=["GET"])
def api_devices_by_status():
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
status = request.args.get("status", "") if request.args else None
return devices_by_status(status)
# --------------------------
# Net tools
# --------------------------
@app.route("/nettools/wakeonlan", methods=["POST"])
def api_wakeonlan():
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
mac = request.json.get("devMac")
return wakeonlan(mac)
@app.route("/nettools/traceroute", methods=["POST"])
def api_traceroute():
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
ip = request.json.get("devLastIP")
return traceroute(ip)
@app.route("/nettools/speedtest", methods=["GET"])
def api_speedtest():
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
return speedtest()
@app.route("/nettools/nslookup", methods=["POST"])
def api_nslookup():
"""
API endpoint to handle nslookup requests.
Expects JSON with 'devLastIP'.
"""
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
data = request.get_json(silent=True)
if not data or "devLastIP" not in data:
return jsonify({"success": False, "message": "ERROR: Missing parameters", "error": "Missing 'devLastIP'"}), 400
ip = data["devLastIP"]
return nslookup(ip)
@app.route("/nettools/nmap", methods=["POST"])
def api_nmap():
"""
API endpoint to handle nmap scan requests.
Expects JSON with 'scan' (IP address) and 'mode' (scan mode).
"""
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
data = request.get_json(silent=True)
if not data or "scan" not in data or "mode" not in data:
return jsonify({"success": False, "message": "ERROR: Missing parameters", "error": "Missing 'scan' or 'mode'"}), 400
ip = data["scan"]
mode = data["mode"]
return nmap_scan(ip, mode)
@app.route("/nettools/internetinfo", methods=["GET"])
def api_internet_info():
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
return internet_info()
# --------------------------
# DB query
# --------------------------
@app.route("/dbquery/read", methods=["POST"])
def dbquery_read():
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
data = request.get_json() or {}
raw_sql_b64 = data.get("rawSql")
if not raw_sql_b64:
return jsonify({"success": False, "message": "ERROR: Missing parameters", "error": "rawSql is required"}), 400
return read_query(raw_sql_b64)
@app.route("/dbquery/write", methods=["POST"])
def dbquery_write():
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
data = request.get_json() or {}
raw_sql_b64 = data.get("rawSql")
if not raw_sql_b64:
return jsonify({"success": False, "message": "ERROR: Missing parameters", "error": "rawSql is required"}), 400
return write_query(raw_sql_b64)
@app.route("/dbquery/update", methods=["POST"])
def dbquery_update():
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
data = request.get_json() or {}
required = ["columnName", "id", "dbtable", "columns", "values"]
if not all(data.get(k) for k in required):
return jsonify(
{
"success": False,
"message": "ERROR: Missing parameters",
"error": "Missing required 'columnName', 'id', 'dbtable', 'columns', or 'values' query parameter"
}
), 400
return update_query(
column_name=data["columnName"],
ids=data["id"],
dbtable=data["dbtable"],
columns=data["columns"],
values=data["values"],
)
@app.route("/dbquery/delete", methods=["POST"])
def dbquery_delete():
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
data = request.get_json() or {}
required = ["columnName", "id", "dbtable"]
if not all(data.get(k) for k in required):
return jsonify({"success": False, "message": "ERROR: Missing parameters", "error": "Missing required 'columnName', 'id', or 'dbtable' query parameter"}), 400
return delete_query(
column_name=data["columnName"],
ids=data["id"],
dbtable=data["dbtable"],
)
# --------------------------
# Online history
# --------------------------
@app.route("/history", methods=["DELETE"])
def api_delete_online_history():
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
return delete_online_history()
# --------------------------
# Logs
# --------------------------
@app.route("/logs", methods=["DELETE"])
def api_clean_log():
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
file = request.args.get("file")
if not file:
return jsonify({"success": False, "message": "ERROR: Missing parameters", "error": "Missing 'file' query parameter"}), 400
return clean_log(file)
@app.route("/logs/add-to-execution-queue", methods=["POST"])
def api_add_to_execution_queue():
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
queue = UserEventsQueueInstance()
# Get JSON payload safely
data = request.get_json(silent=True) or {}
action = data.get("action")
if not action:
return jsonify({
"success": False, "message": "ERROR: Missing parameters", "error": "Missing required 'action' field in JSON body"}), 400
success, message = queue.add_event(action)
status_code = 200 if success else 400
response = {"success": success, "message": message}
if not success:
response["error"] = "ERROR"
return jsonify(response), status_code
# --------------------------
# Device Events
# --------------------------
@app.route("/events/create/<mac>", methods=["POST"])
def api_create_event(mac):
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
data = request.json or {}
ip = data.get("ip", "0.0.0.0")
event_type = data.get("event_type", "Device Down")
additional_info = data.get("additional_info", "")
pending_alert = data.get("pending_alert", 1)
event_time = data.get("event_time", None)
# Call the helper to insert into DB
create_event(mac, ip, event_type, additional_info, pending_alert, event_time)
# Return consistent JSON response
return jsonify({"success": True, "message": f"Event created for {mac}"})
@app.route("/events/<mac>", methods=["DELETE"])
def api_events_by_mac(mac):
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
return delete_device_events(mac)
@app.route("/events", methods=["DELETE"])
def api_delete_all_events():
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
return delete_events()
@app.route("/events", methods=["GET"])
def api_get_events():
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
mac = request.args.get("mac")
return get_events(mac)
@app.route("/events/<int:days>", methods=["DELETE"])
def api_delete_old_events(days: int):
"""
Delete events older than <days> days.
Example: DELETE /events/30
"""
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
return delete_events_older_than(days)
@app.route("/sessions/totals", methods=["GET"])
def api_get_events_totals():
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
period = get_date_from_period(request.args.get("period", "7 days"))
return get_events_totals(period)
# --------------------------
# Sessions
# --------------------------
@app.route("/sessions/create", methods=["POST"])
def api_create_session():
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
data = request.json
mac = data.get("mac")
ip = data.get("ip")
start_time = data.get("start_time")
end_time = data.get("end_time")
event_type_conn = data.get("event_type_conn", "Connected")
event_type_disc = data.get("event_type_disc", "Disconnected")
if not mac or not ip or not start_time:
return jsonify({"success": False, "message": "ERROR: Missing parameters", "error": "Missing required 'mac', 'ip', or 'start_time' query parameter"}), 400
return create_session(
mac, ip, start_time, end_time, event_type_conn, event_type_disc
)
@app.route("/sessions/delete", methods=["DELETE"])
def api_delete_session():
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
mac = request.json.get("mac") if request.is_json else None
if not mac:
return jsonify({"success": False, "message": "ERROR: Missing parameters", "error": "Missing 'mac' query parameter"}), 400
return delete_session(mac)
@app.route("/sessions/list", methods=["GET"])
def api_get_sessions():
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
mac = request.args.get("mac")
start_date = request.args.get("start_date")
end_date = request.args.get("end_date")
return get_sessions(mac, start_date, end_date)
@app.route("/sessions/calendar", methods=["GET"])
def api_get_sessions_calendar():
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
# Query params: /sessions/calendar?start=2025-08-01&end=2025-08-21
start_date = request.args.get("start")
end_date = request.args.get("end")
return get_sessions_calendar(start_date, end_date)
@app.route("/sessions/<mac>", methods=["GET"])
def api_device_sessions(mac):
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
period = request.args.get("period", "1 day")
return get_device_sessions(mac, period)
@app.route("/sessions/session-events", methods=["GET"])
def api_get_session_events():
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
session_event_type = request.args.get("type", "all")
period = get_date_from_period(request.args.get("period", "7 days"))
return get_session_events(session_event_type, period)
# --------------------------
# Prometheus metrics endpoint
# --------------------------
@app.route("/metrics")
def metrics():
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
# Return Prometheus metrics as plain text
return Response(get_metric_stats(), mimetype="text/plain")
# --------------------------
# In-app notifications
# --------------------------
@app.route("/messaging/in-app/write", methods=["POST"])
def api_write_notification():
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
data = request.json or {}
content = data.get("content")
level = data.get("level", "alert")
if not content:
return jsonify({"success": False, "message": "ERROR: Missing parameters", "error": "Missing content"}), 400
write_notification(content, level)
return jsonify({"success": True})
@app.route("/messaging/in-app/unread", methods=["GET"])
def api_get_unread_notifications():
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
return get_unread_notifications()
@app.route("/messaging/in-app/read/all", methods=["POST"])
def api_mark_all_notifications_read():
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
return jsonify(mark_all_notifications_read())
@app.route("/messaging/in-app/delete", methods=["DELETE"])
def api_delete_all_notifications():
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
return delete_notifications()
@app.route("/messaging/in-app/delete/<guid>", methods=["DELETE"])
def api_delete_notification(guid):
"""Delete a single notification by GUID."""
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
result = delete_notification(guid)
if result.get("success"):
return jsonify({"success": True})
else:
return jsonify({"success": False, "message": "ERROR", "error": result.get("error")}), 500
@app.route("/messaging/in-app/read/<guid>", methods=["POST"])
def api_mark_notification_read(guid):
"""Mark a single notification as read by GUID."""
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
result = mark_notification_as_read(guid)
if result.get("success"):
return jsonify({"success": True})
else:
return jsonify({"success": False, "message": "ERROR", "error": result.get("error")}), 500
# --------------------------
# SYNC endpoint
# --------------------------
@app.route("/sync", methods=["GET", "POST"])
def sync_endpoint():
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
if request.method == "GET":
return handle_sync_get()
elif request.method == "POST":
return handle_sync_post()
else:
msg = "[sync endpoint] Method Not Allowed"
write_notification(msg, "alert")
mylog("verbose", [msg])
return jsonify({"success": False, "message": "ERROR: No allowed", "error": "Method Not Allowed"}), 405
# --------------------------
# Auth endpoint
# --------------------------
@app.route("/auth", methods=["GET"])
def check_auth():
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
elif request.method == "GET":
return jsonify({"success": True, "message": "Authentication check successful"}), 200
else:
msg = "[sync endpoint] Method Not Allowed"
write_notification(msg, "alert")
mylog("verbose", [msg])
return jsonify({"success": False, "message": "ERROR: No allowed", "error": "Method Not Allowed"}), 405
# --------------------------
# Background Server Start
# --------------------------
def is_authorized():
token = request.headers.get("Authorization")
is_authorized = token == f"Bearer {get_setting_value('API_TOKEN')}"
if not is_authorized:
msg = "[api] Unauthorized access attempt - make sure your GRAPHQL_PORT and API_TOKEN settings are correct."
write_notification(msg, "alert")
mylog("verbose", [msg])
return is_authorized
def start_server(graphql_port, app_state):
"""Start the GraphQL server in a background thread."""
if app_state.graphQLServerStarted == 0:
mylog("verbose", [f"[graphql endpoint] Starting on port: {graphql_port}"])
# Start Flask app in a separate thread
thread = threading.Thread(
target=lambda: app.run(
host="0.0.0.0", port=graphql_port, debug=True, use_reloader=False
)
)
thread.start()
# Update the state to indicate the server has started
app_state = updateState("Process: Idle", None, None, None, 1)