Refactor authoritative field handling and enhance device update logic

- Updated `get_source_for_field_update_with_value` to determine source values based on new field values, including handling for empty and unknown values.
- Introduced `get_overwrite_sql_clause` to build SQL conditions for authoritative overwrite checks based on plugin settings.
- Enhanced `update_devices_data_from_scan` to utilize new authoritative settings and conditions for updating device fields.
- Added new tests for source value determination and device creation to ensure proper handling of source fields.
- Created in-memory SQLite database fixtures for testing device creation and updates.
This commit is contained in:
Jokob @NetAlertX
2026-01-22 04:33:49 +00:00
parent 422a048806
commit 49e689f022
5 changed files with 931 additions and 143 deletions

View File

@@ -2,11 +2,9 @@
Unit tests for authoritative field update handler.
"""
import pytest
from server.db.authoritative_handler import (
can_overwrite_field,
get_source_for_field_update,
get_source_for_field_update_with_value,
FIELD_SOURCE_MAP,
)
@@ -75,17 +73,52 @@ class TestCanOverwriteField:
)
class TestGetSourceForFieldUpdate:
"""Test source value determination for field updates."""
class TestGetSourceForFieldUpdateWithValue:
"""Test source value determination with value-based normalization."""
def test_user_override_sets_user_source(self):
"""User override should set USER source."""
assert get_source_for_field_update("devName", "UNIFIAPI", is_user_override=True) == "USER"
assert (
get_source_for_field_update_with_value(
"devName", "UNIFIAPI", "Device", is_user_override=True
)
== "USER"
)
def test_plugin_update_sets_plugin_prefix(self):
"""Plugin update should set plugin prefix as source."""
assert get_source_for_field_update("devName", "UNIFIAPI", is_user_override=False) == "UNIFIAPI"
assert get_source_for_field_update("devLastIP", "ARPSCAN", is_user_override=False) == "ARPSCAN"
assert (
get_source_for_field_update_with_value(
"devName", "UNIFIAPI", "Device", is_user_override=False
)
== "UNIFIAPI"
)
assert (
get_source_for_field_update_with_value(
"devLastIP", "ARPSCAN", "192.168.1.1", is_user_override=False
)
== "ARPSCAN"
)
def test_empty_or_unknown_values_return_newdev(self):
assert (
get_source_for_field_update_with_value(
"devName", "ARPSCAN", "", is_user_override=False
)
== "NEWDEV"
)
assert (
get_source_for_field_update_with_value(
"devName", "ARPSCAN", "(unknown)", is_user_override=False
)
== "NEWDEV"
)
def test_non_empty_value_sets_plugin_prefix(self):
assert (
get_source_for_field_update_with_value(
"devVendor", "ARPSCAN", "Acme", is_user_override=False
)
== "ARPSCAN"
)
class TestFieldSourceMapping:

View File

@@ -83,6 +83,34 @@ def scan_db():
"""
)
cur.execute(
"""
CREATE TABLE Events (
eve_MAC TEXT,
eve_IP TEXT,
eve_DateTime TEXT,
eve_EventType TEXT,
eve_AdditionalInfo TEXT,
eve_PendingAlertEmail INTEGER
)
"""
)
cur.execute(
"""
CREATE TABLE Sessions (
ses_MAC TEXT,
ses_IP TEXT,
ses_EventTypeConnection TEXT,
ses_DateTimeConnection TEXT,
ses_EventTypeDisconnection TEXT,
ses_DateTimeDisconnection TEXT,
ses_StillConnected INTEGER,
ses_AdditionalInfo TEXT
)
"""
)
conn.commit()
yield conn
conn.close()
@@ -109,6 +137,197 @@ def mock_device_handlers():
yield
@pytest.fixture
def scan_db_for_new_devices():
"""Create an in-memory SQLite database for create_new_devices tests."""
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
cur = conn.cursor()
cur.execute(
"""
CREATE TABLE Devices (
devMac TEXT PRIMARY KEY,
devName TEXT,
devVendor TEXT,
devLastIP TEXT,
devPrimaryIPv4 TEXT,
devPrimaryIPv6 TEXT,
devFirstConnection TEXT,
devLastConnection TEXT,
devSyncHubNode TEXT,
devGUID TEXT,
devParentMAC TEXT,
devParentPort TEXT,
devSite TEXT,
devSSID TEXT,
devType TEXT,
devSourcePlugin TEXT,
devMacSource TEXT,
devNameSource TEXT,
devFqdnSource TEXT,
devLastIpSource TEXT,
devVendorSource TEXT,
devSsidSource TEXT,
devParentMacSource TEXT,
devParentPortSource TEXT,
devParentRelTypeSource TEXT,
devVlanSource TEXT,
devAlertEvents INTEGER,
devAlertDown INTEGER,
devPresentLastScan INTEGER,
devIsArchived INTEGER,
devIsNew INTEGER,
devSkipRepeated INTEGER,
devScan INTEGER,
devOwner TEXT,
devFavorite INTEGER,
devGroup TEXT,
devComments TEXT,
devLogEvents INTEGER,
devLocation TEXT,
devCustomProps TEXT,
devParentRelType TEXT,
devReqNicsOnline INTEGER
)
"""
)
cur.execute(
"""
CREATE TABLE CurrentScan (
cur_MAC TEXT,
cur_Name TEXT,
cur_Vendor TEXT,
cur_ScanMethod TEXT,
cur_IP TEXT,
cur_SyncHubNodeName TEXT,
cur_NetworkNodeMAC TEXT,
cur_PORT TEXT,
cur_NetworkSite TEXT,
cur_SSID TEXT,
cur_Type TEXT
)
"""
)
cur.execute(
"""
CREATE TABLE Events (
eve_MAC TEXT,
eve_IP TEXT,
eve_DateTime TEXT,
eve_EventType TEXT,
eve_AdditionalInfo TEXT,
eve_PendingAlertEmail INTEGER
)
"""
)
cur.execute(
"""
CREATE TABLE Sessions (
ses_MAC TEXT,
ses_IP TEXT,
ses_EventTypeConnection TEXT,
ses_DateTimeConnection TEXT,
ses_EventTypeDisconnection TEXT,
ses_DateTimeDisconnection TEXT,
ses_StillConnected INTEGER,
ses_AdditionalInfo TEXT
)
"""
)
conn.commit()
yield conn
conn.close()
def test_create_new_devices_sets_sources(scan_db_for_new_devices):
"""New device insert initializes source fields from scan method."""
cur = scan_db_for_new_devices.cursor()
cur.execute(
"""
INSERT INTO CurrentScan (
cur_MAC, cur_Name, cur_Vendor, cur_ScanMethod, cur_IP,
cur_SyncHubNodeName, cur_NetworkNodeMAC, cur_PORT,
cur_NetworkSite, cur_SSID, cur_Type
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
"AA:BB:CC:DD:EE:10",
"DeviceOne",
"AcmeVendor",
"ARPSCAN",
"192.168.1.10",
"",
"11:22:33:44:55:66",
"1",
"",
"MyWifi",
"",
),
)
scan_db_for_new_devices.commit()
settings = {
"NEWDEV_devType": "default-type",
"NEWDEV_devParentMAC": "FF:FF:FF:FF:FF:FF",
"NEWDEV_devOwner": "owner",
"NEWDEV_devGroup": "group",
"NEWDEV_devComments": "",
"NEWDEV_devLocation": "",
"NEWDEV_devCustomProps": "",
"NEWDEV_devParentRelType": "uplink",
"SYNC_node_name": "SYNCNODE",
}
def get_setting_value_side_effect(key):
return settings.get(key, "")
db = Mock()
db.sql_connection = scan_db_for_new_devices
db.sql = cur
db.commitDB = scan_db_for_new_devices.commit
with patch.multiple(
device_handling,
get_setting_value=Mock(side_effect=get_setting_value_side_effect),
safe_int=Mock(return_value=0),
):
device_handling.create_new_devices(db)
row = cur.execute(
"""
SELECT
devMacSource,
devNameSource,
devVendorSource,
devLastIpSource,
devSsidSource,
devParentMacSource,
devParentPortSource,
devParentRelTypeSource,
devFqdnSource,
devVlanSource
FROM Devices WHERE devMac = ?
""",
("AA:BB:CC:DD:EE:10",),
).fetchone()
assert row["devMacSource"] == "ARPSCAN"
assert row["devNameSource"] == "ARPSCAN"
assert row["devVendorSource"] == "ARPSCAN"
assert row["devLastIpSource"] == "ARPSCAN"
assert row["devSsidSource"] == "ARPSCAN"
assert row["devParentMacSource"] == "ARPSCAN"
assert row["devParentPortSource"] == "ARPSCAN"
assert row["devParentRelTypeSource"] == "NEWDEV"
assert row["devFqdnSource"] == "NEWDEV"
assert row["devVlanSource"] == "NEWDEV"
def test_scan_updates_newdev_device_name(scan_db, mock_device_handlers):
"""Scanner discovers name for device with NEWDEV source."""
cur = scan_db.cursor()