Files
NetAlertX/test/authoritative_fields/test_field_lock_scan_integration.py
2026-01-19 11:35:47 +00:00

703 lines
20 KiB
Python

"""
Integration tests for device field locking during actual scan updates.
Simulates real-world scenarios by:
1. Setting up Devices table with various source values
2. Populating CurrentScan with new discovery data
3. Running actual device_handling scan updates
4. Verifying field updates respect authorization rules
Tests all combinations of field sources (LOCKED, USER, NEWDEV, plugin name)
with realistic scan data.
"""
import sqlite3
from unittest.mock import Mock, patch
import pytest
from server.scan import device_handling
@pytest.fixture
def scan_db():
"""Create an in-memory SQLite database with full device schema."""
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
cur = conn.cursor()
# Create Devices table with source tracking
cur.execute(
"""
CREATE TABLE Devices (
devMac TEXT PRIMARY KEY,
devLastConnection TEXT,
devPresentLastScan INTEGER DEFAULT 0,
devLastIP TEXT,
devName TEXT,
devNameSource TEXT DEFAULT 'NEWDEV',
devVendor TEXT,
devVendorSource TEXT DEFAULT 'NEWDEV',
devLastIpSource TEXT DEFAULT 'NEWDEV',
devType TEXT,
devIcon TEXT,
devParentPort TEXT,
devParentPortSource TEXT DEFAULT 'NEWDEV',
devParentMAC TEXT,
devParentMacSource TEXT DEFAULT 'NEWDEV',
devSite TEXT,
devSiteSource TEXT DEFAULT 'NEWDEV',
devSSID TEXT,
devSsidSource TEXT DEFAULT 'NEWDEV',
devFQDN TEXT,
devFqdnSource TEXT DEFAULT 'NEWDEV',
devParentRelType TEXT,
devParentRelTypeSource TEXT DEFAULT 'NEWDEV',
devVlan TEXT,
devVlanSource TEXT DEFAULT 'NEWDEV',
devPrimaryIPv4 TEXT,
devPrimaryIPv6 TEXT
)
"""
)
# Create CurrentScan table
cur.execute(
"""
CREATE TABLE CurrentScan (
cur_MAC TEXT,
cur_IP TEXT,
cur_Vendor TEXT,
cur_ScanMethod TEXT,
cur_Name TEXT,
cur_LastQuery TEXT,
cur_DateTime TEXT,
cur_SyncHubNodeName TEXT,
cur_NetworkSite TEXT,
cur_SSID TEXT,
cur_NetworkNodeMAC TEXT,
cur_PORT TEXT,
cur_Type TEXT
)
"""
)
conn.commit()
yield conn
conn.close()
@pytest.fixture
def mock_device_handlers():
"""Mock device_handling helper functions."""
with patch.multiple(
device_handling,
update_devPresentLastScan_based_on_nics=Mock(return_value=0),
query_MAC_vendor=Mock(return_value=-1),
guess_icon=Mock(return_value="icon"),
guess_type=Mock(return_value="type"),
get_setting_value=Mock(
side_effect=lambda key: {
"NEWDEV_replace_preset_icon": 0,
"NEWDEV_devIcon": "icon",
"NEWDEV_devType": "type",
}.get(key, "")
),
):
yield
def test_scan_updates_newdev_device_name(scan_db, mock_device_handlers):
"""Scanner discovers name for device with NEWDEV source."""
cur = scan_db.cursor()
# Device with empty name (NEWDEV)
cur.execute(
"""
INSERT INTO Devices (
devMac, devLastConnection, devPresentLastScan, devLastIP,
devName, devNameSource, devVendor, devVendorSource, devLastIpSource,
devType, devIcon, devParentPort, devParentMAC, devSite, devSSID
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
"AA:BB:CC:DD:EE:01",
"2025-01-01 00:00:00",
0,
"192.168.1.1",
"", # No name yet
"NEWDEV", # Default/unset
"TestVendor",
"NEWDEV",
"ARPSCAN",
"type",
"icon",
"",
"",
"",
"",
),
)
# Scanner discovers name
cur.execute(
"""
INSERT INTO CurrentScan (
cur_MAC, cur_IP, cur_Vendor, cur_ScanMethod, cur_Name,
cur_LastQuery, cur_DateTime, cur_SyncHubNodeName,
cur_NetworkSite, cur_SSID, cur_NetworkNodeMAC, cur_PORT, cur_Type
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
"AA:BB:CC:DD:EE:01",
"192.168.1.1",
"TestVendor",
"NBTSCAN",
"DiscoveredDevice",
"",
"2025-01-01 01:00:00",
"",
"",
"",
"",
"",
"",
),
)
scan_db.commit()
db = Mock()
db.sql_connection = scan_db
db.sql = cur
# Run scan update
device_handling.update_devices_data_from_scan(db)
row = cur.execute(
"SELECT devName FROM Devices WHERE devMac = ?",
("AA:BB:CC:DD:EE:01",),
).fetchone()
# Name SHOULD be updated from NEWDEV
assert row["devName"] == "DiscoveredDevice", "Name should be updated from empty"
def test_scan_does_not_update_user_field_name(scan_db, mock_device_handlers):
"""Scanner cannot override devName when source is USER."""
cur = scan_db.cursor()
# Device with USER-edited name
cur.execute(
"""
INSERT INTO Devices (
devMac, devLastConnection, devPresentLastScan, devLastIP,
devName, devNameSource, devVendor, devVendorSource, devLastIpSource,
devType, devIcon, devParentPort, devParentMAC, devSite, devSSID
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
"AA:BB:CC:DD:EE:02",
"2025-01-01 00:00:00",
0,
"192.168.1.2",
"My Custom Device",
"USER", # User-owned
"TestVendor",
"NEWDEV",
"ARPSCAN",
"type",
"icon",
"",
"",
"",
"",
),
)
# Scanner tries to update name
cur.execute(
"""
INSERT INTO CurrentScan (
cur_MAC, cur_IP, cur_Vendor, cur_ScanMethod, cur_Name,
cur_LastQuery, cur_DateTime, cur_SyncHubNodeName,
cur_NetworkSite, cur_SSID, cur_NetworkNodeMAC, cur_PORT, cur_Type
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
"AA:BB:CC:DD:EE:02",
"192.168.1.2",
"TestVendor",
"NBTSCAN",
"ScannedDevice",
"",
"2025-01-01 01:00:00",
"",
"",
"",
"",
"",
"",
),
)
scan_db.commit()
db = Mock()
db.sql_connection = scan_db
db.sql = cur
# Run scan update
device_handling.update_devices_data_from_scan(db)
row = cur.execute(
"SELECT devName FROM Devices WHERE devMac = ?",
("AA:BB:CC:DD:EE:02",),
).fetchone()
# Name should NOT be updated because it's USER-owned
assert row["devName"] == "My Custom Device", "USER name should not be changed by scan"
def test_scan_does_not_update_locked_field(scan_db, mock_device_handlers):
"""Scanner cannot override LOCKED devName."""
cur = scan_db.cursor()
# Device with LOCKED name
cur.execute(
"""
INSERT INTO Devices (
devMac, devLastConnection, devPresentLastScan, devLastIP,
devName, devNameSource, devVendor, devVendorSource, devLastIpSource,
devType, devIcon, devParentPort, devParentMAC, devSite, devSSID
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
"AA:BB:CC:DD:EE:03",
"2025-01-01 00:00:00",
0,
"192.168.1.3",
"Important Device",
"LOCKED", # Locked
"TestVendor",
"NEWDEV",
"ARPSCAN",
"type",
"icon",
"",
"",
"",
"",
),
)
# Scanner tries to update name
cur.execute(
"""
INSERT INTO CurrentScan (
cur_MAC, cur_IP, cur_Vendor, cur_ScanMethod, cur_Name,
cur_LastQuery, cur_DateTime, cur_SyncHubNodeName,
cur_NetworkSite, cur_SSID, cur_NetworkNodeMAC, cur_PORT, cur_Type
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
"AA:BB:CC:DD:EE:03",
"192.168.1.3",
"TestVendor",
"NBTSCAN",
"Unknown",
"",
"2025-01-01 01:00:00",
"",
"",
"",
"",
"",
"",
),
)
scan_db.commit()
db = Mock()
db.sql_connection = scan_db
db.sql = cur
# Run scan update
device_handling.update_devices_data_from_scan(db)
row = cur.execute(
"SELECT devName FROM Devices WHERE devMac = ?",
("AA:BB:CC:DD:EE:03",),
).fetchone()
# Name should NOT be updated because it's LOCKED
assert row["devName"] == "Important Device", "LOCKED name should not be changed"
def test_scan_updates_empty_vendor_field(scan_db, mock_device_handlers):
"""Scan updates vendor when it's empty/NULL."""
cur = scan_db.cursor()
# Device with empty vendor
cur.execute(
"""
INSERT INTO Devices (
devMac, devLastConnection, devPresentLastScan, devLastIP,
devName, devNameSource, devVendor, devVendorSource, devLastIpSource,
devType, devIcon, devParentPort, devParentMAC, devSite, devSSID
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
"AA:BB:CC:DD:EE:04",
"2025-01-01 00:00:00",
0,
"192.168.1.4",
"Device",
"NEWDEV",
"", # Empty vendor
"NEWDEV",
"ARPSCAN",
"type",
"icon",
"",
"",
"",
"",
),
)
# Scan discovers vendor
cur.execute(
"""
INSERT INTO CurrentScan (
cur_MAC, cur_IP, cur_Vendor, cur_ScanMethod, cur_Name,
cur_LastQuery, cur_DateTime, cur_SyncHubNodeName,
cur_NetworkSite, cur_SSID, cur_NetworkNodeMAC, cur_PORT, cur_Type
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
"AA:BB:CC:DD:EE:04",
"192.168.1.4",
"Apple Inc.",
"ARPSCAN",
"",
"",
"2025-01-01 01:00:00",
"",
"",
"",
"",
"",
"",
),
)
scan_db.commit()
db = Mock()
db.sql_connection = scan_db
db.sql = cur
# Run scan update
device_handling.update_devices_data_from_scan(db)
row = cur.execute(
"SELECT devVendor FROM Devices WHERE devMac = ?",
("AA:BB:CC:DD:EE:04",),
).fetchone()
# Vendor SHOULD be updated
assert row["devVendor"] == "Apple Inc.", "Empty vendor should be populated from scan"
def test_scan_updates_ip_addresses(scan_db, mock_device_handlers):
"""Scan updates IPv4 and IPv6 addresses correctly."""
cur = scan_db.cursor()
# Device with empty IPs
cur.execute(
"""
INSERT INTO Devices (
devMac, devLastConnection, devPresentLastScan, devLastIP,
devName, devNameSource, devVendor, devVendorSource, devLastIpSource,
devType, devIcon, devParentPort, devParentMAC, devSite, devSSID,
devPrimaryIPv4, devPrimaryIPv6
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
"AA:BB:CC:DD:EE:05",
"2025-01-01 00:00:00",
0,
"",
"Device",
"NEWDEV",
"Vendor",
"NEWDEV",
"NEWDEV",
"type",
"icon",
"",
"",
"",
"",
"", # No IPv4
"", # No IPv6
),
)
# Scan discovers IPv4
cur.execute(
"""
INSERT INTO CurrentScan (
cur_MAC, cur_IP, cur_Vendor, cur_ScanMethod, cur_Name,
cur_LastQuery, cur_DateTime, cur_SyncHubNodeName,
cur_NetworkSite, cur_SSID, cur_NetworkNodeMAC, cur_PORT, cur_Type
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
"AA:BB:CC:DD:EE:05",
"192.168.1.100",
"Vendor",
"ARPSCAN",
"",
"",
"2025-01-01 01:00:00",
"",
"",
"",
"",
"",
"",
),
)
scan_db.commit()
db = Mock()
db.sql_connection = scan_db
db.sql = cur
# Run scan update
device_handling.update_devices_data_from_scan(db)
row = cur.execute(
"SELECT devLastIP, devPrimaryIPv4, devPrimaryIPv6 FROM Devices WHERE devMac = ?",
("AA:BB:CC:DD:EE:05",),
).fetchone()
# IPv4 should be set
assert row["devLastIP"] == "192.168.1.100", "Last IP should be updated"
assert row["devPrimaryIPv4"] == "192.168.1.100", "Primary IPv4 should be set"
assert row["devPrimaryIPv6"] == "", "IPv6 should remain empty"
def test_scan_updates_ipv6_without_changing_ipv4(scan_db, mock_device_handlers):
"""Scan updates IPv6 without overwriting IPv4."""
cur = scan_db.cursor()
# Device with IPv4 already set
cur.execute(
"""
INSERT INTO Devices (
devMac, devLastConnection, devPresentLastScan, devLastIP,
devName, devNameSource, devVendor, devVendorSource, devLastIpSource,
devType, devIcon, devParentPort, devParentMAC, devSite, devSSID,
devPrimaryIPv4, devPrimaryIPv6
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
"AA:BB:CC:DD:EE:06",
"2025-01-01 00:00:00",
0,
"192.168.1.101",
"Device",
"NEWDEV",
"Vendor",
"NEWDEV",
"NEWDEV",
"type",
"icon",
"",
"",
"",
"",
"192.168.1.101", # IPv4 already set
"", # No IPv6
),
)
# Scan discovers IPv6
cur.execute(
"""
INSERT INTO CurrentScan (
cur_MAC, cur_IP, cur_Vendor, cur_ScanMethod, cur_Name,
cur_LastQuery, cur_DateTime, cur_SyncHubNodeName,
cur_NetworkSite, cur_SSID, cur_NetworkNodeMAC, cur_PORT, cur_Type
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
"AA:BB:CC:DD:EE:06",
"fe80::1",
"Vendor",
"ARPSCAN",
"",
"",
"2025-01-01 01:00:00",
"",
"",
"",
"",
"",
"",
),
)
scan_db.commit()
db = Mock()
db.sql_connection = scan_db
db.sql = cur
# Run scan update
device_handling.update_devices_data_from_scan(db)
row = cur.execute(
"SELECT devPrimaryIPv4, devPrimaryIPv6 FROM Devices WHERE devMac = ?",
("AA:BB:CC:DD:EE:06",),
).fetchone()
# IPv4 should remain, IPv6 should be set
assert row["devPrimaryIPv4"] == "192.168.1.101", "IPv4 should not change"
assert row["devPrimaryIPv6"] == "fe80::1", "IPv6 should be set"
def test_scan_updates_presence_status(scan_db, mock_device_handlers):
"""Scan correctly updates devPresentLastScan status."""
cur = scan_db.cursor()
# Device not in current scan (offline)
cur.execute(
"""
INSERT INTO Devices (
devMac, devLastConnection, devPresentLastScan, devLastIP,
devName, devNameSource, devVendor, devVendorSource, devLastIpSource,
devType, devIcon, devParentPort, devParentMAC, devSite, devSSID
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
"AA:BB:CC:DD:EE:07",
"2025-01-01 00:00:00",
1, # Was online
"192.168.1.102",
"Device",
"NEWDEV",
"Vendor",
"NEWDEV",
"ARPSCAN",
"type",
"icon",
"",
"",
"",
"",
),
)
# Note: No CurrentScan entry for this MAC - device is offline
scan_db.commit()
db = Mock()
db.sql_connection = scan_db
db.sql = cur
# Run scan update
device_handling.update_devices_data_from_scan(db)
row = cur.execute(
"SELECT devPresentLastScan FROM Devices WHERE devMac = ?",
("AA:BB:CC:DD:EE:07",),
).fetchone()
# Device should be marked as offline
assert row["devPresentLastScan"] == 0, "Offline device should have devPresentLastScan = 0"
def test_scan_multiple_devices_mixed_sources(scan_db, mock_device_handlers):
"""Scan with multiple devices having different source combinations."""
cur = scan_db.cursor()
devices_data = [
# (MAC, Name, NameSource, Vendor, VendorSource)
("AA:BB:CC:DD:EE:11", "Device1", "NEWDEV", "", "NEWDEV"), # Both updatable
("AA:BB:CC:DD:EE:12", "My Device", "USER", "OldVendor", "NEWDEV"), # Name protected
("AA:BB:CC:DD:EE:13", "Locked Device", "LOCKED", "", "NEWDEV"), # Name locked
("AA:BB:CC:DD:EE:14", "Device4", "ARPSCAN", "", "NEWDEV"), # Name from plugin
]
for mac, name, name_src, vendor, vendor_src in devices_data:
cur.execute(
"""
INSERT INTO Devices (
devMac, devLastConnection, devPresentLastScan, devLastIP,
devName, devNameSource, devVendor, devVendorSource, devLastIpSource,
devType, devIcon, devParentPort, devParentMAC, devSite, devSSID
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
mac,
"2025-01-01 00:00:00",
0,
"192.168.1.1",
name,
name_src,
vendor,
vendor_src,
"ARPSCAN",
"type",
"icon",
"",
"",
"",
"",
),
)
# Scan discovers all devices with new data
scan_entries = [
("AA:BB:CC:DD:EE:11", "192.168.1.1", "Apple Inc.", "ScanPlugin", "ScannedDevice1"),
("AA:BB:CC:DD:EE:12", "192.168.1.2", "Samsung", "ScanPlugin", "ScannedDevice2"),
("AA:BB:CC:DD:EE:13", "192.168.1.3", "Sony", "ScanPlugin", "ScannedDevice3"),
("AA:BB:CC:DD:EE:14", "192.168.1.4", "LG", "ScanPlugin", "ScannedDevice4"),
]
for mac, ip, vendor, scan_method, name in scan_entries:
cur.execute(
"""
INSERT INTO CurrentScan (
cur_MAC, cur_IP, cur_Vendor, cur_ScanMethod, cur_Name,
cur_LastQuery, cur_DateTime, cur_SyncHubNodeName,
cur_NetworkSite, cur_SSID, cur_NetworkNodeMAC, cur_PORT, cur_Type
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(mac, ip, vendor, scan_method, name, "", "2025-01-01 01:00:00", "", "", "", "", "", ""),
)
scan_db.commit()
db = Mock()
db.sql_connection = scan_db
db.sql = cur
# Run scan update
device_handling.update_devices_data_from_scan(db)
# Check results
results = {
"AA:BB:CC:DD:EE:11": {"name": "Device1", "vendor": "Apple Inc."}, # Name already set, won't update
"AA:BB:CC:DD:EE:12": {"name": "My Device", "vendor": "Samsung"}, # Name protected (USER)
"AA:BB:CC:DD:EE:13": {"name": "Locked Device", "vendor": "Sony"}, # Name locked
"AA:BB:CC:DD:EE:14": {"name": "Device4", "vendor": "LG"}, # Name already from plugin, won't update
}
for mac, expected in results.items():
row = cur.execute(
"SELECT devName, devVendor FROM Devices WHERE devMac = ?",
(mac,),
).fetchone()
assert row["devName"] == expected["name"], f"Device {mac} name mismatch: got {row['devName']}, expected {expected['name']}"