"""Endpoint tests for the API server, including the MCP tool routes.

Each test drives the application through its test client and replaces the
database accessors or backend helpers with mocks, so no real database or
network access is needed beyond reading the ``API_TOKEN`` setting.
"""

import sys
import os
import pytest
from unittest.mock import patch, MagicMock
from datetime import datetime

INSTALL_PATH = os.getenv('NETALERTX_APP', '/app')
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])

from helper import get_setting_value  # noqa: E402
from api_server.api_server_start import app  # noqa: E402


@pytest.fixture(scope="session")
def api_token():
    """Return the ``API_TOKEN`` setting, looked up once per test session."""
    return get_setting_value("API_TOKEN")


@pytest.fixture
def client():
    """Yield a test client for ``app``, closed when the test finishes."""
    with app.test_client() as client:
        yield client


def auth_headers(token):
    """Build the ``Authorization: Bearer <token>`` header dict for a request."""
    return {"Authorization": f"Bearer {token}"}


# --- Device Search Tests ---

@patch('models.device_instance.get_temp_db_connection')
def test_get_device_info_ip_partial(mock_db_conn, client, api_token):
    """Test device search with partial IP search."""
    # Mock database connection - DeviceInstance._fetchall calls conn.execute().fetchall()
    mock_conn = MagicMock()
    mock_execute_result = MagicMock()
    mock_execute_result.fetchall.return_value = [
        {"devName": "Test Device", "devMac": "AA:BB:CC:DD:EE:FF", "devLastIP": "192.168.1.50"}
    ]
    mock_conn.execute.return_value = mock_execute_result
    mock_db_conn.return_value = mock_conn

    payload = {"query": ".50"}
    response = client.post('/devices/search',
                           json=payload,
                           headers=auth_headers(api_token))

    assert response.status_code == 200
    data = response.get_json()
    assert data["success"] is True
    assert len(data["devices"]) == 1
    assert data["devices"][0]["devLastIP"] == "192.168.1.50"


# --- Trigger Scan Tests ---

@patch('api_server.api_server_start.UserEventsQueueInstance')
def test_trigger_scan_ARPSCAN(mock_queue_class, client, api_token):
    """Test trigger_scan with ARPSCAN type."""
    mock_queue = MagicMock()
    mock_queue_class.return_value = mock_queue

    payload = {"type": "ARPSCAN"}
    response = client.post('/mcp/sse/nettools/trigger-scan',
                           json=payload,
                           headers=auth_headers(api_token))

    assert response.status_code == 200
    data = response.get_json()
    assert data["success"] is True
    mock_queue.add_event.assert_called_once()
    # call_args[0] is the tuple of positional arguments of the single call.
    call_args = mock_queue.add_event.call_args[0]
    assert "run|ARPSCAN" in call_args[0]


@patch('api_server.api_server_start.UserEventsQueueInstance')
def test_trigger_scan_invalid_type(mock_queue_class, client, api_token):
    """Test trigger_scan with invalid scan type."""
    mock_queue = MagicMock()
    mock_queue_class.return_value = mock_queue

    payload = {"type": "invalid_type", "target": "192.168.1.0/24"}
    response = client.post('/mcp/sse/nettools/trigger-scan',
                           json=payload,
                           headers=auth_headers(api_token))

    assert response.status_code == 400
    data = response.get_json()
    assert data["success"] is False


# --- get_open_ports Tests ---

@patch('models.plugin_object_instance.get_temp_db_connection')
@patch('models.device_instance.get_temp_db_connection')
def test_get_open_ports_ip(mock_device_db_conn, mock_plugin_db_conn, client, api_token):
    """Test get_open_ports with an IP address."""
    # Stacked @patch decorators inject their mocks bottom-up: the first
    # parameter is the device_instance patch (closest to the function) and
    # the second is the plugin_object_instance patch.
    # Mock database connections for both device lookup and plugin objects
    mock_conn = MagicMock()
    mock_execute_result = MagicMock()

    # Mock for PluginObjectInstance.getByField (returns port data)
    mock_execute_result.fetchall.return_value = [
        {"Object_SecondaryID": "22", "Watched_Value2": "ssh"},
        {"Object_SecondaryID": "80", "Watched_Value2": "http"}
    ]
    # Mock for DeviceInstance.getByIP (returns device with MAC)
    mock_execute_result.fetchone.return_value = {"devMac": "AA:BB:CC:DD:EE:FF"}

    mock_conn.execute.return_value = mock_execute_result
    # Both accessors hand out the same connection, so one mocked result
    # serves the fetchone() and the fetchall() query alike.
    mock_device_db_conn.return_value = mock_conn
    mock_plugin_db_conn.return_value = mock_conn

    payload = {"target": "192.168.1.1"}
    response = client.post('/device/open_ports',
                           json=payload,
                           headers=auth_headers(api_token))

    assert response.status_code == 200
    data = response.get_json()
    assert data["success"] is True
    assert len(data["open_ports"]) == 2
    assert data["open_ports"][0]["port"] == 22
    assert data["open_ports"][1]["service"] == "http"


@patch('models.plugin_object_instance.get_temp_db_connection')
def test_get_open_ports_mac_resolve(mock_plugin_db_conn, client, api_token):
    """Test get_open_ports with a MAC address that resolves to an IP."""
    # Mock database connection for MAC-based open ports query
    mock_conn = MagicMock()
    mock_execute_result = MagicMock()
    mock_execute_result.fetchall.return_value = [
        {"Object_SecondaryID": "80", "Watched_Value2": "http"}
    ]
    mock_conn.execute.return_value = mock_execute_result
    mock_plugin_db_conn.return_value = mock_conn

    payload = {"target": "AA:BB:CC:DD:EE:FF"}
    response = client.post('/device/open_ports',
                           json=payload,
                           headers=auth_headers(api_token))

    assert response.status_code == 200
    data = response.get_json()
    assert data["success"] is True
    assert "target" in data
    assert len(data["open_ports"]) == 1
    assert data["open_ports"][0]["port"] == 80


# --- get_network_topology Tests ---

@patch('models.device_instance.get_temp_db_connection')
def test_get_network_topology(mock_db_conn, client, api_token):
    """Test get_network_topology."""
    # Mock database connection for topology query
    mock_conn = MagicMock()
    mock_execute_result = MagicMock()
    mock_execute_result.fetchall.return_value = [
        {"devName": "Router", "devMac": "AA:AA:AA:AA:AA:AA", "devParentMAC": None,
         "devParentPort": None, "devVendor": "VendorA"},
        {"devName": "Device1", "devMac": "BB:BB:BB:BB:BB:BB", "devParentMAC": "AA:AA:AA:AA:AA:AA",
         "devParentPort": "eth1", "devVendor": "VendorB"}
    ]
    mock_conn.execute.return_value = mock_execute_result
    mock_db_conn.return_value = mock_conn

    response = client.get('/devices/network/topology',
                          headers=auth_headers(api_token))

    assert response.status_code == 200
    data = response.get_json()
    assert len(data["nodes"]) == 2
    assert len(data["links"]) == 1
    assert data["links"][0]["source"] == "AA:AA:AA:AA:AA:AA"
    assert data["links"][0]["target"] == "BB:BB:BB:BB:BB:BB"


# --- get_recent_alerts Tests ---

@patch('models.event_instance.get_temp_db_connection')
def test_get_recent_alerts(mock_db_conn, client, api_token):
    """Test get_recent_alerts."""
    # Mock database connection for events query
    mock_conn = MagicMock()
    mock_execute_result = MagicMock()
    now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    mock_execute_result.fetchall.return_value = [
        {"eve_DateTime": now, "eve_EventType": "New Device", "eve_MAC": "AA:BB:CC:DD:EE:FF"}
    ]
    mock_conn.execute.return_value = mock_execute_result
    mock_db_conn.return_value = mock_conn

    response = client.get('/events/recent',
                          headers=auth_headers(api_token))

    assert response.status_code == 200
    data = response.get_json()
    assert data["success"] is True
    assert data["hours"] == 24


# --- Device Alias Tests ---

@patch('api_server.api_server_start.update_device_column')
def test_set_device_alias(mock_update_col, client, api_token):
    """Test set_device_alias."""
    mock_update_col.return_value = {"success": True, "message": "Device alias updated"}

    payload = {"alias": "New Device Name"}
    response = client.post('/device/AA:BB:CC:DD:EE:FF/set-alias',
                           json=payload,
                           headers=auth_headers(api_token))

    assert response.status_code == 200
    data = response.get_json()
    assert data["success"] is True
    mock_update_col.assert_called_once_with("AA:BB:CC:DD:EE:FF", "devName", "New Device Name")


@patch('api_server.api_server_start.update_device_column')
def test_set_device_alias_not_found(mock_update_col, client, api_token):
    """Test set_device_alias when device is not found."""
    mock_update_col.return_value = {"success": False, "error": "Device not found"}

    payload = {"alias": "New Device Name"}
    response = client.post('/device/FF:FF:FF:FF:FF:FF/set-alias',
                           json=payload,
                           headers=auth_headers(api_token))

    assert response.status_code == 200
    data = response.get_json()
    assert data["success"] is False
    assert "Device not found" in data["error"]


# --- Wake-on-LAN Tests ---

@patch('api_server.api_server_start.wakeonlan')
def test_wol_wake_device(mock_wakeonlan, client, api_token):
    """Test wol_wake_device."""
    mock_wakeonlan.return_value = {"success": True, "message": "WOL packet sent to AA:BB:CC:DD:EE:FF"}

    payload = {"devMac": "AA:BB:CC:DD:EE:FF"}
    response = client.post('/nettools/wakeonlan',
                           json=payload,
                           headers=auth_headers(api_token))

    assert response.status_code == 200
    data = response.get_json()
    assert data["success"] is True
    assert "AA:BB:CC:DD:EE:FF" in data["message"]


def test_wol_wake_device_invalid_mac(client, api_token):
    """Test wol_wake_device with invalid MAC."""
    payload = {"devMac": "invalid-mac"}
    response = client.post('/nettools/wakeonlan',
                           json=payload,
                           headers=auth_headers(api_token))

    assert response.status_code == 400
    data = response.get_json()
    assert data["success"] is False


# --- Latest Device Tests ---

@patch('models.device_instance.get_temp_db_connection')
def test_get_latest_device(mock_db_conn, client, api_token):
    """Test get_latest_device endpoint."""
    # Mock database connection for latest device query
    mock_conn = MagicMock()
    mock_execute_result = MagicMock()
    mock_execute_result.fetchone.return_value = {
        "devName": "Latest Device",
        "devMac": "AA:BB:CC:DD:EE:FF",
        "devLastIP": "192.168.1.100",
        "devFirstConnection": "2025-12-07 10:30:00"
    }
    mock_conn.execute.return_value = mock_execute_result
    mock_db_conn.return_value = mock_conn

    response = client.get('/devices/latest',
                          headers=auth_headers(api_token))

    assert response.status_code == 200
    data = response.get_json()
    assert len(data) == 1
    assert data[0]["devName"] == "Latest Device"
    assert data[0]["devMac"] == "AA:BB:CC:DD:EE:FF"


# --- OpenAPI Spec Tests ---

def test_openapi_spec(client, api_token):
    """Test openapi_spec endpoint contains MCP tool paths."""
    response = client.get('/mcp/sse/openapi.json',
                          headers=auth_headers(api_token))
    assert response.status_code == 200
    spec = response.get_json()

    # Check for MCP tool endpoints in the spec with correct paths
    assert "/nettools/trigger-scan" in spec["paths"]
    assert "/device/open_ports" in spec["paths"]
    assert "/devices/network/topology" in spec["paths"]
    assert "/events/recent" in spec["paths"]
    assert "/device/{mac}/set-alias" in spec["paths"]
    assert "/nettools/wakeonlan" in spec["paths"]