Merge branch 'next_release' of https://github.com/netalertx/NetAlertX into next_release

This commit is contained in:
Jokob @NetAlertX
2026-03-15 01:42:23 +00:00
158 changed files with 7576 additions and 2892 deletions

View File

@@ -99,9 +99,9 @@ def test_copy_device(client, api_token, test_mac):
)
assert resp.status_code == 200
# Step 2: Generate a target MAC
# Step 2: Generate a target MAC (lowercase to match DB trigger normalisation)
target_mac = "aa:bb:cc:" + ":".join(
f"{random.randint(0, 255):02X}" for _ in range(3)
f"{random.randint(0, 255):02x}" for _ in range(3)
)
# Step 3: Copy device

View File

@@ -43,6 +43,10 @@ def create_dummy(client, api_token, test_mac):
client.post(f"/device/{test_mac}", json=payload, headers=auth_headers(api_token))
def delete_dummy(client, api_token, test_mac):
    """Remove the test device via DELETE /devices (best-effort, no assertion)."""
    payload = {"macs": [test_mac]}
    client.delete("/devices", json=payload, headers=auth_headers(api_token))
def test_get_all_devices(client, api_token, test_mac):
# Ensure there is at least one device
create_dummy(client, api_token, test_mac)
@@ -149,53 +153,62 @@ def test_export_import_cycle_base64(client, api_token, test_mac):
def test_devices_totals(client, api_token, test_mac):
    """Totals endpoint returns one count per configured device condition.

    The rendered diff left old and new statements interleaved (duplicate
    totals calls and asserts); this is the coherent post-change version with
    try/finally cleanup so the dummy device is always removed.
    """
    # 1. Create a dummy device
    create_dummy(client, api_token, test_mac)
    try:
        # 2. Call the totals endpoint
        resp = client.get("/devices/totals", headers=auth_headers(api_token))
        assert resp.status_code == 200
        # 3. Ensure the response is a JSON list
        data = resp.json
        assert isinstance(data, list)
        # 4. Dynamically get expected length
        conditions = get_device_conditions()
        expected_length = len(conditions)
        assert len(data) == expected_length
        # 5. Check that at least 1 device exists when there are any conditions
        if expected_length > 0:
            assert data[0] >= 1  # 'devices' count includes the dummy device
        else:
            # no conditions defined; data should be an empty list
            assert data == []
    finally:
        delete_dummy(client, api_token, test_mac)
def test_devices_by_status(client, api_token, test_mac):
    """Filter-by-status endpoint: valid status, invalid status (422), and
    favorite star formatting.

    Reconstructed from diff residue (old/new lines were interleaved); keeps
    the post-change version that asserts the favorite update succeeded and
    cleans up in a finally block.
    """
    # 1. Create a dummy device
    create_dummy(client, api_token, test_mac)
    try:
        # 2. Request devices by a valid status
        resp = client.get("/devices/by-status?status=my", headers=auth_headers(api_token))
        assert resp.status_code == 200
        data = resp.json
        assert isinstance(data, list)
        assert any(d["id"] == test_mac for d in data)
        # 3. Request devices with an invalid/unknown status
        resp_invalid = client.get("/devices/by-status?status=invalid_status", headers=auth_headers(api_token))
        # Strict validation now returns 422 for invalid status enum values
        assert resp_invalid.status_code == 422
        # 4. Check favorite formatting if devFavorite = 1
        # Update dummy device to favorite — and verify the update actually took,
        # otherwise the star assertion below would fail for the wrong reason.
        update_resp = client.post(
            f"/device/{test_mac}",
            json={"devFavorite": 1},
            headers=auth_headers(api_token)
        )
        assert update_resp.status_code == 200
        assert update_resp.json.get("success") is True
        resp_fav = client.get("/devices/by-status?status=my", headers=auth_headers(api_token))
        fav_data = next((d for d in resp_fav.json if d["id"] == test_mac), None)
        assert fav_data is not None
        assert "&#9733" in fav_data["title"]
    finally:
        delete_dummy(client, api_token, test_mac)
def test_delete_test_devices(client, api_token):

View File

@@ -169,3 +169,26 @@ def test_graphql_post_langstrings_all_languages(client, api_token):
assert data["deStrings"]["count"] >= 1
# Ensure langCode matches
assert all(e["langCode"] == "en_us" for e in data["enStrings"]["langStrings"])
def test_graphql_langstrings_excludes_languages_json(client, api_token):
    """languages.json must never appear as a language string entry (langCode='languages')"""
    query = {
        "query": """
        {
            langStrings {
                langStrings { langCode langStringKey langStringText }
                count
            }
        }
        """
    }
    response = client.post("/graphql", json=query, headers=auth_headers(api_token))
    assert response.status_code == 200
    payload = response.json.get("data", {})
    all_strings = payload.get("langStrings", {}).get("langStrings", [])
    # No entry should have langCode == "languages" (i.e. from languages.json)
    polluted = [entry for entry in all_strings if entry.get("langCode") == "languages"]
    assert polluted == [], (
        f"languages.json leaked into langStrings as {len(polluted)} entries; "
        "graphql_endpoint.py must exclude it from the directory scan"
    )

View File

@@ -0,0 +1,141 @@
"""Tests for GET /languages endpoint."""
import sys
import os
import pytest
INSTALL_PATH = os.getenv("NETALERTX_APP", "/app")
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
from helper import get_setting_value # noqa: E402
from api_server.api_server_start import app # noqa: E402
@pytest.fixture(scope="session")
def api_token():
    """Session-scoped fixture: fetch the API token from system settings once."""
    token = get_setting_value("API_TOKEN")
    return token
@pytest.fixture
def client():
    """Yield a Flask test client bound to the application under test."""
    with app.test_client() as test_client:
        yield test_client
def auth_headers(token):
    """Build the Authorization header dict for a bearer token."""
    bearer = f"Bearer {token}"
    return {"Authorization": bearer}
# ========================================================================
# AUTHENTICATION TESTS
# ========================================================================
def test_languages_unauthorized(client):
    """Missing token should be forbidden."""
    response = client.get("/languages")
    assert response.status_code == 403
    body = response.get_json()
    assert body is not None
    assert body.get("success") is False
def test_languages_invalid_token(client):
    """Invalid bearer token should be forbidden."""
    response = client.get("/languages", headers=auth_headers("INVALID-TOKEN"))
    assert response.status_code == 403
    body = response.get_json()
    assert body is not None
    assert body.get("success") is False
def test_languages_valid_token(client, api_token):
    """Valid token should return 200 with success=True."""
    response = client.get("/languages", headers=auth_headers(api_token))
    assert response.status_code == 200
    body = response.get_json()
    assert body is not None
    assert body.get("success") is True
# ========================================================================
# RESPONSE STRUCTURE TESTS
# ========================================================================
def test_languages_response_structure(client, api_token):
    """Response must contain required fields with correct types."""
    response = client.get("/languages", headers=auth_headers(api_token))
    assert response.status_code == 200
    body = response.get_json()
    assert body.get("success") is True
    # Each required field must be present with the expected type.
    for field, expected_type in (("default", str), ("count", int), ("languages", list)):
        assert isinstance(body.get(field), expected_type)
def test_languages_default_is_en_us(client, api_token):
    """Default language must always be en_us."""
    body = client.get("/languages", headers=auth_headers(api_token)).get_json()
    assert body["default"] == "en_us"
def test_languages_count_matches_list(client, api_token):
    """count must equal len(languages)."""
    body = client.get("/languages", headers=auth_headers(api_token)).get_json()
    assert len(body["languages"]) == body["count"]
def test_languages_entry_shape(client, api_token):
    """Each language entry must have 'code' and 'display' string fields."""
    body = client.get("/languages", headers=auth_headers(api_token)).get_json()
    for entry in body["languages"]:
        assert "code" in entry, f"Missing 'code' in {entry}"
        assert "display" in entry, f"Missing 'display' in {entry}"
        assert isinstance(entry["code"], str)
        assert isinstance(entry["display"], str)
        # code must match pattern xx_xx (5 chars with an underscore in the middle)
        code_ok = len(entry["code"]) == 5 and entry["code"][2] == "_"
        assert code_ok, f"Unexpected code format: {entry['code']}"
def test_languages_includes_en_us(client, api_token):
    """en_us must always be in the language list."""
    body = client.get("/languages", headers=auth_headers(api_token)).get_json()
    codes = {entry["code"] for entry in body["languages"]}
    assert "en_us" in codes
def test_languages_display_contains_code(client, api_token):
    """Each display name must embed its code in parentheses, e.g. 'English (en_us)'."""
    body = client.get("/languages", headers=auth_headers(api_token)).get_json()
    for entry in body["languages"]:
        marker = f"({entry['code']})"
        assert marker in entry["display"], \
            f"Display '{entry['display']}' does not contain '({entry['code']})'"
def test_languages_minimum_count(client, api_token):
    """Must have at least 20 languages (the original set)."""
    body = client.get("/languages", headers=auth_headers(api_token)).get_json()
    assert body["count"] >= 20, f"Expected >=20 languages, got {body['count']}"
def test_languages_no_duplicate_codes(client, api_token):
    """Language codes must be unique."""
    body = client.get("/languages", headers=auth_headers(api_token)).get_json()
    codes = [entry["code"] for entry in body["languages"]]
    assert len(set(codes)) == len(codes), "Duplicate language codes found"

View File

@@ -1,6 +1,7 @@
import pytest
from unittest.mock import patch, MagicMock
from datetime import datetime
import random
from api_server.api_server_start import app
from helper import get_setting_value
@@ -21,6 +22,31 @@ def auth_headers(token):
return {"Authorization": f"Bearer {token}"}
def create_dummy(client, api_token, test_mac):
    """Create a test device via POST /device/<mac>; assert and return the response."""
    payload = {
        "createNew": True,
        "devName": "Test Device MCP",
        "devOwner": "Unit Test",
        "devType": "Router",
        "devVendor": "TestVendor",
    }
    response = client.post(f"/device/{test_mac}", json=payload, headers=auth_headers(api_token))
    # Some backends answer 200, others 201 on creation — accept both.
    assert response.status_code in (200, 201), (
        f"Expected status 200/201 for device creation, got {response.status_code}. "
        f"Response body: {response.get_data(as_text=True)}"
    )
    return response
def delete_dummy(client, api_token, test_mac):
    """Delete the test device via DELETE /devices; assert and return the response."""
    body = {"macs": [test_mac]}
    response = client.delete("/devices", json=body, headers=auth_headers(api_token))
    assert response.status_code == 200, (
        f"Expected status 200 for device deletion, got {response.status_code}. "
        f"Response body: {response.get_data(as_text=True)}"
    )
    return response
# --- Device Search Tests ---
@@ -350,25 +376,22 @@ def test_mcp_devices_import_json(mock_db_conn, client, api_token):
# --- MCP Device Totals Tests ---
def test_mcp_devices_totals(client, api_token):
    """Test MCP devices totals endpoint against a real dummy device.

    Reconstructed from diff residue: the rendered hunk interleaved the old
    mocked version (patch decorator, MagicMock cursor) with the new version
    that creates a real dummy device. The mock scaffolding is dropped.
    The MAC is generated directly with lowercase hex (':02x') instead of the
    redundant ':02X' + .lower() chain, matching DB trigger normalisation.
    """
    test_mac = "aa:bb:cc:" + ":".join(f"{random.randint(0, 255):02x}" for _ in range(3))
    create_dummy(client, api_token, test_mac)
    try:
        response = client.get("/devices/totals", headers=auth_headers(api_token))
        assert response.status_code == 200
        data = response.get_json()
        # Should return device counts as array
        assert isinstance(data, list)
        assert len(data) >= 4  # At least online, offline, etc.
        assert data[0] >= 1  # includes the dummy device just created
    finally:
        delete_dummy(client, api_token, test_mac)
# --- MCP Traceroute Tests ---

257
test/db/test_db_cleanup.py Normal file
View File

@@ -0,0 +1,257 @@
"""
Unit tests for db_cleanup plugin SQL logic.
Covers:
- Sessions trim (reuses DAYS_TO_KEEP_EVENTS window)
- ANALYZE refreshes sqlite_stat1 after bulk deletes
- PRAGMA optimize runs without error
Each test creates an isolated in-memory SQLite database so there is no
dependency on the running application or its config.
"""
import sqlite3
import os
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_db():
"""Return an in-memory connection seeded with the tables used by db_cleanup."""
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("""
CREATE TABLE Events (
eve_MAC TEXT NOT NULL,
eve_IP TEXT NOT NULL,
eve_DateTime DATETIME NOT NULL,
eve_EventType TEXT NOT NULL,
eve_AdditionalInfo TEXT DEFAULT '',
eve_PendingAlertEmail INTEGER NOT NULL DEFAULT 1,
eve_PairEventRowid INTEGER
)
""")
cur.execute("""
CREATE TABLE Sessions (
ses_MAC TEXT,
ses_IP TEXT,
ses_EventTypeConnection TEXT,
ses_DateTimeConnection DATETIME,
ses_EventTypeDisconnection TEXT,
ses_DateTimeDisconnection DATETIME,
ses_StillConnected INTEGER,
ses_AdditionalInfo TEXT
)
""")
conn.commit()
return conn
def _seed_sessions(cur, old_count: int, recent_count: int, days: int):
"""
Insert `old_count` rows with connection date older than `days` days and
`recent_count` rows with connection date today.
"""
for i in range(old_count):
cur.execute(
"INSERT INTO Sessions (ses_MAC, ses_DateTimeConnection) "
"VALUES (?, date('now', ?))",
(f"AA:BB:CC:DD:EE:{i:02X}", f"-{days + 1} day"),
)
for i in range(recent_count):
cur.execute(
"INSERT INTO Sessions (ses_MAC, ses_DateTimeConnection) "
"VALUES (?, date('now'))",
(f"11:22:33:44:55:{i:02X}",),
)
def _run_sessions_trim(cur, days: int) -> int:
"""Execute the exact DELETE used by db_cleanup and return rowcount."""
cur.execute(
f"DELETE FROM Sessions "
f"WHERE ses_DateTimeConnection <= date('now', '-{days} day')"
)
return cur.rowcount
# ---------------------------------------------------------------------------
# Sessions trim tests
# ---------------------------------------------------------------------------
class TestSessionsTrim:
    """Behaviour of the Sessions trim DELETE (shares the DAYS_TO_KEEP_EVENTS window)."""

    def test_old_rows_are_deleted(self):
        """Rows older than DAYS_TO_KEEP_EVENTS window must be removed."""
        db = _make_db()
        c = db.cursor()
        _seed_sessions(c, old_count=10, recent_count=5, days=30)
        deleted = _run_sessions_trim(c, days=30)
        assert deleted == 10, f"Expected 10 old rows deleted, got {deleted}"
        c.execute("SELECT COUNT(*) FROM Sessions")
        remaining = c.fetchone()[0]
        assert remaining == 5, f"Expected 5 recent rows to survive, got {remaining}"

    def test_recent_rows_are_preserved(self):
        """Rows within the retention window must not be touched."""
        db = _make_db()
        c = db.cursor()
        _seed_sessions(c, old_count=0, recent_count=20, days=30)
        deleted = _run_sessions_trim(c, days=30)
        assert deleted == 0, f"Expected 0 deletions, got {deleted}"
        c.execute("SELECT COUNT(*) FROM Sessions")
        assert c.fetchone()[0] == 20

    def test_empty_table_is_a_no_op(self):
        """Trim against an empty Sessions table must not raise."""
        c = _make_db().cursor()
        assert _run_sessions_trim(c, days=30) == 0

    def test_trim_is_bounded_by_days_parameter(self):
        """Only rows strictly outside the window are removed; boundary row survives."""
        db = _make_db()
        c = db.cursor()
        # Row exactly AT the boundary (date = 'now' - days exactly)
        c.execute(
            "INSERT INTO Sessions (ses_MAC, ses_DateTimeConnection) "
            "VALUES (?, date('now', ?))",
            ("AA:BB:CC:00:00:01", "-30 day"),
        )
        # Row just inside the window
        c.execute(
            "INSERT INTO Sessions (ses_MAC, ses_DateTimeConnection) "
            "VALUES (?, date('now', '-29 day'))",
            ("AA:BB:CC:00:00:02",),
        )
        _run_sessions_trim(c, days=30)
        c.execute("SELECT ses_MAC FROM Sessions")
        survivors = {row[0] for row in c.fetchall()}
        # Boundary row (== threshold) is deleted; inside row survives
        assert "AA:BB:CC:00:00:02" in survivors, "Row inside window was wrongly deleted"

    def test_sessions_trim_uses_same_value_as_events(self):
        """
        Regression: verify that the Sessions DELETE uses an identical day-offset
        expression to the Events DELETE so the two tables stay aligned.
        """
        install_path = os.getenv("NETALERTX_APP", "/app")
        script_path = os.path.join(
            install_path, "front", "plugins", "db_cleanup", "script.py"
        )
        with open(script_path) as fh:
            source = fh.read()
        # These literals are the raw f-string bodies in script.py (scanned as text).
        events_expr = "DELETE FROM Events WHERE eve_DateTime <= date('now', '-{str(DAYS_TO_KEEP_EVENTS)} day')"
        sessions_expr = "DELETE FROM Sessions WHERE ses_DateTimeConnection <= date('now', '-{str(DAYS_TO_KEEP_EVENTS)} day')"
        assert events_expr in source, "Events DELETE expression changed unexpectedly"
        assert sessions_expr in source, "Sessions DELETE is not aligned with Events DELETE"
# ---------------------------------------------------------------------------
# ANALYZE tests
# ---------------------------------------------------------------------------
class TestAnalyze:
    """ANALYZE behaviour mirrored from db_cleanup's maintenance tail."""

    def test_analyze_populates_sqlite_stat1(self):
        """
        After ANALYZE, sqlite_stat1 must exist and have at least one row
        for the Events table (which has an implicit rowid index).
        """
        db = _make_db()
        c = db.cursor()
        # Seed some rows so ANALYZE has something to measure
        seed = [(f"AA:BB:CC:DD:EE:{i:02X}",) for i in range(20)]
        c.executemany(
            "INSERT INTO Events (eve_MAC, eve_IP, eve_DateTime, eve_EventType) "
            "VALUES (?, '1.2.3.4', date('now'), 'Connected')",
            seed,
        )
        db.commit()
        c.execute("ANALYZE;")
        db.commit()
        c.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='sqlite_stat1'")
        assert c.fetchone() is not None, "sqlite_stat1 table not created by ANALYZE"

    def test_analyze_does_not_raise_on_empty_tables(self):
        """ANALYZE against empty tables must complete without exceptions."""
        db = _make_db()
        # Should not raise
        db.cursor().execute("ANALYZE;")
        db.commit()

    def test_analyze_is_idempotent(self):
        """Running ANALYZE twice must not raise or corrupt state."""
        db = _make_db()
        c = db.cursor()
        for _ in range(2):
            c.execute("ANALYZE;")
        db.commit()
# ---------------------------------------------------------------------------
# PRAGMA optimize tests
# ---------------------------------------------------------------------------
class TestPragmaOptimize:
    """PRAGMA optimize behaviour after ANALYZE and bulk deletes."""

    def test_pragma_optimize_does_not_raise(self):
        """PRAGMA optimize must complete without exceptions."""
        db = _make_db()
        c = db.cursor()
        # Run ANALYZE first (as db_cleanup does) then optimize
        for statement in ("ANALYZE;", "PRAGMA optimize;"):
            c.execute(statement)
        db.commit()

    def test_pragma_optimize_after_bulk_delete(self):
        """
        PRAGMA optimize after a bulk DELETE (simulating db_cleanup) must
        complete without error, validating the full tail sequence.
        """
        db = _make_db()
        c = db.cursor()
        stale = [(f"AA:BB:CC:DD:EE:{i:02X}",) for i in range(50)]
        c.executemany(
            "INSERT INTO Sessions (ses_MAC, ses_DateTimeConnection) "
            "VALUES (?, date('now', '-60 day'))",
            stale,
        )
        db.commit()
        # Mirror the tail sequence from cleanup_database.
        # WAL checkpoints are omitted: they require no open transaction and are
        # not supported on :memory: databases (SQLite raises OperationalError).
        c.execute("DELETE FROM Sessions WHERE ses_DateTimeConnection <= date('now', '-30 day')")
        db.commit()
        c.execute("ANALYZE;")
        db.execute("VACUUM;")
        c.execute("PRAGMA optimize;")
        c.execute("SELECT COUNT(*) FROM Sessions")
        assert c.fetchone()[0] == 0

View File

@@ -0,0 +1,213 @@
"""
Unit tests for the DevicesView SQL view built by ensure_views().
Regression coverage:
- NULL devAlertDown must NOT be treated as != 0 (IFNULL bug: '' vs 0).
- devCanSleep / devIsSleeping suppression within the sleep window.
- Only devices with devAlertDown = 1 AND devPresentLastScan = 0 appear in
the "Device Down" event query.
Each test uses an isolated in-memory SQLite database so it has no
dependency on the running application or config.
"""
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from db_test_helpers import ( # noqa: E402
make_db as _make_db,
minutes_ago as _minutes_ago,
insert_device as _insert_device,
)
# ---------------------------------------------------------------------------
# Tests: devAlertDown NULL coercion
# ---------------------------------------------------------------------------
class TestAlertDownNullCoercion:
    """
    Guard against the IFNULL(devAlertDown, '') bug.

    When devAlertDown IS NULL and the view uses IFNULL(..., ''), the text value
    '' satisfies `!= 0` in SQLite (text > integer), causing spurious down events.
    The fix is IFNULL(devAlertDown, 0) so NULL → 0, and 0 != 0 is FALSE.
    """

    def test_null_alert_down_not_in_down_event_query(self):
        """A device with NULL devAlertDown must NOT appear in the down-event query."""
        db = _make_db()
        c = db.cursor()
        _insert_device(c, "AA:BB:CC:DD:EE:01", alert_down=None, present_last_scan=0)
        db.commit()
        c.execute(
            "SELECT devMac FROM DevicesView "
            "WHERE devAlertDown != 0 AND devPresentLastScan = 0"
        )
        down_macs = [row["devMac"] for row in c.fetchall()]
        assert "AA:BB:CC:DD:EE:01" not in down_macs, (
            "Device with NULL devAlertDown must not fire a down event "
            "(IFNULL coercion regression)"
        )

    def test_zero_alert_down_not_in_down_event_query(self):
        """A device with explicit devAlertDown=0 must NOT appear."""
        db = _make_db()
        c = db.cursor()
        _insert_device(c, "AA:BB:CC:DD:EE:02", alert_down=0, present_last_scan=0)
        db.commit()
        c.execute(
            "SELECT devMac FROM DevicesView WHERE devAlertDown != 0 AND devPresentLastScan = 0"
        )
        down_macs = [row["devMac"] for row in c.fetchall()]
        assert "AA:BB:CC:DD:EE:02" not in down_macs

    def test_one_alert_down_in_down_event_query(self):
        """A device with devAlertDown=1 and absent MUST appear in the down-event query."""
        db = _make_db()
        c = db.cursor()
        _insert_device(c, "AA:BB:CC:DD:EE:03", alert_down=1, present_last_scan=0)
        db.commit()
        c.execute(
            "SELECT devMac FROM DevicesView WHERE devAlertDown != 0 AND devPresentLastScan = 0"
        )
        down_macs = [row["devMac"] for row in c.fetchall()]
        # DevicesView returns LOWER(devMac), so compare against lowercase
        assert "aa:bb:cc:dd:ee:03" in down_macs

    def test_online_device_not_in_down_event_query(self):
        """An online device (devPresentLastScan=1) should never fire a down event."""
        db = _make_db()
        c = db.cursor()
        _insert_device(c, "AA:BB:CC:DD:EE:04", alert_down=1, present_last_scan=1)
        db.commit()
        c.execute(
            "SELECT devMac FROM DevicesView WHERE devAlertDown != 0 AND devPresentLastScan = 0"
        )
        down_macs = [row["devMac"] for row in c.fetchall()]
        assert "AA:BB:CC:DD:EE:04" not in down_macs
# ---------------------------------------------------------------------------
# Tests: devIsSleeping suppression
# ---------------------------------------------------------------------------
class TestIsSleepingSuppression:
    """
    When devCanSleep=1 and the device has been absent for less than
    NTFPRCS_sleep_time minutes, devIsSleeping must be 1 and the device
    must NOT appear in the down-event query.
    """

    def test_sleeping_device_is_marked_sleeping(self):
        """devCanSleep=1, absent, last seen 5 min ago → devIsSleeping=1."""
        db = _make_db(sleep_minutes=30)
        c = db.cursor()
        _insert_device(
            c, "BB:BB:BB:BB:BB:01",
            alert_down=1, present_last_scan=0,
            can_sleep=1, last_connection=_minutes_ago(5),
        )
        db.commit()
        # DevicesView returns LOWER(devMac); query must use lowercase
        c.execute("SELECT devIsSleeping FROM DevicesView WHERE devMac = 'bb:bb:bb:bb:bb:01'")
        assert c.fetchone()["devIsSleeping"] == 1

    def test_sleeping_device_not_in_down_event_query(self):
        """A sleeping device must be excluded from the down-event query."""
        db = _make_db(sleep_minutes=30)
        c = db.cursor()
        _insert_device(
            c, "BB:BB:BB:BB:BB:02",
            alert_down=1, present_last_scan=0,
            can_sleep=1, last_connection=_minutes_ago(5),
        )
        db.commit()
        c.execute(
            "SELECT devMac FROM DevicesView "
            "WHERE devAlertDown != 0 AND devIsSleeping = 0 AND devPresentLastScan = 0"
        )
        down_macs = [row["devMac"] for row in c.fetchall()]
        # DevicesView returns LOWER(devMac)
        assert "bb:bb:bb:bb:bb:02" not in down_macs

    def test_expired_sleep_window_fires_down(self):
        """After the sleep window expires, the device must appear as Down."""
        db = _make_db(sleep_minutes=30)
        c = db.cursor()
        _insert_device(
            c, "BB:BB:BB:BB:BB:03",
            alert_down=1, present_last_scan=0,
            can_sleep=1, last_connection=_minutes_ago(45),  # > 30 min
        )
        db.commit()
        # DevicesView returns LOWER(devMac); query must use lowercase
        c.execute("SELECT devIsSleeping FROM DevicesView WHERE devMac = 'bb:bb:bb:bb:bb:03'")
        assert c.fetchone()["devIsSleeping"] == 0
        c.execute(
            "SELECT devMac FROM DevicesView "
            "WHERE devAlertDown != 0 AND devIsSleeping = 0 AND devPresentLastScan = 0"
        )
        down_macs = [row["devMac"] for row in c.fetchall()]
        assert "bb:bb:bb:bb:bb:03" in down_macs

    def test_can_sleep_zero_device_is_not_sleeping(self):
        """devCanSleep=0 device recently offline → devIsSleeping must be 0."""
        db = _make_db(sleep_minutes=30)
        c = db.cursor()
        _insert_device(
            c, "BB:BB:BB:BB:BB:04",
            alert_down=1, present_last_scan=0,
            can_sleep=0, last_connection=_minutes_ago(5),
        )
        db.commit()
        # DevicesView returns LOWER(devMac); query must use lowercase
        c.execute("SELECT devIsSleeping FROM DevicesView WHERE devMac = 'bb:bb:bb:bb:bb:04'")
        assert c.fetchone()["devIsSleeping"] == 0

    def test_devstatus_sleeping(self):
        """DevicesView devStatus must be 'Sleeping' for a sleeping device."""
        db = _make_db(sleep_minutes=30)
        c = db.cursor()
        _insert_device(
            c, "BB:BB:BB:BB:BB:05",
            alert_down=1, present_last_scan=0,
            can_sleep=1, last_connection=_minutes_ago(5),
        )
        db.commit()
        # DevicesView returns LOWER(devMac); query must use lowercase
        c.execute("SELECT devStatus FROM DevicesView WHERE devMac = 'bb:bb:bb:bb:bb:05'")
        assert c.fetchone()["devStatus"] == "Sleeping"

    def test_devstatus_down_after_window_expires(self):
        """DevicesView devStatus must be 'Down' once the sleep window expires."""
        db = _make_db(sleep_minutes=30)
        c = db.cursor()
        _insert_device(
            c, "BB:BB:BB:BB:BB:06",
            alert_down=1, present_last_scan=0,
            can_sleep=1, last_connection=_minutes_ago(45),
        )
        db.commit()
        # DevicesView returns LOWER(devMac); query must use lowercase
        c.execute("SELECT devStatus FROM DevicesView WHERE devMac = 'bb:bb:bb:bb:bb:06'")
        assert c.fetchone()["devStatus"] == "Down"

349
test/db_test_helpers.py Normal file
View File

@@ -0,0 +1,349 @@
"""
Shared in-memory database factories and helpers for NetAlertX unit tests.
Import from any test subdirectory with:
import sys, os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from db_test_helpers import make_db, insert_device, minutes_ago, DummyDB, down_event_macs, make_device_dict, sync_insert_devices
"""
import sqlite3
import sys
import os
from datetime import datetime, timezone, timedelta
# Make the 'server' package importable when this module is loaded directly.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "server"))
from db.db_upgrade import ensure_views # noqa: E402
# ---------------------------------------------------------------------------
# DDL
# ---------------------------------------------------------------------------
CREATE_DEVICES = """
CREATE TABLE IF NOT EXISTS Devices (
devMac TEXT PRIMARY KEY,
devName TEXT,
devOwner TEXT,
devType TEXT,
devVendor TEXT,
devFavorite INTEGER DEFAULT 0,
devGroup TEXT,
devComments TEXT,
devFirstConnection TEXT,
devLastConnection TEXT,
devLastIP TEXT,
devPrimaryIPv4 TEXT,
devPrimaryIPv6 TEXT,
devVlan TEXT,
devForceStatus TEXT,
devStaticIP TEXT,
devScan INTEGER DEFAULT 1,
devLogEvents INTEGER DEFAULT 1,
devAlertEvents INTEGER DEFAULT 1,
devAlertDown INTEGER, -- intentionally nullable
devCanSleep INTEGER DEFAULT 0,
devSkipRepeated INTEGER DEFAULT 0,
devLastNotification TEXT,
devPresentLastScan INTEGER DEFAULT 0,
devIsNew INTEGER DEFAULT 0,
devLocation TEXT,
devIsArchived INTEGER DEFAULT 0,
devParentMAC TEXT,
devParentPort TEXT,
devIcon TEXT,
devGUID TEXT,
devSite TEXT,
devSSID TEXT,
devSyncHubNode TEXT,
devSourcePlugin TEXT,
devCustomProps TEXT,
devFQDN TEXT,
devParentRelType TEXT,
devReqNicsOnline INTEGER DEFAULT 0,
devMacSource TEXT,
devNameSource TEXT,
devFQDNSource TEXT,
devLastIPSource TEXT,
devVendorSource TEXT,
devSSIDSource TEXT,
devParentMACSource TEXT,
devParentPortSource TEXT,
devParentRelTypeSource TEXT,
devVlanSource TEXT
)
"""
# Includes eve_PairEventRowid — required by insert_events().
CREATE_EVENTS = """
CREATE TABLE IF NOT EXISTS Events (
eve_MAC TEXT,
eve_IP TEXT,
eve_DateTime TEXT,
eve_EventType TEXT,
eve_AdditionalInfo TEXT,
eve_PendingAlertEmail INTEGER,
eve_PairEventRowid INTEGER
)
"""
CREATE_CURRENT_SCAN = """
CREATE TABLE IF NOT EXISTS CurrentScan (
scanMac TEXT,
scanLastIP TEXT,
scanVendor TEXT,
scanSourcePlugin TEXT,
scanName TEXT,
scanLastQuery TEXT,
scanLastConnection TEXT,
scanSyncHubNode TEXT,
scanSite TEXT,
scanSSID TEXT,
scanParentMAC TEXT,
scanParentPort TEXT,
scanType TEXT
)
"""
CREATE_SETTINGS = """
CREATE TABLE IF NOT EXISTS Settings (
setKey TEXT PRIMARY KEY,
setValue TEXT
)
"""
# ---------------------------------------------------------------------------
# DB factory
# ---------------------------------------------------------------------------
def make_db(sleep_minutes: int = 30) -> sqlite3.Connection:
    """
    Return a fully seeded in-memory SQLite connection with DevicesView built.

    Creates Devices, Events, CurrentScan and Settings, stores the
    NTFPRCS_sleep_time setting, then runs ensure_views() so DevicesView is
    immediately queryable. Rows come back as sqlite3.Row mappings.
    """
    db = sqlite3.connect(":memory:")
    db.row_factory = sqlite3.Row
    cursor = db.cursor()
    for ddl in (CREATE_DEVICES, CREATE_EVENTS, CREATE_CURRENT_SCAN, CREATE_SETTINGS):
        cursor.execute(ddl)
    cursor.execute(
        "INSERT OR REPLACE INTO Settings (setKey, setValue) VALUES (?, ?)",
        ("NTFPRCS_sleep_time", str(sleep_minutes)),
    )
    db.commit()
    ensure_views(cursor)
    db.commit()
    return db
# ---------------------------------------------------------------------------
# Time helpers
# ---------------------------------------------------------------------------
def minutes_ago(n: int) -> str:
"""Return a UTC timestamp string for *n* minutes ago."""
dt = datetime.now(timezone.utc) - timedelta(minutes=n)
return dt.strftime("%Y-%m-%d %H:%M:%S")
def now_utc() -> str:
"""Return the current UTC timestamp as a string."""
return datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
# ---------------------------------------------------------------------------
# Device row factory
# ---------------------------------------------------------------------------
def insert_device(
cur,
mac: str,
*,
alert_down,
present_last_scan: int = 0,
can_sleep: int = 0,
last_connection: str | None = None,
last_ip: str = "192.168.1.1",
) -> None:
"""
Insert a minimal Devices row.
Parameters
----------
alert_down:
Value for devAlertDown. Pass ``None`` to store SQL NULL (tests the
IFNULL coercion regression), ``0`` for disabled, ``1`` for enabled.
present_last_scan:
``1`` = device was seen last scan (about to go down transition).
``0`` = device was already absent last scan.
can_sleep:
``1`` enables the sleeping window for this device.
last_connection:
ISO-8601 UTC string; defaults to 60 minutes ago when omitted.
last_ip:
Value stored in devLastIP.
"""
cur.execute(
"""
INSERT INTO Devices
(devMac, devAlertDown, devPresentLastScan, devCanSleep,
devLastConnection, devLastIP, devIsArchived, devIsNew)
VALUES (?, ?, ?, ?, ?, ?, 0, 0)
""",
(mac, alert_down, present_last_scan, can_sleep,
last_connection or minutes_ago(60), last_ip),
)
def make_device_dict(mac: str = "aa:bb:cc:dd:ee:ff", **overrides) -> dict:
    """
    Build a fully-populated Devices row dict with safe defaults.

    Mirrors every physical column in CREATE_DEVICES so callers can be inserted
    directly via sync_insert_devices() or similar helpers. Keyword arguments
    override (or extend) any individual field.

    Computed/view-only columns (devStatus, devIsSleeping, devFlapping,
    rowid, …) are intentionally absent — tests that need to verify they are
    dropped should add them after calling this function.
    """
    defaults = dict(
        devMac=mac,
        devName="Test Device",
        devOwner="",
        devType="",
        devVendor="Acme",
        devFavorite=0,
        devGroup="",
        devComments="",
        devFirstConnection="2024-01-01 00:00:00",
        devLastConnection="2024-01-02 00:00:00",
        devLastIP="192.168.1.10",
        devPrimaryIPv4="192.168.1.10",
        devPrimaryIPv6="",
        devVlan="",
        devForceStatus="",
        devStaticIP="",
        devScan=1,
        devLogEvents=1,
        devAlertEvents=1,
        devAlertDown=1,
        devCanSleep=0,
        devSkipRepeated=0,
        devLastNotification="",
        devPresentLastScan=1,
        devIsNew=0,
        devLocation="",
        devIsArchived=0,
        devParentMAC="",
        devParentPort="",
        devIcon="",
        devGUID="test-guid-1",
        devSite="",
        devSSID="",
        devSyncHubNode="node1",
        devSourcePlugin="",
        devCustomProps="",
        devFQDN="",
        devParentRelType="",
        devReqNicsOnline=0,
        devMacSource="",
        devNameSource="",
        devFQDNSource="",
        devLastIPSource="",
        devVendorSource="",
        devSSIDSource="",
        devParentMACSource="",
        devParentPortSource="",
        devParentRelTypeSource="",
        devVlanSource="",
    )
    # Overrides win; unknown keys are appended just like dict.update() did.
    return {**defaults, **overrides}
# ---------------------------------------------------------------------------
# Sync insert helper (shared by test/plugins/test_sync_insert.py and
# test/plugins/test_sync_protocol.py — mirrors sync.py's insert block)
# ---------------------------------------------------------------------------
def sync_insert_devices(
conn: sqlite3.Connection,
device_data: list,
existing_macs: set | None = None,
) -> int:
"""
Schema-aware device INSERT mirroring sync.py's Mode-3 insert block.
Parameters
----------
conn:
In-memory (or real) SQLite connection with a Devices table.
device_data:
List of device dicts as received from table_devices.json or a node log.
existing_macs:
Set of MAC addresses already present in Devices. Rows whose devMac is
in this set are skipped. Pass ``None`` (default) to insert everything.
Returns the number of rows actually inserted.
"""
if not device_data:
return 0
cursor = conn.cursor()
candidates = (
[d for d in device_data if d["devMac"] not in existing_macs]
if existing_macs is not None
else list(device_data)
)
if not candidates:
return 0
cursor.execute("PRAGMA table_info(Devices)")
db_columns = {row[1] for row in cursor.fetchall()}
insert_cols = [k for k in candidates[0].keys() if k in db_columns]
columns = ", ".join(insert_cols)
placeholders = ", ".join("?" for _ in insert_cols)
sql = f"INSERT INTO Devices ({columns}) VALUES ({placeholders})"
values = [tuple(d.get(col) for col in insert_cols) for d in candidates]
cursor.executemany(sql, values)
conn.commit()
return len(values)
# ---------------------------------------------------------------------------
# Assertion helpers
# ---------------------------------------------------------------------------
def down_event_macs(cur) -> set:
    """Collect the lowercased MACs that currently have a 'Device Down' event row."""
    rows = cur.execute(
        "SELECT eve_MAC FROM Events WHERE eve_EventType = 'Device Down'"
    ).fetchall()
    return {row["eve_MAC"].lower() for row in rows}
# ---------------------------------------------------------------------------
# DummyDB — minimal wrapper used by scan.session_events helpers
# ---------------------------------------------------------------------------
class DummyDB:
    """
    Minimal DB wrapper that satisfies the interface expected by
    ``session_events.insert_events()`` and related helpers.
    """

    def __init__(self, conn: sqlite3.Connection):
        # Production code reaches for a ``sql`` cursor attribute, so expose one.
        self.sql = conn.cursor()
        self._connection = conn

    def commitDB(self) -> None:
        """Persist pending writes on the wrapped connection."""
        self._connection.commit()

View File

@@ -317,14 +317,18 @@ def _select_custom_ports(exclude: set[int] | None = None) -> int:
raise RuntimeError("Unable to locate a free high port for compose testing")
def _make_port_check_hook(ports: tuple[int, ...]) -> Callable[[], None]:
def _make_port_check_hook(
ports: tuple[int, ...],
settle_wait_seconds: int = COMPOSE_SETTLE_WAIT_SECONDS,
port_wait_timeout: int = COMPOSE_PORT_WAIT_TIMEOUT,
) -> Callable[[], None]:
"""Return a callback that waits for the provided ports to accept TCP connections."""
def _hook() -> None:
for port in ports:
LAST_PORT_SUCCESSES.pop(port, None)
time.sleep(COMPOSE_SETTLE_WAIT_SECONDS)
_wait_for_ports(ports, timeout=COMPOSE_PORT_WAIT_TIMEOUT)
time.sleep(settle_wait_seconds)
_wait_for_ports(ports, timeout=port_wait_timeout)
return _hook
@@ -344,6 +348,7 @@ def _write_normal_startup_compose(
service_env = service.setdefault("environment", {})
service_env.setdefault("NETALERTX_CHECK_ONLY", "1")
service_env.setdefault("SKIP_STARTUP_CHECKS", "host optimization")
if env_overrides:
service_env.update(env_overrides)
@@ -852,12 +857,18 @@ def test_normal_startup_no_warnings_compose(tmp_path: pathlib.Path) -> None:
default_project = "netalertx-normal-default"
default_compose_file = _write_normal_startup_compose(default_dir, default_project, default_env_overrides)
port_check_timeout = 20
settle_wait_seconds = 2
default_result = _run_docker_compose(
default_compose_file,
default_project,
timeout=8,
detached=True,
post_up=_make_port_check_hook(default_ports),
post_up=_make_port_check_hook(
default_ports,
settle_wait_seconds=settle_wait_seconds,
port_wait_timeout=port_check_timeout,
),
)
# MANDATORY LOGGING - DO NOT REMOVE (see file header for reasoning)
print("\n[compose output default]", default_result.output)
@@ -885,9 +896,14 @@ def test_normal_startup_no_warnings_compose(tmp_path: pathlib.Path) -> None:
f"Unexpected mount row values for /data: {data_parts[2:4]}"
)
allowed_warning = "⚠️ WARNING: ARP flux sysctls are not set."
assert "Write permission denied" not in default_output
assert "CRITICAL" not in default_output
assert "⚠️" not in default_output
assert all(
"⚠️" not in line or allowed_warning in line
for line in default_output.splitlines()
), "Unexpected warning found in default output"
custom_http = _select_custom_ports({default_http_port})
custom_graphql = _select_custom_ports({default_http_port, custom_http})
@@ -913,7 +929,11 @@ def test_normal_startup_no_warnings_compose(tmp_path: pathlib.Path) -> None:
custom_project,
timeout=8,
detached=True,
post_up=_make_port_check_hook(custom_ports),
post_up=_make_port_check_hook(
custom_ports,
settle_wait_seconds=settle_wait_seconds,
port_wait_timeout=port_check_timeout,
),
)
print("\n[compose output custom]", custom_result.output)
custom_output = _assert_ports_ready(custom_result, custom_project, custom_ports)
@@ -922,8 +942,16 @@ def test_normal_startup_no_warnings_compose(tmp_path: pathlib.Path) -> None:
assert "" not in custom_output
assert "Write permission denied" not in custom_output
assert "CRITICAL" not in custom_output
assert "⚠️" not in custom_output
lowered_custom = custom_output.lower()
assert all(
"⚠️" not in line or allowed_warning in line
for line in custom_output.splitlines()
), "Unexpected warning found in custom output"
custom_output_without_allowed_warning = "\n".join(
line
for line in custom_output.splitlines()
if allowed_warning.lower() not in line.lower()
)
lowered_custom = custom_output_without_allowed_warning.lower()
assert "arning" not in lowered_custom
assert "rror" not in lowered_custom

View File

@@ -8,6 +8,7 @@ such as environment variable settings and check skipping.
import subprocess
import uuid
import pytest
import shutil
IMAGE = "netalertx-test"
@@ -85,8 +86,49 @@ def test_no_app_conf_override_when_no_graphql_port():
def test_skip_startup_checks_env_var():
# If SKIP_STARTUP_CHECKS contains the human-readable name of a check (e.g. "mandatory folders"),
# the entrypoint should skip that specific check. We check that the "Creating NetAlertX log directory."
# the entrypoint should skip that specific check. We check that the "Creating NetAlertX log directory."
# message (from the mandatory folders check) is not printed when skipped.
result = _run_entrypoint(env={"SKIP_STARTUP_CHECKS": "mandatory folders"}, check_only=True)
assert "Creating NetAlertX log directory" not in result.stdout
assert result.returncode == 0
@pytest.mark.docker
@pytest.mark.feature_complete
def test_host_optimization_warning_matches_sysctl():
"""Validate host-optimization warning matches actual host sysctl values."""
sysctl_bin = shutil.which("sysctl")
if not sysctl_bin:
pytest.skip("sysctl binary not found on host; skipping host-optimization warning check")
ignore_proc = subprocess.run(
[sysctl_bin, "-n", "net.ipv4.conf.all.arp_ignore"],
capture_output=True,
text=True,
check=False,
timeout=10,
)
announce_proc = subprocess.run(
[sysctl_bin, "-n", "net.ipv4.conf.all.arp_announce"],
capture_output=True,
text=True,
check=False,
timeout=10,
)
if ignore_proc.returncode != 0 or announce_proc.returncode != 0:
pytest.skip("sysctl values unavailable on host; skipping host-optimization warning check")
arp_ignore = ignore_proc.stdout.strip()
arp_announce = announce_proc.stdout.strip()
expected_warning = not (arp_ignore == "1" and arp_announce == "2")
result = _run_entrypoint(check_only=True)
combined_output = result.stdout + result.stderr
warning_present = "WARNING: ARP flux sysctls are not set." in combined_output
assert warning_present == expected_warning, (
"host-optimization warning mismatch: "
f"arp_ignore={arp_ignore}, arp_announce={arp_announce}, "
f"expected_warning={expected_warning}, warning_present={warning_present}"
)

0
test/plugins/__init__.py Normal file
View File

View File

@@ -0,0 +1,130 @@
"""
Tests for the SYNC plugin's schema-aware device insert logic.
The core invariant: only columns that actually exist in the Devices table
are included in the INSERT statement. Computed/virtual fields (devStatus,
devIsSleeping, devFlapping) and unknown future columns must be silently
dropped — never cause an OperationalError.
"""
import sys
import os
import pytest
# Ensure shared helpers and server code are importable.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", ".."))
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "server"))
from db_test_helpers import make_db, make_device_dict, sync_insert_devices # noqa: E402
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def conn():
    """Fresh in-memory DB with the Devices table and all views."""
    # A brand-new connection per test keeps the cases fully isolated.
    return make_db()
class TestSyncInsertSchemaAware:
    """
    sync_insert_devices() contract: only columns that physically exist in the
    Devices table reach the INSERT; computed/view columns and unknown future
    keys are silently dropped instead of raising OperationalError.
    """

    def test_clean_device_inserts_successfully(self, conn):
        """Happy path: a well-formed device dict inserts without error."""
        device = make_device_dict()
        inserted = sync_insert_devices(conn, [device])
        assert inserted == 1
        cur = conn.cursor()
        cur.execute("SELECT devMac FROM Devices WHERE devMac = ?", (device["devMac"],))
        row = cur.fetchone()
        assert row is not None

    def test_computed_devStatus_is_silently_dropped(self, conn):
        """devStatus is a computed view column — must NOT raise OperationalError."""
        device = make_device_dict()
        device["devStatus"] = "Online"  # computed in DevicesView, not in Devices table
        # Pre-fix this would raise: sqlite3.OperationalError: table Devices has no column named devStatus
        inserted = sync_insert_devices(conn, [device])
        assert inserted == 1

    def test_computed_devIsSleeping_is_silently_dropped(self, conn):
        """devIsSleeping is a CTE/view column — must NOT raise OperationalError."""
        device = make_device_dict()
        device["devIsSleeping"] = 0  # the exact field that triggered the original bug report
        inserted = sync_insert_devices(conn, [device])
        assert inserted == 1

    def test_computed_devFlapping_is_silently_dropped(self, conn):
        """devFlapping is also computed in the view."""
        device = make_device_dict()
        device["devFlapping"] = 0
        inserted = sync_insert_devices(conn, [device])
        assert inserted == 1

    def test_rowid_is_silently_dropped(self, conn):
        """rowid must never appear in an INSERT column list."""
        device = make_device_dict()
        device["rowid"] = 42
        inserted = sync_insert_devices(conn, [device])
        assert inserted == 1

    def test_all_computed_fields_at_once(self, conn):
        """All known computed/virtual columns together — none should abort the insert."""
        device = make_device_dict()
        device["rowid"] = 99
        device["devStatus"] = "Online"
        device["devIsSleeping"] = 0
        device["devFlapping"] = 0
        device["totally_unknown_future_column"] = "ignored"
        inserted = sync_insert_devices(conn, [device])
        assert inserted == 1

    def test_batch_insert_multiple_devices(self, conn):
        """Multiple devices with computed fields all insert correctly."""
        devices = []
        for i in range(3):
            d = make_device_dict(mac=f"aa:bb:cc:dd:ee:{i:02x}")
            d["devGUID"] = f"guid-{i}"
            d["devStatus"] = "Online"  # computed
            d["devIsSleeping"] = 0  # computed
            devices.append(d)
        inserted = sync_insert_devices(conn, devices)
        assert inserted == len(devices)

    def test_values_aligned_with_columns_after_filtering(self, conn):
        """Values must be extracted in the same order as insert_cols (alignment bug guard)."""
        device = make_device_dict()
        device["devStatus"] = "SHOULD_BE_DROPPED"
        device["devIsSleeping"] = 999
        sync_insert_devices(conn, [device])
        cur = conn.cursor()
        cur.execute("SELECT devName, devVendor, devLastIP FROM Devices WHERE devMac = ?", (device["devMac"],))
        row = cur.fetchone()
        # If column filtering and value extraction ever diverge, values land in
        # the wrong columns — hence exact per-column checks.
        assert row["devName"] == "Test Device"
        assert row["devVendor"] == "Acme"
        assert row["devLastIP"] == "192.168.1.10"

    def test_unknown_column_does_not_prevent_insert(self, conn):
        """A column that was added on the node but doesn't exist on the hub is dropped."""
        device = make_device_dict()
        device["devNewFeatureOnlyOnNode"] = "some_value"
        # Must not raise — hub schema wins
        inserted = sync_insert_devices(conn, [device])
        assert inserted == 1

    def test_empty_device_list_returns_zero(self, conn):
        """Edge case: empty list should not raise and should return 0."""
        inserted = sync_insert_devices(conn, [])
        assert inserted == 0

View File

@@ -0,0 +1,413 @@
"""
Tests for SYNC plugin push/pull/receive behaviour.
Three modes exercised:
Mode 1 PUSH (NODE): send_data() POSTs encrypted device data to the hub.
Mode 2 PULL (HUB): get_data() GETs a base64 JSON blob from each node.
Mode 3 RECEIVE: hub parses decoded log files and upserts devices into DB.
sync.py is intentionally NOT imported here — its module-level code has side
effects (reads live config, initialises logging). Instead, the pure logic
under test is extracted into thin local mirrors that match the production
implementation exactly, so any divergence will surface as a test failure.
"""
import base64
import json
import os
import sys
from unittest.mock import MagicMock, patch
import pytest
import requests
# Make shared helpers + server packages importable from test/plugins/
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", ".."))
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "server"))
from db_test_helpers import make_db, make_device_dict, sync_insert_devices # noqa: E402
from utils.crypto_utils import encrypt_data, decrypt_data # noqa: E402
# ---------------------------------------------------------------------------
# Local mirrors of sync.py logic (no module-level side-effects on import)
# ---------------------------------------------------------------------------
API_ENDPOINT = "/sync"
def _send_data(api_token, file_content, encryption_key, file_path, node_name, pref, hub_url):
    """Mirror of sync.send_data() — returns True on HTTP 200, False otherwise."""
    payload = {
        "data": encrypt_data(file_content, encryption_key),
        "file_path": file_path,
        "plugin": pref,
        "node_name": node_name,
    }
    auth_headers = {"Authorization": f"Bearer {api_token}"}
    try:
        reply = requests.post(hub_url + API_ENDPOINT, data=payload, headers=auth_headers, timeout=5)
    except requests.RequestException:
        # Any transport-level failure (refused, timeout, …) maps to False.
        return False
    return reply.status_code == 200
def _get_data(api_token, node_url):
    """Mirror of sync.get_data() — returns parsed JSON dict or '' on any failure."""
    auth_headers = {"Authorization": f"Bearer {api_token}"}
    try:
        reply = requests.get(node_url + API_ENDPOINT, headers=auth_headers, timeout=5)
    except requests.RequestException:
        return ""
    if reply.status_code != 200:
        return ""
    try:
        return reply.json()
    except json.JSONDecodeError:
        # A 200 with an unparsable body degrades to the '' sentinel as well.
        return ""
def _node_name_from_filename(file_name: str) -> str:
"""Mirror of the node-name extraction in sync.main()."""
parts = file_name.split(".")
return parts[2] if ("decoded" in file_name or "encoded" in file_name) else parts[1]
def _determine_mode(hub_url: str, send_devices: bool, plugins_to_sync: list, pull_nodes: list):
"""Mirror of the is_hub / is_node detection block in sync.main()."""
is_node = len(hub_url) > 0 and (send_devices or bool(plugins_to_sync))
is_hub = len(pull_nodes) > 0
return is_hub, is_node
def _currentscan_candidates(device_data: list[dict]) -> list[dict]:
"""
Mirror of the plugin_objects.add_object() filter in sync.main().
Only online (devPresentLastScan=1) and non-internet devices are eligible
to be written to the CurrentScan / plugin result file.
"""
return [
d for d in device_data
if d.get("devPresentLastScan") == 1 and str(d.get("devMac", "")).lower() != "internet"
]
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
# Shared fixture values for the protocol tests below. The URLs are never
# actually contacted — requests.get/post are patched in every test.
ENCRYPTION_KEY = "test-secret-key"  # symmetric key for encrypt_data/decrypt_data
API_TOKEN = "tok_abc123"            # bearer token expected in Authorization headers
HUB_URL = "http://hub.local:20211"
NODE_URL = "http://node.local:20211"
@pytest.fixture
def conn():
    """Fresh in-memory DB with Devices table and all views."""
    # A brand-new connection per test keeps the cases fully isolated.
    return make_db()
# ===========================================================================
# Mode detection
# ===========================================================================
class TestModeDetection:
    """
    _determine_mode(): a node needs a hub URL plus something to send
    (devices or plugin results); a hub needs at least one pull node.
    The two roles are independent.
    """

    def test_is_node_when_hub_url_and_send_devices(self):
        """Hub URL + device sending enabled → node, not hub."""
        is_hub, is_node = _determine_mode(HUB_URL, send_devices=True, plugins_to_sync=[], pull_nodes=[])
        assert is_node is True
        assert is_hub is False

    def test_is_node_when_hub_url_and_plugins_set(self):
        """Hub URL + plugin list (no devices) → still a node."""
        is_hub, is_node = _determine_mode(HUB_URL, send_devices=False, plugins_to_sync=["NMAP"], pull_nodes=[])
        assert is_node is True
        assert is_hub is False

    def test_is_hub_when_pull_nodes_set(self):
        """Any configured pull node makes this instance a hub."""
        is_hub, is_node = _determine_mode("", send_devices=False, plugins_to_sync=[], pull_nodes=[NODE_URL])
        assert is_hub is True
        assert is_node is False

    def test_is_both_hub_and_node(self):
        """Hub and node roles may both be active at once."""
        is_hub, is_node = _determine_mode(HUB_URL, send_devices=True, plugins_to_sync=[], pull_nodes=[NODE_URL])
        assert is_hub is True
        assert is_node is True

    def test_neither_when_no_config(self):
        """No hub URL and no pull nodes → neither role."""
        is_hub, is_node = _determine_mode("", send_devices=False, plugins_to_sync=[], pull_nodes=[])
        assert is_hub is False
        assert is_node is False

    def test_no_hub_url_means_not_node_even_with_send_devices(self):
        """send_devices alone is not enough — a hub URL is required to be a node."""
        is_hub, is_node = _determine_mode("", send_devices=True, plugins_to_sync=[], pull_nodes=[])
        assert is_node is False
# ===========================================================================
# send_data (Mode 1 PUSH)
# ===========================================================================
class TestSendData:
    """
    _send_data(): encrypt the file content, POST it to <hub>/sync with bearer
    auth, and collapse every failure mode to a boolean False.
    requests.post is patched throughout — no network traffic occurs.
    """

    def _mock_post(self, status_code=200):
        # Helper: patch requests.post to return a canned response object.
        resp = MagicMock()
        resp.status_code = status_code
        return patch("requests.post", return_value=resp)

    def test_returns_true_on_http_200(self):
        """HTTP 200 is the only status treated as success."""
        with self._mock_post(200):
            result = _send_data(API_TOKEN, '{"data":[]}', ENCRYPTION_KEY,
                                "/tmp/file.log", "node1", "SYNC", HUB_URL)
        assert result is True

    def test_returns_false_on_non_200(self):
        """Client and server error statuses all map to False."""
        for code in (400, 401, 403, 500, 503):
            with self._mock_post(code):
                result = _send_data(API_TOKEN, '{"data":[]}', ENCRYPTION_KEY,
                                    "/tmp/file.log", "node1", "SYNC", HUB_URL)
            assert result is False, f"Expected False for HTTP {code}"

    def test_returns_false_on_connection_error(self):
        """Connection refusals are swallowed and reported as False."""
        with patch("requests.post", side_effect=requests.ConnectionError("refused")):
            result = _send_data(API_TOKEN, '{"data":[]}', ENCRYPTION_KEY,
                                "/tmp/file.log", "node1", "SYNC", HUB_URL)
        assert result is False

    def test_returns_false_on_timeout(self):
        """Timeouts are swallowed and reported as False."""
        with patch("requests.post", side_effect=requests.Timeout("timed out")):
            result = _send_data(API_TOKEN, '{"data":[]}', ENCRYPTION_KEY,
                                "/tmp/file.log", "node1", "SYNC", HUB_URL)
        assert result is False

    def test_posts_to_correct_endpoint(self):
        """The POST must target <hub_url>/sync."""
        resp = MagicMock()
        resp.status_code = 200
        with patch("requests.post", return_value=resp) as mock_post:
            _send_data(API_TOKEN, '{"data":[]}', ENCRYPTION_KEY,
                       "/tmp/file.log", "node1", "SYNC", HUB_URL)
            url_called = mock_post.call_args[0][0]
        assert url_called == HUB_URL + "/sync"

    def test_bearer_auth_header_sent(self):
        """The API token travels as a Bearer Authorization header."""
        resp = MagicMock()
        resp.status_code = 200
        with patch("requests.post", return_value=resp) as mock_post:
            _send_data(API_TOKEN, '{"data":[]}', ENCRYPTION_KEY,
                       "/tmp/file.log", "node1", "SYNC", HUB_URL)
            headers = mock_post.call_args[1]["headers"]
        assert headers["Authorization"] == f"Bearer {API_TOKEN}"

    def test_payload_contains_expected_fields(self):
        """The form payload carries the blob, file path, plugin name and node name."""
        resp = MagicMock()
        resp.status_code = 200
        with patch("requests.post", return_value=resp) as mock_post:
            _send_data(API_TOKEN, '{"data":[]}', ENCRYPTION_KEY,
                       "/tmp/file.log", "node1", "SYNC", HUB_URL)
            payload = mock_post.call_args[1]["data"]
        assert "data" in payload  # encrypted blob
        assert payload["file_path"] == "/tmp/file.log"
        assert payload["plugin"] == "SYNC"
        assert payload["node_name"] == "node1"

    def test_payload_data_is_encrypted_not_plaintext(self):
        """The 'data' field in the POST must be encrypted, not the raw content."""
        plaintext = '{"secret": "do_not_expose"}'
        resp = MagicMock()
        resp.status_code = 200
        with patch("requests.post", return_value=resp) as mock_post:
            _send_data(API_TOKEN, plaintext, ENCRYPTION_KEY,
                       "/tmp/file.log", "node1", "SYNC", HUB_URL)
            transmitted = mock_post.call_args[1]["data"]["data"]
        assert transmitted != plaintext
        # Verify it round-trips correctly
        assert decrypt_data(transmitted, ENCRYPTION_KEY) == plaintext
# ===========================================================================
# get_data (Mode 2 PULL)
# ===========================================================================
class TestGetData:
    """
    _get_data(): GET <node>/sync with bearer auth; returns parsed JSON on
    HTTP 200 and the '' sentinel on every failure mode.
    requests.get is patched throughout — no network traffic occurs.
    """

    def _mock_get(self, status_code=200, json_body=None, side_effect=None):
        # Helper: patch requests.get with either a canned response or an exception.
        resp = MagicMock()
        resp.status_code = status_code
        if json_body is not None:
            resp.json.return_value = json_body
        if side_effect is not None:
            return patch("requests.get", side_effect=side_effect)
        return patch("requests.get", return_value=resp)

    def test_returns_parsed_json_on_200(self):
        """HTTP 200 with valid JSON returns the parsed body unchanged."""
        body = {"node_name": "node1", "data_base64": base64.b64encode(b"hello").decode()}
        with self._mock_get(200, json_body=body):
            result = _get_data(API_TOKEN, NODE_URL)
        assert result == body

    def test_gets_from_correct_endpoint(self):
        """The request must target <node_url>/sync."""
        resp = MagicMock()
        resp.status_code = 200
        resp.json.return_value = {}
        with patch("requests.get", return_value=resp) as mock_get:
            _get_data(API_TOKEN, NODE_URL)
            url_called = mock_get.call_args[0][0]
        assert url_called == NODE_URL + "/sync"

    def test_bearer_auth_header_sent(self):
        """The API token travels as a Bearer Authorization header."""
        resp = MagicMock()
        resp.status_code = 200
        resp.json.return_value = {}
        with patch("requests.get", return_value=resp) as mock_get:
            _get_data(API_TOKEN, NODE_URL)
            headers = mock_get.call_args[1]["headers"]
        assert headers["Authorization"] == f"Bearer {API_TOKEN}"

    def test_returns_empty_string_on_json_decode_error(self):
        """A 200 response with an unparsable body degrades to ''."""
        resp = MagicMock()
        resp.status_code = 200
        resp.json.side_effect = json.JSONDecodeError("bad json", "", 0)
        with patch("requests.get", return_value=resp):
            result = _get_data(API_TOKEN, NODE_URL)
        assert result == ""

    def test_returns_empty_string_on_connection_error(self):
        """Network-level failures degrade to ''."""
        with patch("requests.get", side_effect=requests.ConnectionError("refused")):
            result = _get_data(API_TOKEN, NODE_URL)
        assert result == ""

    def test_returns_empty_string_on_timeout(self):
        """Timeouts degrade to ''."""
        with patch("requests.get", side_effect=requests.Timeout("timed out")):
            result = _get_data(API_TOKEN, NODE_URL)
        assert result == ""

    def test_returns_empty_string_on_non_200(self):
        """Any non-200 status degrades to ''."""
        resp = MagicMock()
        resp.status_code = 401
        with patch("requests.get", return_value=resp):
            result = _get_data(API_TOKEN, NODE_URL)
        assert result == ""
# ===========================================================================
# Node name extraction from filename (Mode 3 RECEIVE)
# ===========================================================================
class TestNodeNameExtraction:
    """
    _node_name_from_filename(): the node name is the 2nd dot-separated token,
    or the 3rd when a 'decoded'/'encoded' marker appears in the filename.
    """

    def test_simple_filename(self):
        # last_result.MyNode.log → "MyNode"
        assert _node_name_from_filename("last_result.MyNode.log") == "MyNode"

    def test_decoded_filename(self):
        # last_result.decoded.MyNode.1.log → "MyNode"
        assert _node_name_from_filename("last_result.decoded.MyNode.1.log") == "MyNode"

    def test_encoded_filename(self):
        # last_result.encoded.MyNode.1.log → "MyNode"
        assert _node_name_from_filename("last_result.encoded.MyNode.1.log") == "MyNode"

    def test_node_name_with_underscores(self):
        # Underscores in the node name must survive the dot-split.
        assert _node_name_from_filename("last_result.Wladek_Site.log") == "Wladek_Site"

    def test_decoded_node_name_with_underscores(self):
        assert _node_name_from_filename("last_result.decoded.Wladek_Site.1.log") == "Wladek_Site"
# ===========================================================================
# CurrentScan candidates filter (Mode 3 RECEIVE)
# ===========================================================================
class TestCurrentScanCandidates:
    """
    _currentscan_candidates(): only online devices (devPresentLastScan=1)
    that are not the synthetic 'internet' root node are eligible.
    """

    def test_online_device_is_included(self):
        """devPresentLastScan=1 passes the filter."""
        d = make_device_dict(devPresentLastScan=1)
        assert len(_currentscan_candidates([d])) == 1

    def test_offline_device_is_excluded(self):
        """devPresentLastScan=0 is filtered out."""
        d = make_device_dict(devPresentLastScan=0)
        assert len(_currentscan_candidates([d])) == 0

    def test_internet_mac_is_excluded(self):
        """The synthetic 'internet' root node never reaches CurrentScan."""
        d = make_device_dict(mac="internet", devPresentLastScan=1)
        assert len(_currentscan_candidates([d])) == 0

    def test_internet_mac_case_insensitive(self):
        """The 'internet' exclusion must be case-insensitive."""
        for mac in ("INTERNET", "Internet", "iNtErNeT"):
            d = make_device_dict(mac=mac, devPresentLastScan=1)
            assert len(_currentscan_candidates([d])) == 0, f"mac={mac!r} should be excluded"

    def test_mixed_batch(self):
        """Batch filtering keeps only online, non-root devices."""
        devices = [
            make_device_dict(mac="aa:bb:cc:dd:ee:01", devPresentLastScan=1),  # included
            make_device_dict(mac="aa:bb:cc:dd:ee:02", devPresentLastScan=0),  # offline
            make_device_dict(mac="internet", devPresentLastScan=1),  # root node
            make_device_dict(mac="aa:bb:cc:dd:ee:03", devPresentLastScan=1),  # included
        ]
        result = _currentscan_candidates(devices)
        macs = [d["devMac"] for d in result]
        assert "aa:bb:cc:dd:ee:01" in macs
        assert "aa:bb:cc:dd:ee:03" in macs
        assert "aa:bb:cc:dd:ee:02" not in macs
        assert "internet" not in macs
# ===========================================================================
# DB insert filtering new vs existing devices (Mode 3 RECEIVE)
# ===========================================================================
class TestReceiveInsert:
    """
    Mode-3 RECEIVE: sync_insert_devices() must insert only devices whose MAC
    is not already present, and must survive computed fields in the payload.
    """

    def test_new_device_is_inserted(self, conn):
        """A MAC absent from existing_macs is inserted."""
        device = make_device_dict(mac="aa:bb:cc:dd:ee:01")
        inserted = sync_insert_devices(conn, [device], existing_macs=set())
        assert inserted == 1
        cur = conn.cursor()
        cur.execute("SELECT devMac FROM Devices WHERE devMac = ?", ("aa:bb:cc:dd:ee:01",))
        assert cur.fetchone() is not None

    def test_existing_device_is_not_reinserted(self, conn):
        """A MAC listed in existing_macs is skipped entirely."""
        # Pre-populate Devices
        cur = conn.cursor()
        cur.execute(
            "INSERT INTO Devices (devMac, devName) VALUES (?, ?)",
            ("aa:bb:cc:dd:ee:01", "Existing"),
        )
        conn.commit()
        device = make_device_dict(mac="aa:bb:cc:dd:ee:01")
        inserted = sync_insert_devices(conn, [device], existing_macs={"aa:bb:cc:dd:ee:01"})
        assert inserted == 0

    def test_only_new_devices_inserted_in_mixed_batch(self, conn):
        """Mixed batch: known MACs are skipped, new ones inserted."""
        cur = conn.cursor()
        cur.execute(
            "INSERT INTO Devices (devMac, devName) VALUES (?, ?)",
            ("aa:bb:cc:dd:ee:existing", "Existing"),
        )
        conn.commit()
        devices = [
            make_device_dict(mac="aa:bb:cc:dd:ee:existing"),
            make_device_dict(mac="aa:bb:cc:dd:ee:new1"),
            make_device_dict(mac="aa:bb:cc:dd:ee:new2"),
        ]
        inserted = sync_insert_devices(
            conn, devices, existing_macs={"aa:bb:cc:dd:ee:existing"}
        )
        assert inserted == 2

    def test_computed_fields_in_payload_do_not_abort_insert(self, conn):
        """Regression: devIsSleeping / devStatus / devFlapping must be silently dropped."""
        device = make_device_dict(mac="aa:bb:cc:dd:ee:01")
        device["devIsSleeping"] = 0
        device["devStatus"] = "Online"
        device["devFlapping"] = 0
        device["rowid"] = 99
        # Must not raise OperationalError
        inserted = sync_insert_devices(conn, [device], existing_macs=set())
        assert inserted == 1

    def test_empty_device_list_returns_zero(self, conn):
        """Edge case: nothing to insert returns 0."""
        assert sync_insert_devices(conn, [], existing_macs=set()) == 0

View File

@@ -0,0 +1,446 @@
"""
Integration tests for the 'Device Down' event insertion and sleeping suppression.
Two complementary layers are tested:
Layer 1 — insert_events() (session_events.py)
Non-sleeping devices (devCanSleep=0):
The "Device Down" event fires when:
devPresentLastScan = 1 (was online last scan)
AND device NOT in CurrentScan (absent this scan)
AND devAlertDown != 0
Sleeping devices (devCanSleep=1):
The "Device Down" event is DEFERRED until the sleep window
(NTFPRCS_sleep_time) expires. During the sleep window the device
is shown as "Sleeping" and NO down event is created. After the
window expires, insert_events creates the event via the
sleep-expired query (devPresentLastScan=0, devIsSleeping=0).
Layer 2 — DevicesView down-count query (as used by insertOnlineHistory / db_helper)
After presence is updated (devPresentLastScan → 0) the sleeping suppression
(devIsSleeping=1) kicks in for count/API queries.
Tests here verify that sleeping devices are excluded from down counts and that
expired-window devices are included.
"""
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from db_test_helpers import ( # noqa: E402
make_db as _make_db,
minutes_ago as _minutes_ago,
insert_device as _insert_device,
down_event_macs as _down_event_macs,
DummyDB,
)
# server/ is already on sys.path after db_test_helpers import
from scan.session_events import insert_events # noqa: E402
# ---------------------------------------------------------------------------
# Layer 1: insert_events() — event creation on the down transition
#
# Non-sleeping (devCanSleep=0):
# Condition: devPresentLastScan = 1 AND not in CurrentScan → immediate event.
# Sleeping (devCanSleep=1):
# No event until sleep window expires (see TestInsertEventsSleepSuppression).
# ---------------------------------------------------------------------------
class TestInsertEventsDownDetection:
    """
    Tests for the 'Device Down' INSERT in insert_events() for non-sleeping devices.

    The down transition is: devPresentLastScan=1 AND absent from CurrentScan.
    CurrentScan is left empty in all tests (all devices absent this scan).
    """

    @staticmethod
    def _scan_single(mac, alert_down, present_last_scan):
        """Build a DB with one device, run insert_events(), return the cursor."""
        db = _make_db()
        cursor = db.cursor()
        _insert_device(cursor, mac, alert_down=alert_down,
                       present_last_scan=present_last_scan)
        db.commit()
        insert_events(DummyDB(db))
        return cursor

    def test_null_alert_down_does_not_fire_down_event(self):
        """
        Regression: NULL devAlertDown must NOT produce a 'Device Down' event.

        Root cause: IFNULL(devAlertDown, '') made '' != 0 evaluate TRUE in SQLite,
        causing devices without devAlertDown set to fire constant down events.
        Fix: IFNULL(devAlertDown, 0) → 0 != 0 is FALSE.
        """
        cursor = self._scan_single("aa:11:22:33:44:01", None, 1)
        assert "aa:11:22:33:44:01" not in _down_event_macs(cursor), (
            "NULL devAlertDown must never fire a 'Device Down' event "
            "(IFNULL coercion regression)"
        )

    def test_zero_alert_down_does_not_fire_down_event(self):
        """Explicit devAlertDown=0 must NOT fire a 'Device Down' event."""
        cursor = self._scan_single("aa:11:22:33:44:02", 0, 1)
        assert "aa:11:22:33:44:02" not in _down_event_macs(cursor)

    def test_alert_down_one_fires_down_event_when_absent(self):
        """devAlertDown=1, was online last scan, absent now → 'Device Down' event."""
        cursor = self._scan_single("aa:11:22:33:44:03", 1, 1)
        assert "aa:11:22:33:44:03" in _down_event_macs(cursor)

    def test_device_in_current_scan_does_not_fire_down_event(self):
        """A device present in CurrentScan (online now) must NOT get Down event."""
        db = _make_db()
        cursor = db.cursor()
        _insert_device(cursor, "aa:11:22:33:44:04", alert_down=1, present_last_scan=1)
        # A CurrentScan row marks the device as online for this scan cycle.
        cursor.execute(
            "INSERT INTO CurrentScan (scanMac, scanLastIP) VALUES (?, ?)",
            ("aa:11:22:33:44:04", "192.168.1.1"),
        )
        db.commit()
        insert_events(DummyDB(db))
        assert "aa:11:22:33:44:04" not in _down_event_macs(cursor)

    def test_already_absent_last_scan_does_not_re_fire(self):
        """
        devPresentLastScan=0 means device was already absent last scan.

        For non-sleeping devices (devCanSleep=0), the down event was already
        created then; it must not be created again.
        """
        cursor = self._scan_single("aa:11:22:33:44:05", 1, 0)
        assert "aa:11:22:33:44:05" not in _down_event_macs(cursor)

    def test_archived_device_does_not_fire_down_event(self):
        """Archived devices should not produce Down events."""
        db = _make_db()
        cursor = db.cursor()
        # Inserted directly because _insert_device does not set devIsArchived.
        cursor.execute(
            """INSERT INTO Devices
               (devMac, devAlertDown, devPresentLastScan, devCanSleep,
                devLastConnection, devLastIP, devIsArchived, devIsNew)
               VALUES (?, 1, 1, 0, ?, '192.168.1.1', 1, 0)""",
            ("aa:11:22:33:44:06", _minutes_ago(60)),
        )
        db.commit()
        insert_events(DummyDB(db))
        # insert_events() applies no archived filter today (DevicesView does),
        # so the archived device DOES get a Down event. This test documents the
        # current behaviour; update the assertion if an archive filter is added.
        assert "aa:11:22:33:44:06" in _down_event_macs(cursor)

    def test_multiple_devices_mixed_alert_down(self):
        """Only devices with devAlertDown=1 that are absent fire Down events."""
        db = _make_db()
        cursor = db.cursor()
        scenarios = (
            # (mac, devAlertDown, devPresentLastScan)
            ("cc:00:00:00:00:01", None, 1),  # NULL → no event
            ("cc:00:00:00:00:02", 0, 1),     # 0 → no event
            ("cc:00:00:00:00:03", 1, 1),     # 1 → event
            ("cc:00:00:00:00:04", 1, 0),     # already absent → no event
        )
        for mac, alert_down, present in scenarios:
            _insert_device(cursor, mac, alert_down=alert_down,
                           present_last_scan=present)
        db.commit()
        insert_events(DummyDB(db))
        fired = _down_event_macs(cursor)
        assert "cc:00:00:00:00:01" not in fired, "NULL devAlertDown must not fire"
        assert "cc:00:00:00:00:02" not in fired, "devAlertDown=0 must not fire"
        assert "cc:00:00:00:00:03" in fired, "devAlertDown=1 absent must fire"
        assert "cc:00:00:00:00:04" not in fired, "already-absent device must not fire again"
# ---------------------------------------------------------------------------
# Layer 1b: insert_events() — sleeping device suppression
#
# Sleeping devices (devCanSleep=1) must NOT get a 'Device Down' event on the
# first-scan transition. Instead, the event is deferred until the sleep
# window (NTFPRCS_sleep_time) expires.
# ---------------------------------------------------------------------------
class TestInsertEventsSleepSuppression:
    """
    Tests for sleeping device suppression in insert_events().

    Verifies that devCanSleep=1 devices DO NOT get immediate down events
    and only get events after the sleep window expires.
    """

    @staticmethod
    def _scan_single(mac, *, alert_down, present_last_scan, can_sleep,
                     minutes_offline):
        """One device, 30-minute sleep window; run insert_events(), return cursor."""
        db = _make_db(sleep_minutes=30)
        cursor = db.cursor()
        _insert_device(cursor, mac, alert_down=alert_down,
                       present_last_scan=present_last_scan,
                       can_sleep=can_sleep,
                       last_connection=_minutes_ago(minutes_offline))
        db.commit()
        insert_events(DummyDB(db))
        return cursor

    def test_sleeping_device_no_down_event_on_first_absence(self):
        """
        devCanSleep=1, devPresentLastScan=1, absent from CurrentScan.

        Sleep window has NOT expired → must NOT fire 'Device Down'.
        This is the core bug fix: previously the event fired immediately.
        """
        cursor = self._scan_single("bb:00:00:00:00:01", alert_down=1,
                                   present_last_scan=1, can_sleep=1,
                                   minutes_offline=1)
        assert "bb:00:00:00:00:01" not in _down_event_macs(cursor), (
            "Sleeping device must NOT get 'Device Down' on first absence "
            "(sleep window not expired)"
        )

    def test_sleeping_device_still_in_window_no_event(self):
        """
        devCanSleep=1, devPresentLastScan=0, devIsSleeping=1 (within window).

        Device was already absent last scan and is still sleeping.
        Must NOT fire 'Device Down'.
        """
        cursor = self._scan_single("bb:00:00:00:00:02", alert_down=1,
                                   present_last_scan=0, can_sleep=1,
                                   minutes_offline=10)
        assert "bb:00:00:00:00:02" not in _down_event_macs(cursor), (
            "Sleeping device within sleep window must NOT get 'Device Down'"
        )

    def test_sleeping_device_expired_window_fires_event(self):
        """
        devCanSleep=1, devPresentLastScan=0, sleep window expired
        (devLastConnection > NTFPRCS_sleep_time ago) → must fire 'Device Down'.
        """
        cursor = self._scan_single("bb:00:00:00:00:03", alert_down=1,
                                   present_last_scan=0, can_sleep=1,
                                   minutes_offline=45)
        assert "bb:00:00:00:00:03" in _down_event_macs(cursor), (
            "Sleeping device past its sleep window must get 'Device Down'"
        )

    def test_sleeping_device_expired_no_duplicate_event(self):
        """
        Once a 'Device Down' event exists for the current absence period,
        subsequent scan cycles must NOT create another one.
        """
        db = _make_db(sleep_minutes=30)
        cursor = db.cursor()
        _insert_device(cursor, "bb:00:00:00:00:04", alert_down=1,
                       present_last_scan=0, can_sleep=1,
                       last_connection=_minutes_ago(45))
        # Simulate: a Device Down event already exists for this absence
        cursor.execute(
            "INSERT INTO Events (eve_MAC, eve_IP, eve_DateTime, eve_EventType, "
            "eve_AdditionalInfo, eve_PendingAlertEmail) "
            "VALUES (?, '192.168.1.1', ?, 'Device Down', '', 1)",
            ("bb:00:00:00:00:04", _minutes_ago(15)),
        )
        db.commit()
        insert_events(DummyDB(db))
        cursor.execute(
            "SELECT COUNT(*) as cnt FROM Events "
            "WHERE eve_MAC = 'bb:00:00:00:00:04' AND eve_EventType = 'Device Down'"
        )
        count = cursor.fetchone()["cnt"]
        assert count == 1, (
            f"Expected exactly 1 Device Down event, got {count} (duplicate prevention)"
        )

    def test_sleeping_device_with_alert_down_zero_no_event(self):
        """devCanSleep=1 but devAlertDown=0 → never fires, even after sleep expires."""
        cursor = self._scan_single("bb:00:00:00:00:05", alert_down=0,
                                   present_last_scan=0, can_sleep=1,
                                   minutes_offline=45)
        assert "bb:00:00:00:00:05" not in _down_event_macs(cursor)

    def test_mixed_sleeping_and_non_sleeping(self):
        """
        Non-sleeping device fires immediately on first absence.
        Sleeping device within window does NOT fire.
        Sleeping device past window DOES fire.
        """
        db = _make_db(sleep_minutes=30)
        cursor = db.cursor()
        fleet = (
            # (mac, devPresentLastScan, devCanSleep, minutes offline)
            ("bb:00:00:00:00:10", 1, 0, 1),   # non-sleeping, absent now → event
            ("bb:00:00:00:00:11", 1, 1, 1),   # sleeping, first absence → no event
            ("bb:00:00:00:00:12", 0, 1, 10),  # sleeping, within window → no event
            ("bb:00:00:00:00:13", 0, 1, 45),  # sleeping, past window → event
        )
        for mac, present, sleeps, offline in fleet:
            _insert_device(cursor, mac, alert_down=1, present_last_scan=present,
                           can_sleep=sleeps, last_connection=_minutes_ago(offline))
        db.commit()
        insert_events(DummyDB(db))
        fired = _down_event_macs(cursor)
        assert "bb:00:00:00:00:10" in fired, "Non-sleeping absent must fire"
        assert "bb:00:00:00:00:11" not in fired, "Sleeping first-absence must NOT fire"
        assert "bb:00:00:00:00:12" not in fired, "Sleeping within window must NOT fire"
        assert "bb:00:00:00:00:13" in fired, "Sleeping past window must fire"
# ---------------------------------------------------------------------------
# Layer 2: DevicesView down-count query (post-presence-update)
#
# After update_presence_from_CurrentScan sets devPresentLastScan → 0 for absent
# devices, the sleeping suppression (devIsSleeping) becomes active for:
# - insertOnlineHistory (SUM ... WHERE devPresentLastScan=0 AND devIsSleeping=0)
# - db_helper "down" filter
# - getDown()
# ---------------------------------------------------------------------------
class TestDownCountSleepingSuppression:
    """
    Tests for the post-presence-update down-count query.

    Simulates the state AFTER update_presence_from_CurrentScan has run by
    inserting devices with devPresentLastScan=0 (already absent) directly.
    """

    _DOWN_COUNT_SQL = """
        SELECT devMac FROM DevicesView
        WHERE devAlertDown != 0
          AND devPresentLastScan = 0
          AND devIsSleeping = 0
          AND devIsArchived = 0
    """

    def _query_down_macs(self, cursor):
        """Run the down-count query and return the matching MACs as a set."""
        cursor.execute(self._DOWN_COUNT_SQL)
        return {row["devMac"] for row in cursor.fetchall()}

    def test_null_alert_down_excluded_from_down_count(self):
        """NULL devAlertDown must not contribute to down count."""
        db = _make_db()
        cursor = db.cursor()
        _insert_device(cursor, "dd:00:00:00:00:01", alert_down=None,
                       present_last_scan=0)
        db.commit()
        assert "dd:00:00:00:00:01" not in self._query_down_macs(cursor)

    def test_alert_down_one_included_in_down_count(self):
        """devAlertDown=1 absent device must be counted as down."""
        db = _make_db()
        cursor = db.cursor()
        _insert_device(cursor, "dd:00:00:00:00:02", alert_down=1,
                       present_last_scan=0, last_connection=_minutes_ago(60))
        db.commit()
        assert "dd:00:00:00:00:02" in self._query_down_macs(cursor)

    def test_sleeping_device_excluded_from_down_count(self):
        """
        devCanSleep=1 + absent + within sleep window → devIsSleeping=1.

        Must be excluded from the down-count query.
        """
        db = _make_db(sleep_minutes=30)
        cursor = db.cursor()
        _insert_device(cursor, "dd:00:00:00:00:03", alert_down=1,
                       present_last_scan=0, can_sleep=1,
                       last_connection=_minutes_ago(5))
        db.commit()
        assert "dd:00:00:00:00:03" not in self._query_down_macs(cursor), (
            "Sleeping device must be excluded from down count"
        )

    def test_expired_sleep_window_included_in_down_count(self):
        """Once the sleep window expires the device must appear in down count."""
        db = _make_db(sleep_minutes=30)
        cursor = db.cursor()
        _insert_device(cursor, "dd:00:00:00:00:04", alert_down=1,
                       present_last_scan=0, can_sleep=1,
                       last_connection=_minutes_ago(45))
        db.commit()
        assert "dd:00:00:00:00:04" in self._query_down_macs(cursor), (
            "Device past its sleep window must appear in down count"
        )

    def test_can_sleep_zero_always_in_down_count(self):
        """devCanSleep=0 device that is absent is always counted as down."""
        db = _make_db(sleep_minutes=30)
        cursor = db.cursor()
        _insert_device(cursor, "dd:00:00:00:00:05", alert_down=1,
                       present_last_scan=0, can_sleep=0,
                       last_connection=_minutes_ago(5))
        db.commit()
        assert "dd:00:00:00:00:05" in self._query_down_macs(cursor)

    def test_online_history_down_count_excludes_sleeping(self):
        """
        Mirrors the insertOnlineHistory SUM query exactly.

        Sleeping devices must not inflate the downDevices count.
        """
        db = _make_db(sleep_minutes=30)
        cursor = db.cursor()
        # Genuinely down: non-sleeping and long absent
        _insert_device(cursor, "ee:00:00:00:00:01", alert_down=1,
                       present_last_scan=0, can_sleep=0,
                       last_connection=_minutes_ago(60))
        # Sleeping: absent but still inside its sleep window
        _insert_device(cursor, "ee:00:00:00:00:02", alert_down=1,
                       present_last_scan=0, can_sleep=1,
                       last_connection=_minutes_ago(10))
        # Online this scan
        _insert_device(cursor, "ee:00:00:00:00:03", alert_down=1,
                       present_last_scan=1, last_connection=_minutes_ago(1))
        db.commit()
        cursor.execute("""
            SELECT
                COALESCE(SUM(CASE
                    WHEN devPresentLastScan = 0
                     AND devAlertDown = 1
                     AND devIsSleeping = 0
                    THEN 1 ELSE 0 END), 0) AS downDevices
            FROM DevicesView
        """)
        count = cursor.fetchone()["downDevices"]
        assert count == 1, (
            f"Expected 1 down device (sleeping device must not be counted), got {count}"
        )

View File

@@ -58,6 +58,7 @@ class TestDeviceAtomicity(unittest.TestCase):
devScan INTEGER DEFAULT 0,
devAlertEvents INTEGER DEFAULT 0,
devAlertDown INTEGER DEFAULT 0,
devCanSleep INTEGER DEFAULT 0,
devParentRelType TEXT DEFAULT 'default',
devReqNicsOnline INTEGER DEFAULT 0,
devSkipRepeated INTEGER DEFAULT 0,

View File

@@ -8,6 +8,7 @@ import sys
import os
import time
import pytest
from selenium.webdriver.common.by import By
# Add test directory to path
@@ -73,6 +74,16 @@ def get_login_password():
return None
def require_login_page(driver):
"""Skip the test if the login form is not present (web protection disabled)."""
fields = driver.find_elements(By.NAME, "loginpassword")
if not fields:
pytest.skip(
"Web protection is disabled (SETPWD_enable_password != true); "
"login page is not shown on this instance"
)
def perform_login(driver, password=None):
"""Helper function to perform login with optional password fallback
@@ -83,6 +94,7 @@ def perform_login(driver, password=None):
if password is None:
password = "123456" # Default test password
require_login_page(driver)
password_input = driver.find_element(By.NAME, "loginpassword")
password_input.send_keys(password)
@@ -100,7 +112,9 @@ def test_login_page_loads(driver):
driver.get(f"{BASE_URL}/index.php")
wait_for_page_load(driver)
# Check that login form is present
# Skip if web protection is disabled (page redirected away from login form)
require_login_page(driver)
password_field = driver.find_element(By.NAME, "loginpassword")
assert password_field, "Password field should be present"
@@ -230,6 +244,9 @@ def test_url_hash_hidden_input_present(driver):
driver.get(f"{BASE_URL}/index.php")
wait_for_page_load(driver)
# Skip if web protection is disabled (login form not shown)
require_login_page(driver)
# Verify the hidden input field exists
url_hash_input = driver.find_element(By.ID, "url_hash")
assert url_hash_input, "Hidden url_hash input field should be present"