import sqlite3 from unittest.mock import Mock, patch import pytest from scan.session_events import process_scan @pytest.fixture def scan_db(): conn = sqlite3.connect(":memory:") conn.row_factory = sqlite3.Row cur = conn.cursor() # Devices cur.execute( """ CREATE TABLE Devices ( devMac TEXT PRIMARY KEY, devLastIP TEXT, devPresentLastScan INTEGER, devAlertDown INTEGER, devAlertEvents INTEGER, devIsArchived INTEGER DEFAULT 0 ) """ ) # Current scan cur.execute( """ CREATE TABLE CurrentScan ( scanMac TEXT, scanLastIP TEXT ) """ ) # Events cur.execute( """ CREATE TABLE Events ( eve_MAC TEXT, eve_IP TEXT, eve_DateTime TEXT, eve_EventType TEXT, eve_AdditionalInfo TEXT, eve_PendingAlertEmail INTEGER ) """ ) conn.commit() db = Mock() db.sql_connection = conn db.sql = cur db.commitDB = conn.commit def read(query): return [dict(cur.execute(query).fetchone())] db.read = read yield db conn.close() @pytest.fixture def minimal_patches(): """Patch unrelated pipeline steps.""" with patch.multiple( "scan.session_events", # <-- target module exclude_ignored_devices=Mock(), save_scanned_devices=Mock(), print_scan_stats=Mock(), create_new_devices=Mock(), update_devices_data_from_scan=Mock(), update_devLastConnection_from_CurrentScan=Mock(), update_devPresentLastScan_based_on_nics=Mock(), update_devPresentLastScan_based_on_force_status=Mock(), update_vendors_from_mac=Mock(), update_ipv4_ipv6=Mock(), update_icons_and_types=Mock(), pair_sessions_events=Mock(), create_sessions_snapshot=Mock(), insertOnlineHistory=Mock(), skip_repeated_notifications=Mock(), update_unread_notifications_count=Mock(), insert_events=Mock(), ): yield # --------------------------------------------------- # TEST 1: Online → Offline transition # --------------------------------------------------- def test_device_goes_offline_when_missing_next_scan(scan_db, minimal_patches): db = scan_db cur = db.sql # Device initially known cur.execute( """ INSERT INTO Devices VALUES ('AA','1.1.1.1',1,1,1,0) """ ) # FIRST SCAN — device present cur.execute("INSERT INTO CurrentScan VALUES ('AA','1.1.1.1')") db.commitDB() process_scan(db) # Device should be online row = cur.execute( "SELECT devPresentLastScan FROM Devices WHERE devMac='AA'" ).fetchone() assert row["devPresentLastScan"] == 1 # SECOND SCAN — device missing # (CurrentScan was cleared by process_scan) process_scan(db) row = cur.execute( "SELECT devPresentLastScan FROM Devices WHERE devMac='AA'" ).fetchone() assert row["devPresentLastScan"] == 0 # --------------------------------------------------- # TEST 2: Device Down event created # --------------------------------------------------- def test_device_down_event_created_when_missing(scan_db, minimal_patches): db = scan_db cur = db.sql cur.execute( """ INSERT INTO Devices VALUES ('BB','2.2.2.2',1,1,1,0) """ ) # No CurrentScan entry → offline process_scan(db) event = cur.execute( """ SELECT eve_EventType FROM Events WHERE eve_MAC='BB' """ ).fetchone() assert event is not None assert event["eve_EventType"] == "Device Down" # --------------------------------------------------- # TEST 3: Guards against the "forgot to clear CurrentScan" bug # --------------------------------------------------- def test_offline_detection_requires_currentscan_cleanup(scan_db, minimal_patches): """ This test FAILS if CurrentScan is not cleared. """ db = scan_db cur = db.sql # Device exists cur.execute( """ INSERT INTO Devices VALUES ('CC','3.3.3.3',1,1,1,0) """ ) # First scan — device present cur.execute("INSERT INTO CurrentScan VALUES ('CC','3.3.3.3')") db.commitDB() process_scan(db) # Simulate bug: device not seen again BUT CurrentScan not cleared # (reinsert old entry like stale data) cur.execute("INSERT INTO CurrentScan VALUES ('CC','3.3.3.3')") db.commitDB() process_scan(db) row = cur.execute( """ SELECT devPresentLastScan FROM Devices WHERE devMac='CC' """ ).fetchone() # If CurrentScan works correctly, device should be offline assert row["devPresentLastScan"] == 0