MCP enhancements #1343

This commit is contained in:
Jokob @NetAlertX
2025-12-12 05:21:23 +00:00
parent a627cc6abe
commit aed7a91bf0
8 changed files with 1110 additions and 826 deletions

View File

@@ -16,32 +16,6 @@ from db.db_helper import get_date_from_period # noqa: E402 [flake8 lint suppres
from app_state import updateState # noqa: E402 [flake8 lint suppression]
from .graphql_endpoint import devicesSchema # noqa: E402 [flake8 lint suppression]
from .device_endpoint import ( # noqa: E402 [flake8 lint suppression]
get_device_data,
set_device_data,
delete_device,
delete_device_events,
reset_device_props,
copy_device,
update_device_column
)
from .devices_endpoint import ( # noqa: E402 [flake8 lint suppression]
get_all_devices,
delete_unknown_devices,
delete_all_with_empty_macs,
delete_devices,
export_devices,
import_csv,
devices_totals,
devices_by_status
)
from .events_endpoint import ( # noqa: E402 [flake8 lint suppression]
delete_events,
delete_events_older_than,
get_events,
create_event,
get_events_totals
)
from .history_endpoint import delete_online_history # noqa: E402 [flake8 lint suppression]
from .prometheus_endpoint import get_metric_stats # noqa: E402 [flake8 lint suppression]
from .sessions_endpoint import ( # noqa: E402 [flake8 lint suppression]
@@ -223,35 +197,55 @@ def api_get_setting(setKey):
def api_get_device(mac):
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
return get_device_data(mac)
period = request.args.get("period", "")
device_handler = DeviceInstance()
device_data = device_handler.getDeviceData(mac, period)
if device_data is None:
return jsonify({"error": "Device not found"}), 404
return jsonify(device_data)
@app.route("/device/<mac>", methods=["POST"])
def api_set_device(mac):
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
return set_device_data(mac, request.json)
device_handler = DeviceInstance()
result = device_handler.setDeviceData(mac, request.json)
return jsonify(result)
@app.route("/device/<mac>/delete", methods=["DELETE"])
def api_delete_device(mac):
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
return delete_device(mac)
device_handler = DeviceInstance()
result = device_handler.deleteDeviceByMAC(mac)
return jsonify(result)
@app.route("/device/<mac>/events/delete", methods=["DELETE"])
def api_delete_device_events(mac):
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
return delete_device_events(mac)
device_handler = DeviceInstance()
result = device_handler.deleteDeviceEvents(mac)
return jsonify(result)
@app.route("/device/<mac>/reset-props", methods=["POST"])
def api_reset_device_props(mac):
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
return reset_device_props(mac, request.json)
device_handler = DeviceInstance()
result = device_handler.resetDeviceProps(mac)
return jsonify(result)
@app.route("/device/copy", methods=["POST"])
@@ -266,7 +260,9 @@ def api_copy_device():
if not mac_from or not mac_to:
return jsonify({"success": False, "message": "ERROR: Missing parameters", "error": "macFrom and macTo are required"}), 400
return copy_device(mac_from, mac_to)
device_handler = DeviceInstance()
result = device_handler.copyDevice(mac_from, mac_to)
return jsonify(result)
@app.route("/device/<mac>/update-column", methods=["POST"])
@@ -281,20 +277,29 @@ def api_update_device_column(mac):
if not column_name or not column_value:
return jsonify({"success": False, "message": "ERROR: Missing parameters", "error": "columnName and columnValue are required"}), 400
return update_device_column(mac, column_name, column_value)
device_handler = DeviceInstance()
result = device_handler.updateDeviceColumn(mac, column_name, column_value)
if not result.get("success"):
return jsonify(result), 404
return jsonify(result)
@app.route('/mcp/sse/device/<mac>/set-alias', methods=['POST'])
@app.route('/device/<mac>/set-alias', methods=['POST'])
def api_device_set_alias(mac):
"""Set the device alias - convenience wrapper around update_device_column."""
"""Set the device alias - convenience wrapper around updateDeviceColumn."""
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
data = request.get_json() or {}
alias = data.get('alias')
if not alias:
return jsonify({"success": False, "message": "ERROR: Missing parameters", "error": "alias is required"}), 400
return update_device_column(mac, 'devName', alias)
device_handler = DeviceInstance()
result = device_handler.updateDeviceColumn(mac, 'devName', alias)
return jsonify(result)
@app.route('/mcp/sse/device/open_ports', methods=['POST'])
@@ -327,7 +332,9 @@ def api_device_open_ports():
def api_get_devices():
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
return get_all_devices()
device_handler = DeviceInstance()
devices = device_handler.getAll_AsResponse()
return jsonify({"success": True, "devices": devices})
@app.route("/devices", methods=["DELETE"])
@@ -336,24 +343,27 @@ def api_delete_devices():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
macs = request.json.get("macs") if request.is_json else None
return delete_devices(macs)
device_handler = DeviceInstance()
return jsonify(device_handler.deleteDevices(macs))
@app.route("/devices/empty-macs", methods=["DELETE"])
def api_delete_all_empty_macs():
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
return delete_all_with_empty_macs()
device_handler = DeviceInstance()
return jsonify(device_handler.deleteAllWithEmptyMacs())
@app.route("/devices/unknown", methods=["DELETE"])
def api_delete_unknown_devices():
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
return delete_unknown_devices()
device_handler = DeviceInstance()
return jsonify(device_handler.deleteUnknownDevices())
@app.route('/mcp/sse/devices/export', methods=['GET'])
@app.route("/devices/export", methods=["GET"])
@app.route("/devices/export/<format>", methods=["GET"])
def api_export_devices(format=None):
@@ -361,21 +371,52 @@ def api_export_devices(format=None):
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
export_format = (format or request.args.get("format", "csv")).lower()
return export_devices(export_format)
device_handler = DeviceInstance()
result = device_handler.exportDevices(export_format)
if "error" in result:
return jsonify(result), 400
if result["format"] == "json":
return jsonify({"data": result["data"], "columns": result["columns"]})
elif result["format"] == "csv":
return Response(
result["content"],
mimetype="text/csv",
headers={"Content-Disposition": "attachment; filename=devices.csv"},
)
@app.route('/mcp/sse/devices/import', methods=['POST'])
@app.route("/devices/import", methods=["POST"])
def api_import_csv():
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
return import_csv(request.files.get("file"))
device_handler = DeviceInstance()
json_content = None
file_storage = None
if request.is_json and request.json.get("content"):
json_content = request.json.get("content")
else:
file_storage = request.files.get("file")
result = device_handler.importCSV(file_storage=file_storage, json_content=json_content)
if not result.get("success"):
return jsonify(result), 400
return jsonify(result)
@app.route('/mcp/sse/devices/totals', methods=['GET'])
@app.route("/devices/totals", methods=["GET"])
def api_devices_totals():
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
return devices_totals()
device_handler = DeviceInstance()
return jsonify(device_handler.getTotals())
@app.route('/mcp/sse/devices/by-status', methods=['GET', 'POST'])
@@ -385,8 +426,8 @@ def api_devices_by_status():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
status = request.args.get("status", "") if request.args else None
return devices_by_status(status)
device_handler = DeviceInstance()
return jsonify(device_handler.getByStatus(status))
@app.route('/mcp/sse/devices/search', methods=['POST'])
@@ -402,16 +443,16 @@ def api_devices_search():
if not query:
return jsonify({"success": False, "message": "Missing 'query' parameter", "error": "Missing query"}), 400
device_handler = DeviceInstance()
if is_mac(query):
device_data = get_device_data(query)
if device_data.status_code == 200:
return jsonify({"success": True, "devices": [device_data.get_json()]})
device_data = device_handler.getDeviceData(query)
if device_data:
return jsonify({"success": True, "devices": [device_data]})
else:
return jsonify({"success": False, "message": "Device not found", "error": "Device not found"}), 404
# Create fresh DB instance for this thread
device_handler = DeviceInstance()
matches = device_handler.search(query)
if not matches:
@@ -432,10 +473,26 @@ def api_devices_latest():
latest = device_handler.getLatest()
if not latest:
return jsonify({"message": "No devices found"}), 404
return jsonify({"success": False, "message": "No devices found"}), 404
return jsonify([latest])
@app.route('/mcp/sse/devices/favorite', methods=['GET'])
@app.route('/devices/favorite', methods=['GET'])
def api_devices_favorite():
"""Get favorite devices - maps to DeviceInstance.getFavorite()."""
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
device_handler = DeviceInstance()
favorite = device_handler.getFavorite()
if not favorite:
return jsonify({"success": False, "message": "No devices found"}), 404
return jsonify([favorite])
@app.route('/mcp/sse/devices/network/topology', methods=['GET'])
@app.route('/devices/network/topology', methods=['GET'])
def api_devices_network_topology():
@@ -479,6 +536,7 @@ def api_wakeonlan():
return wakeonlan(mac)
@app.route('/mcp/sse/nettools/traceroute', methods=['POST'])
@app.route("/nettools/traceroute", methods=["POST"])
def api_traceroute():
if not is_authorized():
@@ -720,25 +778,30 @@ def api_create_event(mac):
pending_alert = data.get("pending_alert", 1)
event_time = data.get("event_time", None)
# Call the helper to insert into DB
create_event(mac, ip, event_type, additional_info, pending_alert, event_time)
event_handler = EventInstance()
result = event_handler.createEvent(mac, ip, event_type, additional_info, pending_alert, event_time)
# Return consistent JSON response
return jsonify({"success": True, "message": f"Event created for {mac}"})
return jsonify(result)
@app.route("/events/<mac>", methods=["DELETE"])
def api_events_by_mac(mac):
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
return delete_device_events(mac)
device_handler = DeviceInstance()
result = device_handler.deleteDeviceEvents(mac)
return jsonify(result)
@app.route("/events", methods=["DELETE"])
def api_delete_all_events():
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
return delete_events()
event_handler = EventInstance()
result = event_handler.deleteAllEvents()
return jsonify(result)
@app.route("/events", methods=["GET"])
@@ -747,7 +810,9 @@ def api_get_events():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
mac = request.args.get("mac")
return get_events(mac)
event_handler = EventInstance()
events = event_handler.getEvents(mac)
return jsonify({"count": len(events), "events": events})
@app.route("/events/<int:days>", methods=["DELETE"])
@@ -759,7 +824,9 @@ def api_delete_old_events(days: int):
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
return delete_events_older_than(days)
event_handler = EventInstance()
result = event_handler.deleteEventsOlderThan(days)
return jsonify(result)
@app.route("/sessions/totals", methods=["GET"])
@@ -767,8 +834,10 @@ def api_get_events_totals():
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
period = get_date_from_period(request.args.get("period", "7 days"))
return get_events_totals(period)
period = request.args.get("period", "7 days")
event_handler = EventInstance()
totals = event_handler.getEventsTotals(period)
return jsonify(totals)
@app.route('/mcp/sse/events/recent', methods=['GET', 'POST'])

View File

@@ -1,344 +0,0 @@
#!/usr/bin/env python
import os
import sys
from flask import jsonify, request
# Register NetAlertX directories
INSTALL_PATH = os.getenv("NETALERTX_APP", "/app")
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
from database import get_temp_db_connection # noqa: E402 [flake8 lint suppression]
from helper import is_random_mac, get_setting_value # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowDB, format_date # noqa: E402 [flake8 lint suppression]
from db.db_helper import row_to_json, get_date_from_period # noqa: E402 [flake8 lint suppression]
# --------------------------
# Device Endpoints Functions
# --------------------------
def get_device_data(mac):
"""Fetch device info with children, event stats, and presence calculation."""
# Open temporary connection for this request
conn = get_temp_db_connection()
cur = conn.cursor()
now = timeNowDB()
# Special case for new device
if mac.lower() == "new":
device_data = {
"devMac": "",
"devName": "",
"devOwner": "",
"devType": "",
"devVendor": "",
"devFavorite": 0,
"devGroup": "",
"devComments": "",
"devFirstConnection": now,
"devLastConnection": now,
"devLastIP": "",
"devStaticIP": 0,
"devScan": 0,
"devLogEvents": 0,
"devAlertEvents": 0,
"devAlertDown": 0,
"devParentRelType": "default",
"devReqNicsOnline": 0,
"devSkipRepeated": 0,
"devLastNotification": "",
"devPresentLastScan": 0,
"devIsNew": 1,
"devLocation": "",
"devIsArchived": 0,
"devParentMAC": "",
"devParentPort": "",
"devIcon": "",
"devGUID": "",
"devSite": "",
"devSSID": "",
"devSyncHubNode": "",
"devSourcePlugin": "",
"devCustomProps": "",
"devStatus": "Unknown",
"devIsRandomMAC": False,
"devSessions": 0,
"devEvents": 0,
"devDownAlerts": 0,
"devPresenceHours": 0,
"devFQDN": "",
}
return jsonify(device_data)
# Compute period date for sessions/events
period = request.args.get("period", "") # e.g., '7 days', '1 month', etc.
period_date_sql = get_date_from_period(period)
# Fetch device info + computed fields
sql = f"""
SELECT
d.*,
CASE
WHEN d.devAlertDown != 0 AND d.devPresentLastScan = 0 THEN 'Down'
WHEN d.devPresentLastScan = 1 THEN 'On-line'
ELSE 'Off-line'
END AS devStatus,
(SELECT COUNT(*) FROM Sessions
WHERE ses_MAC = d.devMac AND (
ses_DateTimeConnection >= {period_date_sql} OR
ses_DateTimeDisconnection >= {period_date_sql} OR
ses_StillConnected = 1
)) AS devSessions,
(SELECT COUNT(*) FROM Events
WHERE eve_MAC = d.devMac AND eve_DateTime >= {period_date_sql}
AND eve_EventType NOT IN ('Connected','Disconnected')) AS devEvents,
(SELECT COUNT(*) FROM Events
WHERE eve_MAC = d.devMac AND eve_DateTime >= {period_date_sql}
AND eve_EventType = 'Device Down') AS devDownAlerts,
(SELECT CAST(MAX(0, SUM(
julianday(IFNULL(ses_DateTimeDisconnection,'{now}')) -
julianday(CASE WHEN ses_DateTimeConnection < {period_date_sql}
THEN {period_date_sql} ELSE ses_DateTimeConnection END)
) * 24) AS INT)
FROM Sessions
WHERE ses_MAC = d.devMac
AND ses_DateTimeConnection IS NOT NULL
AND (ses_DateTimeDisconnection IS NOT NULL OR ses_StillConnected = 1)
AND (ses_DateTimeConnection >= {period_date_sql}
OR ses_DateTimeDisconnection >= {period_date_sql} OR ses_StillConnected = 1)
) AS devPresenceHours
FROM Devices d
WHERE d.devMac = ? OR CAST(d.rowid AS TEXT) = ?
"""
# Fetch device
cur.execute(sql, (mac, mac))
row = cur.fetchone()
if not row:
return jsonify({"error": "Device not found"}), 404
device_data = row_to_json(list(row.keys()), row)
device_data["devFirstConnection"] = format_date(device_data["devFirstConnection"])
device_data["devLastConnection"] = format_date(device_data["devLastConnection"])
device_data["devIsRandomMAC"] = is_random_mac(device_data["devMac"])
# Fetch children
cur.execute(
"SELECT * FROM Devices WHERE devParentMAC = ? ORDER BY devPresentLastScan DESC",
(device_data["devMac"],),
)
children_rows = cur.fetchall()
children = [row_to_json(list(r.keys()), r) for r in children_rows]
children_nics = [c for c in children if c.get("devParentRelType") == "nic"]
device_data["devChildrenDynamic"] = children
device_data["devChildrenNicsDynamic"] = children_nics
conn.close()
return jsonify(device_data)
def set_device_data(mac, data):
"""Update or create a device."""
if data.get("createNew", False):
sql = """
INSERT INTO Devices (
devMac, devName, devOwner, devType, devVendor, devIcon,
devFavorite, devGroup, devLocation, devComments,
devParentMAC, devParentPort, devSSID, devSite,
devStaticIP, devScan, devAlertEvents, devAlertDown,
devParentRelType, devReqNicsOnline, devSkipRepeated,
devIsNew, devIsArchived, devLastConnection,
devFirstConnection, devLastIP, devGUID, devCustomProps,
devSourcePlugin
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"""
values = (
mac,
data.get("devName", ""),
data.get("devOwner", ""),
data.get("devType", ""),
data.get("devVendor", ""),
data.get("devIcon", ""),
data.get("devFavorite", 0),
data.get("devGroup", ""),
data.get("devLocation", ""),
data.get("devComments", ""),
data.get("devParentMAC", ""),
data.get("devParentPort", ""),
data.get("devSSID", ""),
data.get("devSite", ""),
data.get("devStaticIP", 0),
data.get("devScan", 0),
data.get("devAlertEvents", 0),
data.get("devAlertDown", 0),
data.get("devParentRelType", "default"),
data.get("devReqNicsOnline", 0),
data.get("devSkipRepeated", 0),
data.get("devIsNew", 0),
data.get("devIsArchived", 0),
data.get("devLastConnection", timeNowDB()),
data.get("devFirstConnection", timeNowDB()),
data.get("devLastIP", ""),
data.get("devGUID", ""),
data.get("devCustomProps", ""),
data.get("devSourcePlugin", "DUMMY"),
)
else:
sql = """
UPDATE Devices SET
devName=?, devOwner=?, devType=?, devVendor=?, devIcon=?,
devFavorite=?, devGroup=?, devLocation=?, devComments=?,
devParentMAC=?, devParentPort=?, devSSID=?, devSite=?,
devStaticIP=?, devScan=?, devAlertEvents=?, devAlertDown=?,
devParentRelType=?, devReqNicsOnline=?, devSkipRepeated=?,
devIsNew=?, devIsArchived=?, devCustomProps=?
WHERE devMac=?
"""
values = (
data.get("devName", ""),
data.get("devOwner", ""),
data.get("devType", ""),
data.get("devVendor", ""),
data.get("devIcon", ""),
data.get("devFavorite", 0),
data.get("devGroup", ""),
data.get("devLocation", ""),
data.get("devComments", ""),
data.get("devParentMAC", ""),
data.get("devParentPort", ""),
data.get("devSSID", ""),
data.get("devSite", ""),
data.get("devStaticIP", 0),
data.get("devScan", 0),
data.get("devAlertEvents", 0),
data.get("devAlertDown", 0),
data.get("devParentRelType", "default"),
data.get("devReqNicsOnline", 0),
data.get("devSkipRepeated", 0),
data.get("devIsNew", 0),
data.get("devIsArchived", 0),
data.get("devCustomProps", ""),
mac,
)
conn = get_temp_db_connection()
cur = conn.cursor()
cur.execute(sql, values)
conn.commit()
conn.close()
return jsonify({"success": True})
def delete_device(mac):
"""Delete a device by MAC."""
conn = get_temp_db_connection()
cur = conn.cursor()
cur.execute("DELETE FROM Devices WHERE devMac=?", (mac,))
conn.commit()
conn.close()
return jsonify({"success": True})
def delete_device_events(mac):
"""Delete all events for a device."""
conn = get_temp_db_connection()
cur = conn.cursor()
cur.execute("DELETE FROM Events WHERE eve_MAC=?", (mac,))
conn.commit()
conn.close()
return jsonify({"success": True})
def reset_device_props(mac, data=None):
"""Reset device custom properties to default."""
default_props = get_setting_value("NEWDEV_devCustomProps")
conn = get_temp_db_connection()
cur = conn.cursor()
cur.execute(
"UPDATE Devices SET devCustomProps=? WHERE devMac=?",
(default_props, mac),
)
conn.commit()
conn.close()
return jsonify({"success": True})
def update_device_column(mac, column_name, column_value):
"""
Update a specific column for a given device.
Example: update_device_column("AA:BB:CC:DD:EE:FF", "devParentMAC", "Internet")
"""
conn = get_temp_db_connection()
cur = conn.cursor()
# Build safe SQL with column name whitelisted
sql = f"UPDATE Devices SET {column_name}=? WHERE devMac=?"
cur.execute(sql, (column_value, mac))
conn.commit()
if cur.rowcount > 0:
return jsonify({"success": True})
else:
return jsonify({"success": False, "error": "Device not found"}), 404
conn.close()
return jsonify({"success": True})
def copy_device(mac_from, mac_to):
"""
Copy a device entry from one MAC to another.
If a device already exists with mac_to, it will be replaced.
"""
conn = get_temp_db_connection()
cur = conn.cursor()
try:
# Drop temporary table if exists
cur.execute("DROP TABLE IF EXISTS temp_devices")
# Create temporary table with source device
cur.execute(
"CREATE TABLE temp_devices AS SELECT * FROM Devices WHERE devMac = ?",
(mac_from,),
)
# Update temporary table to target MAC
cur.execute("UPDATE temp_devices SET devMac = ?", (mac_to,))
# Delete previous entry with target MAC
cur.execute("DELETE FROM Devices WHERE devMac = ?", (mac_to,))
# Insert new entry from temporary table
cur.execute(
"INSERT INTO Devices SELECT * FROM temp_devices WHERE devMac = ?", (mac_to,)
)
# Drop temporary table
cur.execute("DROP TABLE temp_devices")
conn.commit()
return jsonify(
{"success": True, "message": f"Device copied from {mac_from} to {mac_to}"}
)
except Exception as e:
conn.rollback()
return jsonify({"success": False, "error": str(e)})
finally:
conn.close()

View File

@@ -1,260 +0,0 @@
#!/usr/bin/env python
import os
import base64
import re
import sys
import sqlite3
from flask import jsonify, request, Response
import csv
from io import StringIO
from logger import mylog
# Register NetAlertX directories
INSTALL_PATH = os.getenv("NETALERTX_APP", "/app")
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
from database import get_temp_db_connection # noqa: E402 [flake8 lint suppression]
from db.db_helper import get_table_json, get_device_condition_by_status # noqa: E402 [flake8 lint suppression]
# --------------------------
# Device Endpoints Functions
# --------------------------
def get_all_devices():
"""Retrieve all devices from the database."""
conn = get_temp_db_connection()
cur = conn.cursor()
cur.execute("SELECT * FROM Devices")
rows = cur.fetchall()
# Convert rows to list of dicts using column names
columns = [col[0] for col in cur.description]
devices = [dict(zip(columns, row)) for row in rows]
conn.close()
return jsonify({"success": True, "devices": devices})
def delete_devices(macs):
"""
Delete devices from the Devices table.
- If `macs` is None → delete ALL devices.
- If `macs` is a list → delete only matching MACs (supports wildcard '*').
"""
conn = get_temp_db_connection()
cur = conn.cursor()
if not macs:
# No MACs provided → delete all
cur.execute("DELETE FROM Devices")
conn.commit()
conn.close()
return jsonify({"success": True, "deleted": "all"})
deleted_count = 0
for mac in macs:
if "*" in mac:
# Wildcard matching
sql_pattern = mac.replace("*", "%")
cur.execute("DELETE FROM Devices WHERE devMAC LIKE ?", (sql_pattern,))
else:
# Exact match
cur.execute("DELETE FROM Devices WHERE devMAC = ?", (mac,))
deleted_count += cur.rowcount
conn.commit()
conn.close()
return jsonify({"success": True, "deleted_count": deleted_count})
def delete_all_with_empty_macs():
"""Delete devices with empty MAC addresses."""
conn = get_temp_db_connection()
cur = conn.cursor()
cur.execute("DELETE FROM Devices WHERE devMAC IS NULL OR devMAC = ''")
deleted = cur.rowcount
conn.commit()
conn.close()
return jsonify({"success": True, "deleted": deleted})
def delete_unknown_devices():
"""Delete devices marked as unknown."""
conn = get_temp_db_connection()
cur = conn.cursor()
cur.execute(
"""DELETE FROM Devices WHERE devName='(unknown)' OR devName='(name not found)'"""
)
conn.commit()
conn.close()
return jsonify({"success": True, "deleted": cur.rowcount})
def export_devices(export_format):
"""
Export devices from the Devices table in the desired format.
- If `macs` is None → delete ALL devices.
- If `macs` is a list → delete only matching MACs (supports wildcard '*').
"""
conn = get_temp_db_connection()
cur = conn.cursor()
# Fetch all devices
devices_json = get_table_json(cur, "SELECT * FROM Devices")
conn.close()
# Ensure columns exist
columns = devices_json.columnNames or (
list(devices_json["data"][0].keys()) if devices_json["data"] else []
)
if export_format == "json":
# Convert to standard dict for Flask JSON
return jsonify(
{"data": [row for row in devices_json["data"]], "columns": list(columns)}
)
elif export_format == "csv":
si = StringIO()
writer = csv.DictWriter(si, fieldnames=columns, quoting=csv.QUOTE_ALL)
writer.writeheader()
for row in devices_json.json["data"]:
writer.writerow(row)
return Response(
si.getvalue(),
mimetype="text/csv",
headers={"Content-Disposition": "attachment; filename=devices.csv"},
)
else:
return jsonify({"error": f"Unsupported format '{export_format}'"}), 400
def import_csv(file_storage=None):
data = ""
skipped = []
# 1. Try JSON `content` (base64-encoded CSV)
if request.is_json and request.json.get("content"):
try:
data = base64.b64decode(request.json["content"], validate=True).decode(
"utf-8"
)
except Exception as e:
return jsonify({"error": f"Base64 decode failed: {e}"}), 400
# 2. Otherwise, try uploaded file
elif file_storage:
data = file_storage.read().decode("utf-8")
# 3. Fallback: try local file (same as PHP `$file = '../../../config/devices.csv';`)
else:
config_root = os.environ.get("NETALERTX_CONFIG", "/data/config")
local_file = os.path.join(config_root, "devices.csv")
try:
with open(local_file, "r", encoding="utf-8") as f:
data = f.read()
except FileNotFoundError:
return jsonify({"error": "CSV file missing"}), 404
if not data:
return jsonify({"error": "No CSV data found"}), 400
# --- Clean up newlines inside quoted fields ---
data = re.sub(r'"([^"]*)"', lambda m: m.group(0).replace("\n", " "), data)
# --- Parse CSV ---
lines = data.splitlines()
reader = csv.reader(lines)
try:
header = [h.strip() for h in next(reader)]
except StopIteration:
return jsonify({"error": "CSV missing header"}), 400
# --- Wipe Devices table ---
conn = get_temp_db_connection()
sql = conn.cursor()
sql.execute("DELETE FROM Devices")
# --- Prepare insert ---
placeholders = ",".join(["?"] * len(header))
insert_sql = f"INSERT INTO Devices ({', '.join(header)}) VALUES ({placeholders})"
row_count = 0
for idx, row in enumerate(reader, start=1):
if len(row) != len(header):
skipped.append(idx)
continue
try:
sql.execute(insert_sql, [col.strip() for col in row])
row_count += 1
except sqlite3.Error as e:
mylog("error", [f"[ImportCSV] SQL ERROR row {idx}: {e}"])
skipped.append(idx)
conn.commit()
conn.close()
return jsonify({"success": True, "inserted": row_count, "skipped_lines": skipped})
def devices_totals():
conn = get_temp_db_connection()
sql = conn.cursor()
# Build a combined query with sub-selects for each status
query = f"""
SELECT
(SELECT COUNT(*) FROM Devices {get_device_condition_by_status("my")}) AS devices,
(SELECT COUNT(*) FROM Devices {get_device_condition_by_status("connected")}) AS connected,
(SELECT COUNT(*) FROM Devices {get_device_condition_by_status("favorites")}) AS favorites,
(SELECT COUNT(*) FROM Devices {get_device_condition_by_status("new")}) AS new,
(SELECT COUNT(*) FROM Devices {get_device_condition_by_status("down")}) AS down,
(SELECT COUNT(*) FROM Devices {get_device_condition_by_status("archived")}) AS archived
"""
sql.execute(query)
row = (
sql.fetchone()
) # returns a tuple like (devices, connected, favorites, new, down, archived)
conn.close()
# Return counts as JSON array
return jsonify(list(row))
def devices_by_status(status=None):
"""
Return devices filtered by status. Returns all if no status provided.
Possible statuses: my, connected, favorites, new, down, archived
"""
conn = get_temp_db_connection()
sql = conn.cursor()
# Build condition for SQL
condition = get_device_condition_by_status(status) if status else ""
query = f"SELECT * FROM Devices {condition}"
sql.execute(query)
table_data = []
for row in sql.fetchall():
r = dict(row) # Convert sqlite3.Row to dict for .get()
dev_name = r.get("devName", "")
if r.get("devFavorite") == 1:
dev_name = f'<span class="text-yellow">&#9733</span>&nbsp;{dev_name}'
table_data.append(
{
"id": r.get("devMac", ""),
"title": dev_name,
"favorite": r.get("devFavorite", 0),
}
)
conn.close()
return jsonify(table_data)

View File

@@ -1,145 +0,0 @@
#!/usr/bin/env python
import os
import sys
from datetime import datetime
from flask import jsonify
# Register NetAlertX directories
INSTALL_PATH = os.getenv("NETALERTX_APP", "/app")
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
from database import get_temp_db_connection # noqa: E402 [flake8 lint suppression]
from helper import mylog # noqa: E402 [flake8 lint suppression]
from db.db_helper import row_to_json, get_date_from_period # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import ensure_datetime # noqa: E402 [flake8 lint suppression]
# --------------------------
# Events Endpoints Functions
# --------------------------
def create_event(
mac: str,
ip: str,
event_type: str = "Device Down",
additional_info: str = "",
pending_alert: int = 1,
event_time: datetime | None = None,
):
"""
Insert a single event into the Events table and return a standardized JSON response.
Exceptions will propagate to the caller.
"""
conn = get_temp_db_connection()
cur = conn.cursor()
if isinstance(event_time, str):
start_time = ensure_datetime(event_time)
start_time = ensure_datetime(event_time)
cur.execute(
"""
INSERT INTO Events (eve_MAC, eve_IP, eve_DateTime, eve_EventType, eve_AdditionalInfo, eve_PendingAlertEmail)
VALUES (?, ?, ?, ?, ?, ?)
""",
(mac, ip, start_time, event_type, additional_info, pending_alert),
)
conn.commit()
conn.close()
mylog("debug", f"[Events] Created event for {mac} ({event_type})")
return jsonify({"success": True, "message": f"Created event for {mac}"})
def get_events(mac=None):
"""
Fetch all events, or events for a specific MAC if provided.
Returns JSON list of events.
"""
conn = get_temp_db_connection()
cur = conn.cursor()
if mac:
sql = "SELECT * FROM Events WHERE eve_MAC=? ORDER BY eve_DateTime DESC"
cur.execute(sql, (mac,))
else:
sql = "SELECT * FROM Events ORDER BY eve_DateTime DESC"
cur.execute(sql)
rows = cur.fetchall()
events = [row_to_json(list(r.keys()), r) for r in rows]
conn.close()
return jsonify({"success": True, "events": events})
def delete_events_older_than(days):
"""Delete all events older than a specified number of days"""
conn = get_temp_db_connection()
cur = conn.cursor()
# Use a parameterized query with sqlite date function
sql = "DELETE FROM Events WHERE eve_DateTime <= date('now', ?)"
cur.execute(sql, [f"-{days} days"])
conn.commit()
conn.close()
return jsonify(
{"success": True, "message": f"Deleted events older than {days} days"}
)
def delete_events():
"""Delete all events"""
conn = get_temp_db_connection()
cur = conn.cursor()
sql = "DELETE FROM Events"
cur.execute(sql)
conn.commit()
conn.close()
return jsonify({"success": True, "message": "Deleted all events"})
def get_events_totals(period: str = "7 days"):
"""
Return counts for events and sessions totals over a given period.
period: "7 days", "1 month", "1 year", "100 years"
"""
# Convert period to SQLite date expression
period_date_sql = get_date_from_period(period)
conn = get_temp_db_connection()
cur = conn.cursor()
sql = f"""
SELECT
(SELECT COUNT(*) FROM Events WHERE eve_DateTime >= {period_date_sql}) AS all_events,
(SELECT COUNT(*) FROM Sessions WHERE
ses_DateTimeConnection >= {period_date_sql}
OR ses_DateTimeDisconnection >= {period_date_sql}
OR ses_StillConnected = 1
) AS sessions,
(SELECT COUNT(*) FROM Sessions WHERE
(ses_DateTimeConnection IS NULL AND ses_DateTimeDisconnection >= {period_date_sql})
OR (ses_DateTimeDisconnection IS NULL AND ses_StillConnected = 0 AND ses_DateTimeConnection >= {period_date_sql})
) AS missing,
(SELECT COUNT(*) FROM Events WHERE eve_DateTime >= {period_date_sql} AND eve_EventType LIKE 'VOIDED%') AS voided,
(SELECT COUNT(*) FROM Events WHERE eve_DateTime >= {period_date_sql} AND eve_EventType LIKE 'New Device') AS new,
(SELECT COUNT(*) FROM Events WHERE eve_DateTime >= {period_date_sql} AND eve_EventType LIKE 'Device Down') AS down
"""
cur.execute(sql)
row = cur.fetchone()
conn.close()
# Return as JSON array
result_json = [row[0], row[1], row[2], row[3], row[4], row[5]]
return jsonify(result_json)

View File

@@ -1,4 +1,15 @@
#!/usr/bin/env python
"""
NetAlertX MCP (Model Context Protocol) Server Endpoint.
This module implements an MCP server that exposes NetAlertX API endpoints as tools
for AI assistants. It provides JSON-RPC over HTTP and Server-Sent Events (SSE)
for tool discovery and execution.
The server maps OpenAPI specifications to MCP tools, allowing AIs to list available
tools and call them with appropriate parameters. Tools include device management,
network scanning, event querying, and more.
"""
import threading
from flask import Blueprint, request, jsonify, Response, stream_with_context
@@ -14,11 +25,18 @@ import queue
mcp_bp = Blueprint('mcp', __name__)
tools_bp = Blueprint('tools', __name__)
# Global session management for MCP SSE connections
mcp_sessions = {}
mcp_sessions_lock = threading.Lock()
def check_auth():
"""
Check if the request has valid authorization.
Returns:
bool: True if the Authorization header matches the expected API token, False otherwise.
"""
token = request.headers.get("Authorization")
expected_token = f"Bearer {get_setting_value('API_TOKEN')}"
return token == expected_token
@@ -28,6 +46,15 @@ def check_auth():
# Specs
# --------------------------
def openapi_spec():
"""
Generate the OpenAPI specification for NetAlertX tools.
This function returns a JSON representation of the available API endpoints
that are exposed as MCP tools, including paths, methods, and operation IDs.
Returns:
flask.Response: A JSON response containing the OpenAPI spec.
"""
# Spec matching actual available routes for MCP tools
mylog("verbose", ["[MCP] OpenAPI spec requested"])
spec = {
@@ -35,17 +62,112 @@ def openapi_spec():
"info": {"title": "NetAlertX Tools", "version": "1.1.0"},
"servers": [{"url": "/"}],
"paths": {
"/devices/by-status": {"post": {"operationId": "list_devices"}},
"/device/{mac}": {"post": {"operationId": "get_device_info"}},
"/devices/search": {"post": {"operationId": "search_devices"}},
"/devices/latest": {"get": {"operationId": "get_latest_device"}},
"/nettools/trigger-scan": {"post": {"operationId": "trigger_scan"}},
"/device/open_ports": {"post": {"operationId": "get_open_ports"}},
"/devices/network/topology": {"get": {"operationId": "get_network_topology"}},
"/events/recent": {"get": {"operationId": "get_recent_alerts"}, "post": {"operationId": "get_recent_alerts"}},
"/events/last": {"get": {"operationId": "get_last_events"}, "post": {"operationId": "get_last_events"}},
"/device/{mac}/set-alias": {"post": {"operationId": "set_device_alias"}},
"/nettools/wakeonlan": {"post": {"operationId": "wol_wake_device"}}
"/devices/by-status": {
"post": {
"operationId": "list_devices",
"description": "List devices filtered by their online/offline status. "
"Accepts optional 'status' query parameter (online/offline)."
}
},
"/device/{mac}": {
"post": {
"operationId": "get_device_info",
"description": "Retrieve detailed information about a specific device by MAC address."
}
},
"/devices/search": {
"post": {
"operationId": "search_devices",
"description": "Search for devices based on various criteria like name, IP, etc. "
"Accepts JSON with 'query' field."
}
},
"/devices/latest": {
"get": {
"operationId": "get_latest_device",
"description": "Get information about the most recently seen device."
}
},
"/devices/favorite": {
"get": {
"operationId": "get_favorite_devices",
"description": "Get favorite devices."
}
},
"/nettools/trigger-scan": {
"post": {
"operationId": "trigger_scan",
"description": "Trigger a network scan to discover new devices. "
"Accepts optional 'type' parameter for scan type - needs to match an enabled plugin name (e.g., ARPSCAN, NMAPDEV, NMAP)."
}
},
"/device/open_ports": {
"post": {
"operationId": "get_open_ports",
"description": "Get a list of open ports for a specific device. "
"Accepts JSON with 'target' (IP or MAC address). Trigger NMAP scan if no previous ports found with the /nettools/trigger-scan endpoint."
}
},
"/devices/network/topology": {
"get": {
"operationId": "get_network_topology",
"description": "Retrieve the network topology information."
}
},
"/events/recent": {
"get": {
"operationId": "get_recent_alerts",
"description": "Get recent events/alerts from the system. Defaults to last 24 hours."
},
"post": {"operationId": "get_recent_alerts"}
},
"/events/last": {
"get": {
"operationId": "get_last_events",
"description": "Get the last 10 events logged in the system."
},
"post": {"operationId": "get_last_events"}
},
"/device/{mac}/set-alias": {
"post": {
"operationId": "set_device_alias",
"description": "Set or update the alias/name for a device. Accepts JSON with 'alias' field."
}
},
"/nettools/wakeonlan": {
"post": {
"operationId": "wol_wake_device",
"description": "Send a Wake-on-LAN packet to wake up a device. "
"Accepts JSON with 'devMac' or 'devLastIP'."
}
},
"/devices/export": {
"get": {
"operationId": "export_devices",
"description": "Export devices in CSV or JSON format. "
"Accepts optional 'format' query parameter (csv/json, defaults to csv)."
}
},
"/devices/import": {
"post": {
"operationId": "import_devices",
"description": "Import devices from CSV or JSON content. "
"Accepts JSON with 'content' field containing base64-encoded data, or multipart file upload."
}
},
"/devices/totals": {
"get": {
"operationId": "get_device_totals",
"description": "Get device statistics and counts."
}
},
"/nettools/traceroute": {
"post": {
"operationId": "traceroute",
"description": "Perform a traceroute to a target IP address. "
"Accepts JSON with 'devLastIP' field."
}
}
}
}
return jsonify(spec)
@@ -57,11 +179,20 @@ def openapi_spec():
# Sessions for SSE
_openapi_spec_cache = None
API_BASE_URL = f"http://localhost:{get_setting_value('GRAPHQL_PORT')}"
_openapi_spec_cache = None # Cached OpenAPI spec to avoid repeated generation
API_BASE_URL = f"http://localhost:{get_setting_value('GRAPHQL_PORT')}" # Base URL for internal API calls
def get_openapi_spec():
"""
Retrieve the cached OpenAPI specification for MCP tools.
This function caches the OpenAPI spec to avoid repeated generation.
If the cache is empty, it calls openapi_spec() to generate it.
Returns:
dict or None: The OpenAPI spec as a dictionary, or None if generation fails.
"""
global _openapi_spec_cache
if _openapi_spec_cache:
@@ -78,6 +209,15 @@ def get_openapi_spec():
def map_openapi_to_mcp_tools(spec):
"""
Convert an OpenAPI specification into MCP tool definitions.
Args:
spec (dict): The OpenAPI spec dictionary.
Returns:
list: A list of MCP tool dictionaries, each containing name, description, and inputSchema.
"""
tools = []
if not spec or 'paths' not in spec:
return tools
@@ -101,6 +241,18 @@ def map_openapi_to_mcp_tools(spec):
def process_mcp_request(data):
"""
Process an incoming MCP JSON-RPC request.
Handles various MCP methods like initialize, tools/list, tools/call, etc.
For tools/call, it maps the tool name to an API endpoint and makes the call.
Args:
data (dict): The JSON-RPC request data containing method, id, params, etc.
Returns:
dict or None: The JSON-RPC response, or None for notifications.
"""
method = data.get('method')
msg_id = data.get('id')
if method == 'initialize':
@@ -157,6 +309,15 @@ def process_mcp_request(data):
def mcp_messages():
"""
Handle MCP messages for a specific session via HTTP POST.
This endpoint processes JSON-RPC requests for an existing MCP session.
The session_id is passed as a query parameter.
Returns:
flask.Response: JSON response indicating acceptance or error.
"""
session_id = request.args.get('session_id')
if not session_id:
return jsonify({"error": "Missing session_id"}), 400
@@ -174,6 +335,16 @@ def mcp_messages():
def mcp_sse():
"""
Handle MCP Server-Sent Events (SSE) endpoint.
Supports both GET (for establishing SSE stream) and POST (for direct JSON-RPC).
For GET, creates a new session and streams responses.
For POST, processes the request directly and returns the response.
Returns:
flask.Response: SSE stream for GET, JSON response for POST.
"""
if request.method == 'POST':
try:
data = request.get_json(silent=True)