diff --git a/test/authoritative_fields/test_ip_update_logic.py b/test/authoritative_fields/test_ip_update_logic.py index 5b4e3e4c..6cdb47f9 100644 --- a/test/authoritative_fields/test_ip_update_logic.py +++ b/test/authoritative_fields/test_ip_update_logic.py @@ -104,4 +104,3 @@ def test_primary_ipv4_is_set_and_ipv6_preserved(scan_db, mock_device_handling): assert row["devLastIP"] == "10.0.0.5" assert row["devPrimaryIPv4"] == "10.0.0.5" assert row["devPrimaryIPv6"] == "2001:db8::2" - diff --git a/test/server/test_presence_detection.py b/test/server/test_presence_detection.py new file mode 100644 index 00000000..2e62f60e --- /dev/null +++ b/test/server/test_presence_detection.py @@ -0,0 +1,206 @@ +import sqlite3 +from unittest.mock import Mock, patch + +import pytest + +from scan.session_events import process_scan + + +@pytest.fixture +def scan_db(): + conn = sqlite3.connect(":memory:") + conn.row_factory = sqlite3.Row + cur = conn.cursor() + + # Devices + cur.execute( + """ + CREATE TABLE Devices ( + devMac TEXT PRIMARY KEY, + devLastIP TEXT, + devPresentLastScan INTEGER, + devAlertDown INTEGER, + devAlertEvents INTEGER, + devIsArchived INTEGER DEFAULT 0 + ) + """ + ) + + # Current scan + cur.execute( + """ + CREATE TABLE CurrentScan ( + scanMac TEXT, + scanLastIP TEXT + ) + """ + ) + + # Events + cur.execute( + """ + CREATE TABLE Events ( + eve_MAC TEXT, + eve_IP TEXT, + eve_DateTime TEXT, + eve_EventType TEXT, + eve_AdditionalInfo TEXT, + eve_PendingAlertEmail INTEGER + ) + """ + ) + + conn.commit() + + db = Mock() + db.sql_connection = conn + db.sql = cur + db.commitDB = conn.commit + + def read(query): + return [dict(cur.execute(query).fetchone())] + + db.read = read + + yield db + conn.close() + + +@pytest.fixture +def minimal_patches(): + """Patch unrelated pipeline steps.""" + with patch.multiple( + "scan.session_events", # <-- target module + exclude_ignored_devices=Mock(), + save_scanned_devices=Mock(), + print_scan_stats=Mock(), + create_new_devices=Mock(), + update_devices_data_from_scan=Mock(), + update_devLastConnection_from_CurrentScan=Mock(), + update_devPresentLastScan_based_on_nics=Mock(), + update_devPresentLastScan_based_on_force_status=Mock(), + update_vendors_from_mac=Mock(), + update_ipv4_ipv6=Mock(), + update_icons_and_types=Mock(), + pair_sessions_events=Mock(), + create_sessions_snapshot=Mock(), + insertOnlineHistory=Mock(), + skip_repeated_notifications=Mock(), + update_unread_notifications_count=Mock(), + ): + yield + + +# --------------------------------------------------- +# TEST 1: Online → Offline transition +# --------------------------------------------------- + + +def test_device_goes_offline_when_missing_next_scan(scan_db, minimal_patches): + db = scan_db + cur = db.sql + + # Device initially known + cur.execute( + """ + INSERT INTO Devices VALUES + ('AA','1.1.1.1',1,1,1,0) + """ + ) + + # FIRST SCAN — device present + cur.execute("INSERT INTO CurrentScan VALUES ('AA','1.1.1.1')") + db.commitDB() + + process_scan(db) + + # Device should be online + row = cur.execute( + "SELECT devPresentLastScan FROM Devices WHERE devMac='AA'" + ).fetchone() + assert row["devPresentLastScan"] == 1 + + # SECOND SCAN — device missing + # (CurrentScan was cleared by process_scan) + process_scan(db) + + row = cur.execute( + "SELECT devPresentLastScan FROM Devices WHERE devMac='AA'" + ).fetchone() + + assert row["devPresentLastScan"] == 0 + + +# --------------------------------------------------- +# TEST 2: Device Down event created +# --------------------------------------------------- + + +def test_device_down_event_created_when_missing(scan_db, minimal_patches): + db = scan_db + cur = db.sql + + cur.execute( + """ + INSERT INTO Devices VALUES + ('BB','2.2.2.2',1,1,1,0) + """ + ) + + # No CurrentScan entry → offline + process_scan(db) + + event = cur.execute( + """ + SELECT eve_EventType + FROM Events + WHERE eve_MAC='BB' + """ + ).fetchone() + + assert event is not None + assert event["eve_EventType"] == "Device Down" + + +# --------------------------------------------------- +# TEST 3: Guards against the "forgot to clear CurrentScan" bug +# --------------------------------------------------- + + +def test_offline_detection_requires_currentscan_cleanup(scan_db, minimal_patches): + """ + This test FAILS if CurrentScan is not cleared. + """ + db = scan_db + cur = db.sql + + # Device exists + cur.execute( + """ + INSERT INTO Devices VALUES + ('CC','3.3.3.3',1,1,1,0) + """ + ) + + # First scan — device present + cur.execute("INSERT INTO CurrentScan VALUES ('CC','3.3.3.3')") + db.commitDB() + + process_scan(db) + + # Simulate bug: device not seen again BUT CurrentScan not cleared + # (reinsert old entry like stale data) + cur.execute("INSERT INTO CurrentScan VALUES ('CC','3.3.3.3')") + db.commitDB() + + process_scan(db) + + row = cur.execute( + """ + SELECT devPresentLastScan + FROM Devices WHERE devMac='CC' + """ + ).fetchone() + + # If CurrentScan works correctly, device should be offline + assert row["devPresentLastScan"] == 0