This commit is contained in:
jokob-sk
2026-02-03 20:40:16 +11:00
13 changed files with 164 additions and 79 deletions

View File

@@ -72,6 +72,7 @@ from .openapi.schemas import ( # noqa: E402 [flake8 lint suppression]
DeviceUpdateRequest,
DeviceInfo,
BaseResponse, DeviceTotalsResponse,
DeviceTotalsNamedResponse,
DeleteDevicesRequest, DeviceImportRequest,
DeviceImportResponse, UpdateDeviceColumnRequest,
LockDeviceFieldRequest, UnlockDeviceFieldsRequest,
@@ -289,7 +290,6 @@ def api_get_setting(setKey):
# --------------------------
# Device Endpoints
# --------------------------
@app.route('/mcp/sse/device/<mac>', methods=['GET', 'POST'])
@app.route("/device/<mac>", methods=["GET"])
@validate_request(
operation_id="get_device_info",
@@ -432,7 +432,7 @@ def api_device_copy(payload=None):
@validate_request(
operation_id="update_device_column",
summary="Update Device Column",
description="Update a specific database column for a device.",
description="Update a specific database column for a device. Use this to mark devices as favorites (columnName='devFavorite', columnValue=1). See `get_favorite_devices` to retrieve them.",
path_params=[{
"name": "mac",
"description": "Device MAC address",
@@ -554,7 +554,6 @@ def api_device_fields_unlock(payload=None):
# Devices Collections
# --------------------------
@app.route('/mcp/sse/device/<mac>/set-alias', methods=['POST'])
@app.route('/device/<mac>/set-alias', methods=['POST'])
@validate_request(
operation_id="set_device_alias",
@@ -582,16 +581,25 @@ def api_device_set_alias(mac, payload=None):
return jsonify(result)
@app.route('/mcp/sse/device/open_ports', methods=['POST'])
@app.route('/device/open_ports', methods=['POST'])
@validate_request(
operation_id="get_open_ports",
summary="Get Open Ports",
description="Retrieve open ports for a target IP or MAC address. Returns cached NMAP scan results.",
description="Retrieve open ports for a target IP or MAC address. Returns cached NMAP scan results. If no ports are found, run a scan first using `run_nmap_scan`.",
request_model=OpenPortsRequest,
response_model=OpenPortsResponse,
tags=["nettools"],
auth_callable=is_authorized
auth_callable=is_authorized,
links={
"RunNmapScan": {
"operationId": "run_nmap_scan",
"parameters": {
"scan": "$response.body#/target",
"mode": "fast"
},
"description": "Refresh the open ports data by running a new NMAP scan on this target."
}
}
)
def api_device_open_ports(payload=None):
"""Get stored NMAP open ports for a target IP or MAC."""
@@ -606,7 +614,7 @@ def api_device_open_ports(payload=None):
open_ports = device_handler.getOpenPorts(target)
if not open_ports:
return jsonify({"success": False, "error": f"No stored open ports for {target}. Run a scan with `/nettools/trigger-scan`"}), 404
return jsonify({"success": False, "error": f"No stored open ports for {target}. Run a scan with the 'run_nmap_scan' tool (or /nettools/nmap)."}), 404
return jsonify({"success": True, "target": target, "open_ports": open_ports})
@@ -674,7 +682,6 @@ def api_delete_unknown_devices(payload=None):
return jsonify(device_handler.deleteUnknownDevices())
@app.route('/mcp/sse/devices/export', methods=['GET'])
@app.route("/devices/export", methods=["GET"])
@app.route("/devices/export/<format>", methods=["GET"])
@validate_request(
@@ -716,7 +723,6 @@ def api_export_devices(format=None, payload=None):
)
@app.route('/mcp/sse/devices/import', methods=['POST'])
@app.route("/devices/import", methods=["POST"])
@validate_request(
operation_id="import_devices",
@@ -746,12 +752,11 @@ def api_import_csv(payload=None):
return jsonify(result)
@app.route('/mcp/sse/devices/totals', methods=['GET'])
@app.route("/devices/totals", methods=["GET"])
@validate_request(
operation_id="get_device_totals",
summary="Get Device Totals",
description="Get device statistics including total count, online/offline counts, new devices, and archived devices.",
summary="Get Device Totals (Deprecated)",
description="Get device statistics including total count, online/offline counts, new devices, and archived devices. Deprecated: use /devices/totals/named instead.",
response_model=DeviceTotalsResponse,
tags=["devices"],
auth_callable=is_authorized
@@ -761,7 +766,30 @@ def api_devices_totals(payload=None):
return jsonify(device_handler.getTotals())
@app.route('/mcp/sse/devices/by-status', methods=['GET', 'POST'])
@app.route("/devices/totals/named", methods=["GET"])
@validate_request(
operation_id="get_device_totals_named",
summary="Get Named Device Totals",
description="Get device statistics with named fields including total count, online/offline counts, new devices, and archived devices.",
response_model=DeviceTotalsNamedResponse,
tags=["devices"],
auth_callable=is_authorized
)
def api_devices_totals_named(payload=None):
device_handler = DeviceInstance()
totals_list = device_handler.getTotals()
# totals_list order: [devices, connected, favorites, new, down, archived]
totals_dict = {
"devices": totals_list[0] if len(totals_list) > 0 else 0,
"connected": totals_list[1] if len(totals_list) > 1 else 0,
"favorites": totals_list[2] if len(totals_list) > 2 else 0,
"new": totals_list[3] if len(totals_list) > 3 else 0,
"down": totals_list[4] if len(totals_list) > 4 else 0,
"archived": totals_list[5] if len(totals_list) > 5 else 0
}
return jsonify({"success": True, "totals": totals_dict})
@app.route("/devices/by-status", methods=["GET", "POST"])
@validate_request(
operation_id="list_devices_by_status_api",
@@ -811,12 +839,11 @@ def api_devices_by_status(payload: DeviceListRequest = None):
return jsonify(device_handler.getByStatus(status))
@app.route('/mcp/sse/devices/search', methods=['POST'])
@app.route('/devices/search', methods=['POST'])
@validate_request(
operation_id="search_devices_api",
summary="Search Devices",
description="Search for devices based on various criteria like name, IP, MAC, or vendor.",
description="Search for devices based on various criteria like name, IP, MAC, or vendor. Use this to find MAC addresses for other tools.",
request_model=DeviceSearchRequest,
response_model=DeviceSearchResponse,
tags=["devices"],
@@ -878,7 +905,6 @@ def api_devices_search(payload=None):
return jsonify({"success": True, "devices": matches})
@app.route('/mcp/sse/devices/latest', methods=['GET'])
@app.route('/devices/latest', methods=['GET'])
@validate_request(
operation_id="get_latest_device",
@@ -899,12 +925,11 @@ def api_devices_latest(payload=None):
return jsonify([latest])
@app.route('/mcp/sse/devices/favorite', methods=['GET'])
@app.route('/devices/favorite', methods=['GET'])
@validate_request(
operation_id="get_favorite_devices",
summary="Get Favorite Devices",
description="Get list of devices marked as favorites.",
description="Get list of devices marked as favorites. Use `update_device_column` with 'devFavorite' to add devices.",
response_model=DeviceListResponse,
tags=["devices"],
auth_callable=is_authorized
@@ -916,11 +941,10 @@ def api_devices_favorite(payload=None):
favorite = device_handler.getFavorite()
if not favorite:
return jsonify({"success": False, "message": "No devices found", "error": "No devices found"}), 404
return jsonify({"success": False, "message": "No devices found", "error": "No favorite devices found. Mark devices using `update_device_column`."}), 404
return jsonify([favorite])
@app.route('/mcp/sse/devices/network/topology', methods=['GET'])
@app.route('/devices/network/topology', methods=['GET'])
@validate_request(
operation_id="get_network_topology",
@@ -942,7 +966,6 @@ def api_devices_network_topology(payload=None):
# --------------------------
# Net tools
# --------------------------
@app.route('/mcp/sse/nettools/wakeonlan', methods=['POST'])
@app.route("/nettools/wakeonlan", methods=["POST"])
@validate_request(
operation_id="wake_on_lan",
@@ -979,7 +1002,6 @@ def api_wakeonlan(payload=None):
return wakeonlan(mac)
@app.route('/mcp/sse/nettools/traceroute', methods=['POST'])
@app.route("/nettools/traceroute", methods=["POST"])
@validate_request(
operation_id="perform_traceroute",
@@ -1036,11 +1058,20 @@ def api_nslookup(payload: NslookupRequest = None):
@validate_request(
operation_id="run_nmap_scan",
summary="NMAP Scan",
description="Perform an NMAP scan on a target IP.",
description="Perform an NMAP scan on a target IP to identify open ports. This data is used by `get_open_ports`.",
request_model=NmapScanRequest,
response_model=NmapScanResponse,
tags=["nettools"],
auth_callable=is_authorized
auth_callable=is_authorized,
links={
"GetOpenPorts": {
"operationId": "get_open_ports",
"parameters": {
"target": "$response.body#/ip"
},
"description": "View the open ports discovered by this scan."
}
}
)
def api_nmap(payload: NmapScanRequest = None):
"""
@@ -1084,7 +1115,6 @@ def api_network_interfaces(payload=None):
return network_interfaces()
@app.route('/mcp/sse/nettools/trigger-scan', methods=['POST'])
@app.route("/nettools/trigger-scan", methods=["GET", "POST"])
@validate_request(
operation_id="trigger_network_scan",
@@ -1139,7 +1169,6 @@ def api_trigger_scan(payload=None):
# MCP Server
# --------------------------
@app.route('/openapi.json', methods=['GET'])
@app.route('/mcp/sse/openapi.json', methods=['GET'])
def serve_openapi_spec():
# Allow unauthenticated access to the spec itself so Swagger UI can load.
# The actual API endpoints remain protected.
@@ -1356,7 +1385,7 @@ def api_add_to_execution_queue(payload=None):
path_params=[{
"name": "mac",
"description": "Device MAC address",
"schema": {"type": "string"}
"schema": {"type": "string", "pattern": "^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})$"}
}],
request_model=CreateEventRequest,
response_model=BaseResponse,
@@ -1498,7 +1527,6 @@ def api_get_events_totals(payload=None):
return jsonify(totals)
@app.route('/mcp/sse/events/recent', methods=['GET', 'POST'])
@app.route('/events/recent', methods=['GET', 'POST'])
@validate_request(
operation_id="get_recent_events",
@@ -1518,7 +1546,6 @@ def api_events_default_24h(payload=None):
return api_events_recent(hours)
@app.route('/mcp/sse/events/last', methods=['GET', 'POST'])
@app.route('/events/last', methods=['GET', 'POST'])
@validate_request(
operation_id="get_last_events",
@@ -1707,7 +1734,7 @@ def api_get_session_events(payload=None):
auth_callable=is_authorized
)
def metrics(payload=None):
# Return Prometheus metrics as plain text
# Return Prometheus metrics as plain text (not JSON)
return Response(get_metric_stats(), mimetype="text/plain")

View File

@@ -749,7 +749,8 @@ def _execute_tool(route: Dict[str, Any], args: Dict[str, Any]) -> Dict[str, Any]
"type": "text",
"text": json.dumps(json_content, indent=2)
})
except json.JSONDecodeError:
except (json.JSONDecodeError, ValueError):
# Fallback for endpoints that return plain text instead of JSON (e.g., /metrics)
content.append({
"type": "text",
"text": api_response.text

View File

@@ -39,8 +39,7 @@ ALLOWED_DEVICE_COLUMNS = Literal[
]
ALLOWED_NMAP_MODES = Literal[
"quick", "intense", "ping", "comprehensive", "fast", "normal", "detail", "skipdiscovery",
"-sS", "-sT", "-sU", "-sV", "-O"
"fast", "normal", "detail", "skipdiscovery"
]
NOTIFICATION_LEVELS = Literal["info", "warning", "error", "alert", "interrupt"]
@@ -301,6 +300,24 @@ class DeviceTotalsResponse(RootModel):
root: List[int] = Field(default_factory=list, description="List of counts: [all, online, favorites, new, offline, archived]")
class DeviceTotalsNamedResponse(BaseResponse):
"""Response with named device statistics."""
totals: Dict[str, int] = Field(
...,
description="Dictionary of counts",
json_schema_extra={
"examples": [{
"devices": 10,
"connected": 5,
"favorites": 2,
"new": 1,
"down": 0,
"archived": 2
}]
}
)
class DeviceExportRequest(BaseModel):
"""Request for exporting devices."""
format: Literal["csv", "json"] = Field(
@@ -702,7 +719,7 @@ class SessionInfo(BaseModel):
class CreateSessionRequest(BaseModel):
"""Request to create a session."""
mac: str = Field(..., description="Device MAC")
mac: str = Field(..., description="Device MAC", pattern=MAC_PATTERN)
ip: str = Field(..., description="Device IP")
start_time: str = Field(..., description="Start time")
end_time: Optional[str] = Field(None, description="End time")