mirror of
https://github.com/jokob-sk/NetAlertX.git
synced 2026-04-04 09:11:34 -07:00
TEST: scan processing 6 + css fixes
Signed-off-by: jokob-sk <jokob.sk@gmail.com>
This commit is contained in:
@@ -929,6 +929,14 @@ height: 50px;
|
|||||||
.nav-tabs-custom .tab-content {
|
.nav-tabs-custom .tab-content {
|
||||||
overflow: scroll;
|
overflow: scroll;
|
||||||
}
|
}
|
||||||
|
.infobox_label
|
||||||
|
{
|
||||||
|
display: none;
|
||||||
|
}
|
||||||
|
.small-box .icon
|
||||||
|
{
|
||||||
|
display: block !important;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
.top_small_box_gray_text {
|
.top_small_box_gray_text {
|
||||||
|
|||||||
@@ -1,55 +1,25 @@
|
|||||||
from unittest.mock import Mock, patch
|
|
||||||
|
|
||||||
import pytest
|
|
||||||
|
|
||||||
from scan.session_events import process_scan
|
from scan.session_events import process_scan
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def minimal_patches():
|
|
||||||
with patch.multiple(
|
|
||||||
"scan.session_events",
|
|
||||||
exclude_ignored_devices=Mock(),
|
|
||||||
save_scanned_devices=Mock(),
|
|
||||||
print_scan_stats=Mock(),
|
|
||||||
create_new_devices=Mock(),
|
|
||||||
update_devices_data_from_scan=Mock(),
|
|
||||||
update_devLastConnection_from_CurrentScan=Mock(),
|
|
||||||
update_vendors_from_mac=Mock(),
|
|
||||||
update_ipv4_ipv6=Mock(),
|
|
||||||
update_icons_and_types=Mock(),
|
|
||||||
pair_sessions_events=Mock(),
|
|
||||||
create_sessions_snapshot=Mock(),
|
|
||||||
insertOnlineHistory=Mock(),
|
|
||||||
skip_repeated_notifications=Mock(),
|
|
||||||
update_unread_notifications_count=Mock(),
|
|
||||||
# insert_events optionally mocked depending on test
|
|
||||||
):
|
|
||||||
yield
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------
|
# ---------------------------------------------------
|
||||||
# TEST 1: Online → Offline transition
|
# TEST 1: Online → Offline transition
|
||||||
# ---------------------------------------------------
|
# ---------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
def test_device_goes_offline_when_missing_next_scan(scan_db, minimal_patches):
|
def test_device_goes_offline_when_missing_next_scan(scan_db, minimal_patches):
|
||||||
db = scan_db
|
conn = scan_db
|
||||||
cur = db.sql
|
cur = conn.cursor()
|
||||||
|
|
||||||
# Device initially known
|
# Device initially known
|
||||||
cur.execute(
|
cur.execute("""
|
||||||
"""
|
INSERT INTO Devices (devMac, devLastIP, devPresentLastScan, devAlertDown, devAlertEvents, devIsArchived)
|
||||||
INSERT INTO Devices VALUES
|
VALUES ('AA','1.1.1.1',1,1,1,0)
|
||||||
('AA','1.1.1.1',1,1,1,0)
|
""")
|
||||||
"""
|
conn.commit()
|
||||||
)
|
|
||||||
|
|
||||||
# FIRST SCAN — device present
|
# FIRST SCAN — device present
|
||||||
cur.execute("INSERT INTO CurrentScan VALUES ('AA','1.1.1.1')")
|
cur.execute("INSERT INTO CurrentScan (scanMac, scanLastIP) VALUES ('AA','1.1.1.1')")
|
||||||
db.commitDB()
|
conn.commit()
|
||||||
|
|
||||||
process_scan(db)
|
process_scan(conn)
|
||||||
|
|
||||||
# Device should be online
|
# Device should be online
|
||||||
row = cur.execute(
|
row = cur.execute(
|
||||||
@@ -58,42 +28,35 @@ def test_device_goes_offline_when_missing_next_scan(scan_db, minimal_patches):
|
|||||||
assert row["devPresentLastScan"] == 1
|
assert row["devPresentLastScan"] == 1
|
||||||
|
|
||||||
# SECOND SCAN — device missing
|
# SECOND SCAN — device missing
|
||||||
# (CurrentScan was cleared by process_scan)
|
process_scan(conn)
|
||||||
process_scan(db)
|
|
||||||
|
|
||||||
row = cur.execute(
|
row = cur.execute(
|
||||||
"SELECT devPresentLastScan FROM Devices WHERE devMac='AA'"
|
"SELECT devPresentLastScan FROM Devices WHERE devMac='AA'"
|
||||||
).fetchone()
|
).fetchone()
|
||||||
|
|
||||||
assert row["devPresentLastScan"] == 0
|
assert row["devPresentLastScan"] == 0
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------
|
# ---------------------------------------------------
|
||||||
# TEST 2: Device Down event created
|
# TEST 2: Device Down event created
|
||||||
# ---------------------------------------------------
|
# ---------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
def test_device_down_event_created_when_missing(scan_db, minimal_patches):
|
def test_device_down_event_created_when_missing(scan_db, minimal_patches):
|
||||||
db = scan_db
|
conn = scan_db
|
||||||
cur = db.sql
|
cur = conn.cursor()
|
||||||
|
|
||||||
cur.execute(
|
cur.execute("""
|
||||||
"""
|
INSERT INTO Devices (devMac, devLastIP, devPresentLastScan, devAlertDown, devAlertEvents, devIsArchived)
|
||||||
INSERT INTO Devices VALUES
|
VALUES ('BB','2.2.2.2',1,1,1,0)
|
||||||
('BB','2.2.2.2',1,1,1,0)
|
""")
|
||||||
"""
|
conn.commit()
|
||||||
)
|
|
||||||
|
|
||||||
# No CurrentScan entry → offline
|
# No CurrentScan entry → offline
|
||||||
process_scan(db)
|
process_scan(conn)
|
||||||
|
|
||||||
event = cur.execute(
|
event = cur.execute("""
|
||||||
"""
|
SELECT eve_EventType
|
||||||
SELECT eve_EventType
|
FROM Events
|
||||||
FROM Events
|
WHERE eve_MAC='BB'
|
||||||
WHERE eve_MAC='BB'
|
""").fetchone()
|
||||||
"""
|
|
||||||
).fetchone()
|
|
||||||
|
|
||||||
assert event is not None
|
assert event is not None
|
||||||
assert event["eve_EventType"] == "Device Down"
|
assert event["eve_EventType"] == "Device Down"
|
||||||
@@ -102,42 +65,32 @@ def test_device_down_event_created_when_missing(scan_db, minimal_patches):
|
|||||||
# ---------------------------------------------------
|
# ---------------------------------------------------
|
||||||
# TEST 3: Guards against the "forgot to clear CurrentScan" bug
|
# TEST 3: Guards against the "forgot to clear CurrentScan" bug
|
||||||
# ---------------------------------------------------
|
# ---------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
def test_offline_detection_requires_currentscan_cleanup(scan_db, minimal_patches):
|
def test_offline_detection_requires_currentscan_cleanup(scan_db, minimal_patches):
|
||||||
"""
|
conn = scan_db
|
||||||
This test FAILS if CurrentScan is not cleared.
|
cur = conn.cursor()
|
||||||
"""
|
|
||||||
db = scan_db
|
|
||||||
cur = db.sql
|
|
||||||
|
|
||||||
# Device exists
|
# Device exists
|
||||||
cur.execute(
|
cur.execute("""
|
||||||
"""
|
INSERT INTO Devices (devMac, devLastIP, devPresentLastScan, devAlertDown, devAlertEvents, devIsArchived)
|
||||||
INSERT INTO Devices VALUES
|
VALUES ('CC','3.3.3.3',1,1,1,0)
|
||||||
('CC','3.3.3.3',1,1,1,0)
|
""")
|
||||||
"""
|
conn.commit()
|
||||||
)
|
|
||||||
|
|
||||||
# First scan — device present
|
# First scan — device present
|
||||||
cur.execute("INSERT INTO CurrentScan VALUES ('CC','3.3.3.3')")
|
cur.execute("INSERT INTO CurrentScan (scanMac, scanLastIP) VALUES ('CC','3.3.3.3')")
|
||||||
db.commitDB()
|
conn.commit()
|
||||||
|
|
||||||
process_scan(db)
|
process_scan(conn)
|
||||||
|
|
||||||
# Simulate bug: device not seen again BUT CurrentScan not cleared
|
# Simulate bug: device not seen again BUT CurrentScan not cleared
|
||||||
# (reinsert old entry like stale data)
|
cur.execute("INSERT INTO CurrentScan (scanMac, scanLastIP) VALUES ('CC','3.3.3.3')")
|
||||||
cur.execute("INSERT INTO CurrentScan VALUES ('CC','3.3.3.3')")
|
conn.commit()
|
||||||
db.commitDB()
|
|
||||||
|
|
||||||
process_scan(db)
|
process_scan(conn)
|
||||||
|
|
||||||
row = cur.execute(
|
row = cur.execute("""
|
||||||
"""
|
SELECT devPresentLastScan
|
||||||
SELECT devPresentLastScan
|
FROM Devices WHERE devMac='CC'
|
||||||
FROM Devices WHERE devMac='CC'
|
""").fetchone()
|
||||||
"""
|
|
||||||
).fetchone()
|
|
||||||
|
|
||||||
# If CurrentScan works correctly, device should be offline
|
|
||||||
assert row["devPresentLastScan"] == 0
|
assert row["devPresentLastScan"] == 0
|
||||||
|
|||||||
Reference in New Issue
Block a user