Files
NetAlertX/test/api_endpoints/test_mcp_tools_endpoints.py
Jokob @NetAlertX 8c982cd476 MCP refactor
Signed-off-by: GitHub <noreply@github.com>
2025-12-07 08:20:51 +00:00

306 lines
11 KiB
Python

import sys
import os
import pytest
from unittest.mock import patch, MagicMock
from datetime import datetime
INSTALL_PATH = os.getenv('NETALERTX_APP', '/app')
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
from helper import get_setting_value # noqa: E402
from api_server.api_server_start import app # noqa: E402
@pytest.fixture(scope="session")
def api_token():
return get_setting_value("API_TOKEN")
@pytest.fixture
def client():
with app.test_client() as client:
yield client
def auth_headers(token):
return {"Authorization": f"Bearer {token}"}
# --- Device Search Tests ---
@patch('models.device_instance.get_temp_db_connection')
def test_get_device_info_ip_partial(mock_db_conn, client, api_token):
"""Test device search with partial IP search."""
# Mock database connection - DeviceInstance._fetchall calls conn.execute().fetchall()
mock_conn = MagicMock()
mock_execute_result = MagicMock()
mock_execute_result.fetchall.return_value = [
{"devName": "Test Device", "devMac": "AA:BB:CC:DD:EE:FF", "devLastIP": "192.168.1.50"}
]
mock_conn.execute.return_value = mock_execute_result
mock_db_conn.return_value = mock_conn
payload = {"query": ".50"}
response = client.post('/devices/search',
json=payload,
headers=auth_headers(api_token))
assert response.status_code == 200
data = response.get_json()
assert data["success"] is True
assert len(data["devices"]) == 1
assert data["devices"][0]["devLastIP"] == "192.168.1.50"
# --- Trigger Scan Tests ---
@patch('api_server.api_server_start.UserEventsQueueInstance')
def test_trigger_scan_ARPSCAN(mock_queue_class, client, api_token):
"""Test trigger_scan with ARPSCAN type."""
mock_queue = MagicMock()
mock_queue_class.return_value = mock_queue
payload = {"type": "ARPSCAN"}
response = client.post('/mcp/sse/nettools/trigger-scan',
json=payload,
headers=auth_headers(api_token))
assert response.status_code == 200
data = response.get_json()
assert data["success"] is True
mock_queue.add_event.assert_called_once()
call_args = mock_queue.add_event.call_args[0]
assert "run|ARPSCAN" in call_args[0]
@patch('api_server.api_server_start.UserEventsQueueInstance')
def test_trigger_scan_invalid_type(mock_queue_class, client, api_token):
"""Test trigger_scan with invalid scan type."""
mock_queue = MagicMock()
mock_queue_class.return_value = mock_queue
payload = {"type": "invalid_type", "target": "192.168.1.0/24"}
response = client.post('/mcp/sse/nettools/trigger-scan',
json=payload,
headers=auth_headers(api_token))
assert response.status_code == 400
data = response.get_json()
assert data["success"] is False
# --- get_open_ports Tests ---
@patch('models.plugin_object_instance.get_temp_db_connection')
@patch('models.device_instance.get_temp_db_connection')
def test_get_open_ports_ip(mock_plugin_db_conn, mock_device_db_conn, client, api_token):
"""Test get_open_ports with an IP address."""
# Mock database connections for both device lookup and plugin objects
mock_conn = MagicMock()
mock_execute_result = MagicMock()
# Mock for PluginObjectInstance.getByField (returns port data)
mock_execute_result.fetchall.return_value = [
{"Object_SecondaryID": "22", "Watched_Value2": "ssh"},
{"Object_SecondaryID": "80", "Watched_Value2": "http"}
]
# Mock for DeviceInstance.getByIP (returns device with MAC)
mock_execute_result.fetchone.return_value = {"devMac": "AA:BB:CC:DD:EE:FF"}
mock_conn.execute.return_value = mock_execute_result
mock_plugin_db_conn.return_value = mock_conn
mock_device_db_conn.return_value = mock_conn
payload = {"target": "192.168.1.1"}
response = client.post('/device/open_ports',
json=payload,
headers=auth_headers(api_token))
assert response.status_code == 200
data = response.get_json()
assert data["success"] is True
assert len(data["open_ports"]) == 2
assert data["open_ports"][0]["port"] == 22
assert data["open_ports"][1]["service"] == "http"
@patch('models.plugin_object_instance.get_temp_db_connection')
def test_get_open_ports_mac_resolve(mock_plugin_db_conn, client, api_token):
"""Test get_open_ports with a MAC address that resolves to an IP."""
# Mock database connection for MAC-based open ports query
mock_conn = MagicMock()
mock_execute_result = MagicMock()
mock_execute_result.fetchall.return_value = [
{"Object_SecondaryID": "80", "Watched_Value2": "http"}
]
mock_conn.execute.return_value = mock_execute_result
mock_plugin_db_conn.return_value = mock_conn
payload = {"target": "AA:BB:CC:DD:EE:FF"}
response = client.post('/device/open_ports',
json=payload,
headers=auth_headers(api_token))
assert response.status_code == 200
data = response.get_json()
assert data["success"] is True
assert "target" in data
assert len(data["open_ports"]) == 1
assert data["open_ports"][0]["port"] == 80
# --- get_network_topology Tests ---
@patch('models.device_instance.get_temp_db_connection')
def test_get_network_topology(mock_db_conn, client, api_token):
"""Test get_network_topology."""
# Mock database connection for topology query
mock_conn = MagicMock()
mock_execute_result = MagicMock()
mock_execute_result.fetchall.return_value = [
{"devName": "Router", "devMac": "AA:AA:AA:AA:AA:AA", "devParentMAC": None, "devParentPort": None, "devVendor": "VendorA"},
{"devName": "Device1", "devMac": "BB:BB:BB:BB:BB:BB", "devParentMAC": "AA:AA:AA:AA:AA:AA", "devParentPort": "eth1", "devVendor": "VendorB"}
]
mock_conn.execute.return_value = mock_execute_result
mock_db_conn.return_value = mock_conn
response = client.get('/devices/network/topology',
headers=auth_headers(api_token))
assert response.status_code == 200
data = response.get_json()
assert len(data["nodes"]) == 2
assert len(data["links"]) == 1
assert data["links"][0]["source"] == "AA:AA:AA:AA:AA:AA"
assert data["links"][0]["target"] == "BB:BB:BB:BB:BB:BB"
# --- get_recent_alerts Tests ---
@patch('models.event_instance.get_temp_db_connection')
def test_get_recent_alerts(mock_db_conn, client, api_token):
"""Test get_recent_alerts."""
# Mock database connection for events query
mock_conn = MagicMock()
mock_execute_result = MagicMock()
now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
mock_execute_result.fetchall.return_value = [
{"eve_DateTime": now, "eve_EventType": "New Device", "eve_MAC": "AA:BB:CC:DD:EE:FF"}
]
mock_conn.execute.return_value = mock_execute_result
mock_db_conn.return_value = mock_conn
response = client.get('/events/recent',
headers=auth_headers(api_token))
assert response.status_code == 200
data = response.get_json()
assert data["success"] is True
assert data["hours"] == 24
# --- Device Alias Tests ---
@patch('api_server.api_server_start.update_device_column')
def test_set_device_alias(mock_update_col, client, api_token):
"""Test set_device_alias."""
mock_update_col.return_value = {"success": True, "message": "Device alias updated"}
payload = {"alias": "New Device Name"}
response = client.post('/device/AA:BB:CC:DD:EE:FF/set-alias',
json=payload,
headers=auth_headers(api_token))
assert response.status_code == 200
data = response.get_json()
assert data["success"] is True
mock_update_col.assert_called_once_with("AA:BB:CC:DD:EE:FF", "devName", "New Device Name")
@patch('api_server.api_server_start.update_device_column')
def test_set_device_alias_not_found(mock_update_col, client, api_token):
"""Test set_device_alias when device is not found."""
mock_update_col.return_value = {"success": False, "error": "Device not found"}
payload = {"alias": "New Device Name"}
response = client.post('/device/FF:FF:FF:FF:FF:FF/set-alias',
json=payload,
headers=auth_headers(api_token))
assert response.status_code == 200
data = response.get_json()
assert data["success"] is False
assert "Device not found" in data["error"]
# --- Wake-on-LAN Tests ---
@patch('api_server.api_server_start.wakeonlan')
def test_wol_wake_device(mock_wakeonlan, client, api_token):
"""Test wol_wake_device."""
mock_wakeonlan.return_value = {"success": True, "message": "WOL packet sent to AA:BB:CC:DD:EE:FF"}
payload = {"devMac": "AA:BB:CC:DD:EE:FF"}
response = client.post('/nettools/wakeonlan',
json=payload,
headers=auth_headers(api_token))
assert response.status_code == 200
data = response.get_json()
assert data["success"] is True
assert "AA:BB:CC:DD:EE:FF" in data["message"]
def test_wol_wake_device_invalid_mac(client, api_token):
"""Test wol_wake_device with invalid MAC."""
payload = {"devMac": "invalid-mac"}
response = client.post('/nettools/wakeonlan',
json=payload,
headers=auth_headers(api_token))
assert response.status_code == 400
data = response.get_json()
assert data["success"] is False
# --- OpenAPI Spec Tests ---
# --- Latest Device Tests ---
@patch('models.device_instance.get_temp_db_connection')
def test_get_latest_device(mock_db_conn, client, api_token):
"""Test get_latest_device endpoint."""
# Mock database connection for latest device query
mock_conn = MagicMock()
mock_execute_result = MagicMock()
mock_execute_result.fetchone.return_value = {
"devName": "Latest Device",
"devMac": "AA:BB:CC:DD:EE:FF",
"devLastIP": "192.168.1.100",
"devFirstConnection": "2025-12-07 10:30:00"
}
mock_conn.execute.return_value = mock_execute_result
mock_db_conn.return_value = mock_conn
response = client.get('/devices/latest',
headers=auth_headers(api_token))
assert response.status_code == 200
data = response.get_json()
assert len(data) == 1
assert data[0]["devName"] == "Latest Device"
assert data[0]["devMac"] == "AA:BB:CC:DD:EE:FF"
def test_openapi_spec(client, api_token):
"""Test openapi_spec endpoint contains MCP tool paths."""
response = client.get('/mcp/sse/openapi.json', headers=auth_headers(api_token))
assert response.status_code == 200
spec = response.get_json()
# Check for MCP tool endpoints in the spec with correct paths
assert "/nettools/trigger-scan" in spec["paths"]
assert "/device/open_ports" in spec["paths"]
assert "/devices/network/topology" in spec["paths"]
assert "/events/recent" in spec["paths"]
assert "/device/{mac}/set-alias" in spec["paths"]
assert "/nettools/wakeonlan" in spec["paths"]