This commit is contained in:
Adam Outler
2025-11-28 21:13:20 +00:00
parent 541b932b6d
commit 5e4ad10fe0
4 changed files with 128 additions and 107 deletions

View File

@@ -2,7 +2,6 @@ import sys
import os
import pytest
from unittest.mock import patch, MagicMock
import subprocess
INSTALL_PATH = os.getenv('NETALERTX_APP', '/app')
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
@@ -10,20 +9,23 @@ sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
from helper import get_setting_value # noqa: E402
from api_server.api_server_start import app # noqa: E402
@pytest.fixture(scope="session")
def api_token():
return get_setting_value("API_TOKEN")
@pytest.fixture
def client():
with app.test_client() as client:
yield client
def auth_headers(token):
return {"Authorization": f"Bearer {token}"}
# --- get_device_info Tests ---
# --- get_device_info Tests ---
@patch('api_server.tools_routes.get_temp_db_connection')
def test_get_device_info_ip_partial(mock_db_conn, client, api_token):
"""Test get_device_info with partial IP search."""
@@ -33,53 +35,55 @@ def test_get_device_info_ip_partial(mock_db_conn, client, api_token):
{"devName": "Test Device", "devMac": "AA:BB:CC:DD:EE:FF", "devLastIP": "192.168.1.50"}
]
mock_db_conn.return_value.cursor.return_value = mock_cursor
payload = {"query": ".50"}
response = client.post('/api/tools/get_device_info',
json=payload,
response = client.post('/api/tools/get_device_info',
json=payload,
headers=auth_headers(api_token))
assert response.status_code == 200
devices = response.get_json()
assert len(devices) == 1
assert devices[0]["devLastIP"] == "192.168.1.50"
# Verify SQL query included 3 params (MAC, Name, IP)
args, _ = mock_cursor.execute.call_args
assert args[0].count("?") == 3
assert len(args[1]) == 3
# --- trigger_scan Tests ---
# --- trigger_scan Tests ---
@patch('subprocess.run')
def test_trigger_scan_nmap_fast(mock_run, client, api_token):
"""Test trigger_scan with nmap_fast."""
mock_run.return_value = MagicMock(stdout="Scan completed", returncode=0)
payload = {"scan_type": "nmap_fast", "target": "192.168.1.1"}
response = client.post('/api/tools/trigger_scan',
json=payload,
response = client.post('/api/tools/trigger_scan',
json=payload,
headers=auth_headers(api_token))
assert response.status_code == 200
data = response.get_json()
assert data["success"] is True
assert "nmap -F 192.168.1.1" in data["command"]
mock_run.assert_called_once()
@patch('subprocess.run')
def test_trigger_scan_invalid_type(mock_run, client, api_token):
"""Test trigger_scan with invalid scan_type."""
payload = {"scan_type": "invalid_type", "target": "192.168.1.1"}
response = client.post('/api/tools/trigger_scan',
json=payload,
response = client.post('/api/tools/trigger_scan',
json=payload,
headers=auth_headers(api_token))
assert response.status_code == 400
mock_run.assert_not_called()
# --- get_open_ports Tests ---
@patch('subprocess.run')
def test_get_open_ports_ip(mock_run, client, api_token):
"""Test get_open_ports with an IP address."""
@@ -94,12 +98,12 @@ PORT STATE SERVICE
Nmap done: 1 IP address (1 host up) scanned in 0.10 seconds
"""
mock_run.return_value = MagicMock(stdout=mock_output, returncode=0)
payload = {"target": "192.168.1.1"}
response = client.post('/api/tools/get_open_ports',
json=payload,
response = client.post('/api/tools/get_open_ports',
json=payload,
headers=auth_headers(api_token))
assert response.status_code == 200
data = response.get_json()
assert data["success"] is True
@@ -107,6 +111,7 @@ Nmap done: 1 IP address (1 host up) scanned in 0.10 seconds
assert data["open_ports"][0]["port"] == 22
assert data["open_ports"][1]["service"] == "http"
@patch('api_server.tools_routes.get_temp_db_connection')
@patch('subprocess.run')
def test_get_open_ports_mac_resolve(mock_run, mock_db_conn, client, api_token):
@@ -115,24 +120,24 @@ def test_get_open_ports_mac_resolve(mock_run, mock_db_conn, client, api_token):
mock_cursor = MagicMock()
mock_cursor.fetchone.return_value = {"devLastIP": "192.168.1.50"}
mock_db_conn.return_value.cursor.return_value = mock_cursor
# Mock Nmap output
mock_run.return_value = MagicMock(stdout="80/tcp open http", returncode=0)
payload = {"target": "AA:BB:CC:DD:EE:FF"}
response = client.post('/api/tools/get_open_ports',
json=payload,
response = client.post('/api/tools/get_open_ports',
json=payload,
headers=auth_headers(api_token))
assert response.status_code == 200
data = response.get_json()
assert data["target"] == "192.168.1.50" # Should be resolved IP
assert data["target"] == "192.168.1.50" # Should be resolved IP
mock_run.assert_called_once()
args, _ = mock_run.call_args
assert "192.168.1.50" in args[0]
# --- get_network_topology Tests ---
# --- get_network_topology Tests ---
@patch('api_server.tools_routes.get_temp_db_connection')
def test_get_network_topology(mock_db_conn, client, api_token):
"""Test get_network_topology."""
@@ -142,10 +147,10 @@ def test_get_network_topology(mock_db_conn, client, api_token):
{"devName": "Device1", "devMac": "BB:BB:BB:BB:BB:BB", "devParentMAC": "AA:AA:AA:AA:AA:AA", "devParentPort": "eth1", "devVendor": "VendorB"}
]
mock_db_conn.return_value.cursor.return_value = mock_cursor
response = client.get('/api/tools/get_network_topology',
response = client.get('/api/tools/get_network_topology',
headers=auth_headers(api_token))
assert response.status_code == 200
data = response.get_json()
assert len(data["nodes"]) == 2
@@ -153,8 +158,8 @@ def test_get_network_topology(mock_db_conn, client, api_token):
assert data["links"][0]["source"] == "AA:AA:AA:AA:AA:AA"
assert data["links"][0]["target"] == "BB:BB:BB:BB:BB:BB"
# --- get_recent_alerts Tests ---
# --- get_recent_alerts Tests ---
@patch('api_server.tools_routes.get_temp_db_connection')
def test_get_recent_alerts(mock_db_conn, client, api_token):
"""Test get_recent_alerts."""
@@ -163,67 +168,69 @@ def test_get_recent_alerts(mock_db_conn, client, api_token):
{"eve_DateTime": "2023-10-27 10:00:00", "eve_EventType": "New Device", "eve_MAC": "CC:CC:CC:CC:CC:CC", "eve_IP": "192.168.1.100", "devName": "Unknown"}
]
mock_db_conn.return_value.cursor.return_value = mock_cursor
payload = {"hours": 24}
response = client.post('/api/tools/get_recent_alerts',
json=payload,
response = client.post('/api/tools/get_recent_alerts',
json=payload,
headers=auth_headers(api_token))
assert response.status_code == 200
data = response.get_json()
assert len(data) == 1
assert data[0]["eve_EventType"] == "New Device"
# --- set_device_alias Tests ---
# --- set_device_alias Tests ---
@patch('api_server.tools_routes.get_temp_db_connection')
def test_set_device_alias(mock_db_conn, client, api_token):
"""Test set_device_alias."""
mock_cursor = MagicMock()
mock_cursor.rowcount = 1 # Simulate successful update
mock_cursor.rowcount = 1 # Simulate successful update
mock_db_conn.return_value.cursor.return_value = mock_cursor
payload = {"mac": "AA:BB:CC:DD:EE:FF", "alias": "New Name"}
response = client.post('/api/tools/set_device_alias',
json=payload,
response = client.post('/api/tools/set_device_alias',
json=payload,
headers=auth_headers(api_token))
assert response.status_code == 200
data = response.get_json()
assert data["success"] is True
@patch('api_server.tools_routes.get_temp_db_connection')
def test_set_device_alias_not_found(mock_db_conn, client, api_token):
"""Test set_device_alias when device is not found."""
mock_cursor = MagicMock()
mock_cursor.rowcount = 0 # Simulate no rows updated
mock_cursor.rowcount = 0 # Simulate no rows updated
mock_db_conn.return_value.cursor.return_value = mock_cursor
payload = {"mac": "AA:BB:CC:DD:EE:FF", "alias": "New Name"}
response = client.post('/api/tools/set_device_alias',
json=payload,
response = client.post('/api/tools/set_device_alias',
json=payload,
headers=auth_headers(api_token))
assert response.status_code == 404
# --- wol_wake_device Tests ---
# --- wol_wake_device Tests ---
@patch('subprocess.run')
def test_wol_wake_device(mock_subprocess, client, api_token):
"""Test wol_wake_device."""
mock_subprocess.return_value.stdout = "Sending magic packet to 255.255.255.255:9 with AA:BB:CC:DD:EE:FF"
mock_subprocess.return_value.returncode = 0
payload = {"mac": "AA:BB:CC:DD:EE:FF"}
response = client.post('/api/tools/wol_wake_device',
json=payload,
response = client.post('/api/tools/wol_wake_device',
json=payload,
headers=auth_headers(api_token))
assert response.status_code == 200
data = response.get_json()
assert data["success"] is True
mock_subprocess.assert_called_with(["wakeonlan", "AA:BB:CC:DD:EE:FF"], capture_output=True, text=True, check=True)
@patch('api_server.tools_routes.get_temp_db_connection')
@patch('subprocess.run')
def test_wol_wake_device_by_ip(mock_subprocess, mock_db_conn, client, api_token):
@@ -238,38 +245,39 @@ def test_wol_wake_device_by_ip(mock_subprocess, mock_db_conn, client, api_token)
mock_subprocess.return_value.returncode = 0
payload = {"ip": "192.168.1.50"}
response = client.post('/api/tools/wol_wake_device',
json=payload,
response = client.post('/api/tools/wol_wake_device',
json=payload,
headers=auth_headers(api_token))
assert response.status_code == 200
data = response.get_json()
assert data["success"] is True
assert "AA:BB:CC:DD:EE:FF" in data["message"]
# Verify DB lookup
mock_cursor.execute.assert_called_with("SELECT devMac FROM Devices WHERE devLastIP = ?", ("192.168.1.50",))
# Verify subprocess call
mock_subprocess.assert_called_with(["wakeonlan", "AA:BB:CC:DD:EE:FF"], capture_output=True, text=True, check=True)
def test_wol_wake_device_invalid_mac(client, api_token):
"""Test wol_wake_device with invalid MAC."""
payload = {"mac": "invalid-mac"}
response = client.post('/api/tools/wol_wake_device',
json=payload,
response = client.post('/api/tools/wol_wake_device',
json=payload,
headers=auth_headers(api_token))
assert response.status_code == 400
# --- openapi_spec Tests ---
# --- openapi_spec Tests ---
def test_openapi_spec(client):
"""Test openapi_spec endpoint contains new paths."""
response = client.get('/api/tools/openapi.json')
assert response.status_code == 200
spec = response.get_json()
# Check for new endpoints
assert "/trigger_scan" in spec["paths"]
assert "/get_open_ports" in spec["paths"]