mirror of
https://github.com/jokob-sk/NetAlertX.git
synced 2026-04-11 04:31:25 -07:00
Merge pull request #1604 from netalertx/next_release
Some checks are pending
Some checks are pending
Next release
This commit is contained in:
@@ -166,16 +166,16 @@ def check_guest_wifi_status(fc, guest_service_num):
|
|||||||
return guest_info
|
return guest_info
|
||||||
|
|
||||||
|
|
||||||
def create_guest_wifi_device(fc, host):
|
def create_guest_wifi_device(fc):
|
||||||
"""
|
"""
|
||||||
Create a synthetic device entry for guest WiFi.
|
Create a synthetic device entry for guest WiFi.
|
||||||
Derives a deterministic fake MAC from the Fritz!Box hardware MAC address.
|
Derives a deterministic fake MAC from the Fritz!Box hardware MAC address.
|
||||||
Falls back to the configured host string if the MAC cannot be retrieved.
|
Falls back to a fixed sentinel string if the MAC cannot be retrieved.
|
||||||
Returns: Device dictionary
|
Returns: Device dictionary
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
fritzbox_mac = fc.call_action('DeviceInfo:1', 'GetInfo').get('NewMACAddress', '')
|
fritzbox_mac = fc.call_action('DeviceInfo:1', 'GetInfo').get('NewMACAddress', '')
|
||||||
guest_mac = string_to_fake_mac(normalize_mac(fritzbox_mac) if fritzbox_mac else host)
|
guest_mac = string_to_fake_mac(normalize_mac(fritzbox_mac) if fritzbox_mac else 'FRITZBOX_GUEST')
|
||||||
|
|
||||||
device = {
|
device = {
|
||||||
'mac_address': guest_mac,
|
'mac_address': guest_mac,
|
||||||
@@ -224,7 +224,7 @@ def main():
|
|||||||
if report_guest:
|
if report_guest:
|
||||||
guest_status = check_guest_wifi_status(fc, guest_service)
|
guest_status = check_guest_wifi_status(fc, guest_service)
|
||||||
if guest_status['active']:
|
if guest_status['active']:
|
||||||
guest_device = create_guest_wifi_device(fc, host)
|
guest_device = create_guest_wifi_device(fc)
|
||||||
if guest_device:
|
if guest_device:
|
||||||
device_data.append(guest_device)
|
device_data.append(guest_device)
|
||||||
|
|
||||||
|
|||||||
@@ -20,6 +20,10 @@ from messaging.reporting import skip_repeated_notifications
|
|||||||
from messaging.in_app import update_unread_notifications_count
|
from messaging.in_app import update_unread_notifications_count
|
||||||
from const import NULL_EQUIVALENTS_SQL
|
from const import NULL_EQUIVALENTS_SQL
|
||||||
|
|
||||||
|
# Predicate used in every negative-event INSERT to skip forced-online devices.
|
||||||
|
# Centralised here so all three event paths stay in sync.
|
||||||
|
_SQL_NOT_FORCED_ONLINE = "LOWER(COALESCE(devForceStatus, '')) != 'online'"
|
||||||
|
|
||||||
|
|
||||||
# Make sure log level is initialized correctly
|
# Make sure log level is initialized correctly
|
||||||
Logger(get_setting_value("LOG_LEVEL"))
|
Logger(get_setting_value("LOG_LEVEL"))
|
||||||
@@ -179,6 +183,7 @@ def insert_events(db):
|
|||||||
WHERE devAlertDown != 0
|
WHERE devAlertDown != 0
|
||||||
AND devCanSleep = 0
|
AND devCanSleep = 0
|
||||||
AND devPresentLastScan = 1
|
AND devPresentLastScan = 1
|
||||||
|
AND {_SQL_NOT_FORCED_ONLINE}
|
||||||
AND NOT EXISTS (SELECT 1 FROM CurrentScan
|
AND NOT EXISTS (SELECT 1 FROM CurrentScan
|
||||||
WHERE devMac = scanMac
|
WHERE devMac = scanMac
|
||||||
) """)
|
) """)
|
||||||
@@ -194,6 +199,7 @@ def insert_events(db):
|
|||||||
AND devCanSleep = 1
|
AND devCanSleep = 1
|
||||||
AND devIsSleeping = 0
|
AND devIsSleeping = 0
|
||||||
AND devPresentLastScan = 0
|
AND devPresentLastScan = 0
|
||||||
|
AND {_SQL_NOT_FORCED_ONLINE}
|
||||||
AND NOT EXISTS (SELECT 1 FROM CurrentScan
|
AND NOT EXISTS (SELECT 1 FROM CurrentScan
|
||||||
WHERE devMac = scanMac)
|
WHERE devMac = scanMac)
|
||||||
AND NOT EXISTS (SELECT 1 FROM Events
|
AND NOT EXISTS (SELECT 1 FROM Events
|
||||||
@@ -229,6 +235,7 @@ def insert_events(db):
|
|||||||
FROM Devices
|
FROM Devices
|
||||||
WHERE devAlertDown = 0
|
WHERE devAlertDown = 0
|
||||||
AND devPresentLastScan = 1
|
AND devPresentLastScan = 1
|
||||||
|
AND {_SQL_NOT_FORCED_ONLINE}
|
||||||
AND NOT EXISTS (SELECT 1 FROM CurrentScan
|
AND NOT EXISTS (SELECT 1 FROM CurrentScan
|
||||||
WHERE devMac = scanMac
|
WHERE devMac = scanMac
|
||||||
) """)
|
) """)
|
||||||
|
|||||||
@@ -171,6 +171,7 @@ def insert_device(
|
|||||||
can_sleep: int = 0,
|
can_sleep: int = 0,
|
||||||
last_connection: str | None = None,
|
last_connection: str | None = None,
|
||||||
last_ip: str = "192.168.1.1",
|
last_ip: str = "192.168.1.1",
|
||||||
|
force_status: str | None = None,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""
|
"""
|
||||||
Insert a minimal Devices row.
|
Insert a minimal Devices row.
|
||||||
@@ -189,16 +190,19 @@ def insert_device(
|
|||||||
ISO-8601 UTC string; defaults to 60 minutes ago when omitted.
|
ISO-8601 UTC string; defaults to 60 minutes ago when omitted.
|
||||||
last_ip:
|
last_ip:
|
||||||
Value stored in devLastIP.
|
Value stored in devLastIP.
|
||||||
|
force_status:
|
||||||
|
Value for devForceStatus (``'online'``, ``'offline'``, or ``None``/
|
||||||
|
``'dont_force'``).
|
||||||
"""
|
"""
|
||||||
cur.execute(
|
cur.execute(
|
||||||
"""
|
"""
|
||||||
INSERT INTO Devices
|
INSERT INTO Devices
|
||||||
(devMac, devAlertDown, devPresentLastScan, devCanSleep,
|
(devMac, devAlertDown, devPresentLastScan, devCanSleep,
|
||||||
devLastConnection, devLastIP, devIsArchived, devIsNew)
|
devLastConnection, devLastIP, devIsArchived, devIsNew, devForceStatus)
|
||||||
VALUES (?, ?, ?, ?, ?, ?, 0, 0)
|
VALUES (?, ?, ?, ?, ?, ?, 0, 0, ?)
|
||||||
""",
|
""",
|
||||||
(mac, alert_down, present_last_scan, can_sleep,
|
(mac, alert_down, present_last_scan, can_sleep,
|
||||||
last_connection or minutes_ago(60), last_ip),
|
last_connection or minutes_ago(60), last_ip, force_status),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -7,11 +7,12 @@ first import so no live config reads, log files, or result files are
|
|||||||
created during tests.
|
created during tests.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import hashlib
|
|
||||||
import sys
|
import sys
|
||||||
import os
|
import os
|
||||||
from unittest.mock import patch, MagicMock
|
from unittest.mock import patch, MagicMock
|
||||||
|
|
||||||
|
from utils.crypto_utils import string_to_fake_mac
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
@@ -59,19 +60,12 @@ def _make_host_entry(mac="AA:BB:CC:DD:EE:FF", ip="192.168.1.10",
|
|||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def mock_fritz_hosts():
|
def mock_fritz_hosts():
|
||||||
"""
|
"""
|
||||||
Patches fritzconnection.lib.fritzhosts in sys.modules so that
|
Patches fritzbox.FritzHosts so that get_connected_devices() uses a
|
||||||
fritzbox.get_connected_devices() uses a controllable FritzHosts mock.
|
controllable mock. Yields the FritzHosts *instance* (what FritzHosts(fc)
|
||||||
Yields the FritzHosts *instance* (what FritzHosts(fc) returns).
|
returns).
|
||||||
"""
|
"""
|
||||||
hosts_instance = MagicMock()
|
hosts_instance = MagicMock()
|
||||||
fritz_hosts_module = MagicMock()
|
with patch("fritzbox.FritzHosts", return_value=hosts_instance):
|
||||||
fritz_hosts_module.FritzHosts = MagicMock(return_value=hosts_instance)
|
|
||||||
|
|
||||||
with patch.dict("sys.modules", {
|
|
||||||
"fritzconnection": MagicMock(),
|
|
||||||
"fritzconnection.lib": MagicMock(),
|
|
||||||
"fritzconnection.lib.fritzhosts": fritz_hosts_module,
|
|
||||||
}):
|
|
||||||
yield hosts_instance
|
yield hosts_instance
|
||||||
|
|
||||||
|
|
||||||
@@ -229,12 +223,15 @@ class TestCreateGuestWifiDevice:
|
|||||||
assert device["active_status"] == "Active"
|
assert device["active_status"] == "Active"
|
||||||
assert device["interface_type"] == "Access Point"
|
assert device["interface_type"] == "Access Point"
|
||||||
assert device["ip_address"] == ""
|
assert device["ip_address"] == ""
|
||||||
|
# MAC must match string_to_fake_mac output (fa:ce: prefix)
|
||||||
|
assert device["mac_address"].startswith("fa:ce:")
|
||||||
|
|
||||||
def test_guest_mac_has_locally_administered_bit(self):
|
def test_guest_mac_has_locally_administered_bit(self):
|
||||||
"""First byte must be 0x02 — locally-administered, unicast."""
|
"""The locally-administered bit (0x02) must be set in the first byte.
|
||||||
|
string_to_fake_mac uses the 'fa:ce:' prefix; 0xFA & 0x02 == 0x02."""
|
||||||
device = fritzbox.create_guest_wifi_device(self._fc_with_mac("AA:BB:CC:DD:EE:FF"))
|
device = fritzbox.create_guest_wifi_device(self._fc_with_mac("AA:BB:CC:DD:EE:FF"))
|
||||||
first_byte = int(device["mac_address"].split(":")[0], 16)
|
first_byte = int(device["mac_address"].split(":")[0], 16)
|
||||||
assert first_byte == 0x02
|
assert first_byte & 0x02 != 0
|
||||||
|
|
||||||
def test_guest_mac_format_is_valid(self):
|
def test_guest_mac_format_is_valid(self):
|
||||||
"""MAC must be 6 colon-separated lowercase hex pairs."""
|
"""MAC must be 6 colon-separated lowercase hex pairs."""
|
||||||
@@ -258,11 +255,11 @@ class TestCreateGuestWifiDevice:
|
|||||||
assert mac_a != mac_b
|
assert mac_a != mac_b
|
||||||
|
|
||||||
def test_no_fritzbox_mac_uses_fallback(self):
|
def test_no_fritzbox_mac_uses_fallback(self):
|
||||||
"""When DeviceInfo returns no MAC, fall back to 02:00:00:00:00:01."""
|
"""When DeviceInfo returns no MAC, fall back to a sentinel-derived MAC."""
|
||||||
fc = MagicMock()
|
fc = MagicMock()
|
||||||
fc.call_action.return_value = {"NewMACAddress": ""}
|
fc.call_action.return_value = {"NewMACAddress": ""}
|
||||||
device = fritzbox.create_guest_wifi_device(fc)
|
device = fritzbox.create_guest_wifi_device(fc)
|
||||||
assert device["mac_address"] == "02:00:00:00:00:01"
|
assert device["mac_address"] == string_to_fake_mac("FRITZBOX_GUEST")
|
||||||
|
|
||||||
def test_device_info_exception_returns_none(self):
|
def test_device_info_exception_returns_none(self):
|
||||||
"""If DeviceInfo call raises, create_guest_wifi_device must return None."""
|
"""If DeviceInfo call raises, create_guest_wifi_device must return None."""
|
||||||
@@ -274,12 +271,11 @@ class TestCreateGuestWifiDevice:
|
|||||||
def test_known_mac_produces_known_guest_mac(self):
|
def test_known_mac_produces_known_guest_mac(self):
|
||||||
"""
|
"""
|
||||||
Regression anchor: for a fixed Fritz!Box MAC, the expected guest MAC
|
Regression anchor: for a fixed Fritz!Box MAC, the expected guest MAC
|
||||||
is precomputed here independently. If the hashing logic in
|
is derived via string_to_fake_mac(normalize_mac(...)). If the hashing
|
||||||
fritzbox.py changes, this test fails immediately.
|
logic in fritzbox.py or string_to_fake_mac changes, this test fails.
|
||||||
"""
|
"""
|
||||||
fritzbox_mac = "aa:bb:cc:dd:ee:ff" # normalize_mac output of "AA:BB:CC:DD:EE:FF"
|
fritzbox_mac = normalize_mac("AA:BB:CC:DD:EE:FF")
|
||||||
digest = hashlib.md5(f"GUEST:{fritzbox_mac}".encode()).digest()
|
expected = string_to_fake_mac(fritzbox_mac)
|
||||||
expected = "02:" + ":".join(f"{b:02x}" for b in digest[:5])
|
|
||||||
|
|
||||||
device = fritzbox.create_guest_wifi_device(self._fc_with_mac("AA:BB:CC:DD:EE:FF"))
|
device = fritzbox.create_guest_wifi_device(self._fc_with_mac("AA:BB:CC:DD:EE:FF"))
|
||||||
assert device["mac_address"] == expected
|
assert device["mac_address"] == expected
|
||||||
@@ -296,10 +292,8 @@ class TestGetFritzboxConnection:
|
|||||||
fc_instance.modelname = "FRITZ!Box 7590"
|
fc_instance.modelname = "FRITZ!Box 7590"
|
||||||
fc_instance.system_version = "7.57"
|
fc_instance.system_version = "7.57"
|
||||||
fc_class = MagicMock(return_value=fc_instance)
|
fc_class = MagicMock(return_value=fc_instance)
|
||||||
fc_module = MagicMock()
|
|
||||||
fc_module.FritzConnection = fc_class
|
|
||||||
|
|
||||||
with patch.dict("sys.modules", {"fritzconnection": fc_module}):
|
with patch("fritzbox.FritzConnection", fc_class):
|
||||||
result = fritzbox.get_fritzbox_connection("fritz.box", 49443, "admin", "pass", True)
|
result = fritzbox.get_fritzbox_connection("fritz.box", 49443, "admin", "pass", True)
|
||||||
|
|
||||||
assert result is fc_instance
|
assert result is fc_instance
|
||||||
@@ -308,16 +302,13 @@ class TestGetFritzboxConnection:
|
|||||||
)
|
)
|
||||||
|
|
||||||
def test_import_error_returns_none(self):
|
def test_import_error_returns_none(self):
|
||||||
with patch.dict("sys.modules", {"fritzconnection": None}):
|
with patch("fritzbox.FritzConnection", side_effect=ImportError("fritzconnection not found")):
|
||||||
result = fritzbox.get_fritzbox_connection("fritz.box", 49443, "admin", "pass", True)
|
result = fritzbox.get_fritzbox_connection("fritz.box", 49443, "admin", "pass", True)
|
||||||
|
|
||||||
assert result is None
|
assert result is None
|
||||||
|
|
||||||
def test_connection_exception_returns_none(self):
|
def test_connection_exception_returns_none(self):
|
||||||
fc_module = MagicMock()
|
with patch("fritzbox.FritzConnection", side_effect=Exception("Connection refused")):
|
||||||
fc_module.FritzConnection.side_effect = Exception("Connection refused")
|
|
||||||
|
|
||||||
with patch.dict("sys.modules", {"fritzconnection": fc_module}):
|
|
||||||
result = fritzbox.get_fritzbox_connection("fritz.box", 49443, "admin", "pass", True)
|
result = fritzbox.get_fritzbox_connection("fritz.box", 49443, "admin", "pass", True)
|
||||||
|
|
||||||
assert result is None
|
assert result is None
|
||||||
|
|||||||
@@ -444,3 +444,122 @@ class TestDownCountSleepingSuppression:
|
|||||||
assert count == 1, (
|
assert count == 1, (
|
||||||
f"Expected 1 down device (sleeping device must not be counted), got {count}"
|
f"Expected 1 down device (sleeping device must not be counted), got {count}"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Layer 1c: insert_events() — forced-online device suppression
|
||||||
|
#
|
||||||
|
# Devices with devForceStatus='online' are always considered present by the
|
||||||
|
# operator. Generating 'Device Down' or 'Disconnected' events for them causes
|
||||||
|
# spurious flapping detection (devFlapping counts these events in DevicesView).
|
||||||
|
#
|
||||||
|
# Affected queries in insert_events():
|
||||||
|
# 1a Device Down (non-sleeping) — DevicesView query
|
||||||
|
# 1b Device Down (sleep-expired) — DevicesView query
|
||||||
|
# 3 Disconnected — Devices table query
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class TestInsertEventsForceOnline:
|
||||||
|
"""
|
||||||
|
Regression tests: forced-online devices must never generate
|
||||||
|
'Device Down' or 'Disconnected' events.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def test_forced_online_no_device_down_event(self):
|
||||||
|
"""
|
||||||
|
devForceStatus='online', devAlertDown=1, absent from CurrentScan.
|
||||||
|
Must NOT produce a 'Device Down' event (regression: used to fire and
|
||||||
|
cause devFlapping=1 after the threshold was reached).
|
||||||
|
"""
|
||||||
|
conn = _make_db()
|
||||||
|
cur = conn.cursor()
|
||||||
|
_insert_device(cur, "ff:00:00:00:00:01", alert_down=1, present_last_scan=1,
|
||||||
|
force_status="online")
|
||||||
|
conn.commit()
|
||||||
|
|
||||||
|
insert_events(DummyDB(conn))
|
||||||
|
|
||||||
|
assert "ff:00:00:00:00:01" not in _down_event_macs(cur), (
|
||||||
|
"forced-online device must never generate a 'Device Down' event"
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_forced_online_sleep_expired_no_device_down_event(self):
|
||||||
|
"""
|
||||||
|
devForceStatus='online', devCanSleep=1, sleep window expired.
|
||||||
|
Must NOT produce a 'Device Down' event via the sleep-expired path.
|
||||||
|
"""
|
||||||
|
conn = _make_db(sleep_minutes=30)
|
||||||
|
cur = conn.cursor()
|
||||||
|
_insert_device(cur, "ff:00:00:00:00:02", alert_down=1, present_last_scan=0,
|
||||||
|
can_sleep=1, last_connection=_minutes_ago(45),
|
||||||
|
force_status="online")
|
||||||
|
conn.commit()
|
||||||
|
|
||||||
|
insert_events(DummyDB(conn))
|
||||||
|
|
||||||
|
assert "ff:00:00:00:00:02" not in _down_event_macs(cur), (
|
||||||
|
"forced-online sleeping device must not get 'Device Down' after sleep expires"
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_forced_online_no_disconnected_event(self):
|
||||||
|
"""
|
||||||
|
devForceStatus='online', devAlertDown=0 (Disconnected path), absent.
|
||||||
|
Must NOT produce a 'Disconnected' event.
|
||||||
|
"""
|
||||||
|
conn = _make_db()
|
||||||
|
cur = conn.cursor()
|
||||||
|
_insert_device(cur, "ff:00:00:00:00:03", alert_down=0, present_last_scan=1,
|
||||||
|
force_status="online")
|
||||||
|
conn.commit()
|
||||||
|
|
||||||
|
insert_events(DummyDB(conn))
|
||||||
|
|
||||||
|
cur.execute(
|
||||||
|
"SELECT COUNT(*) AS cnt FROM Events "
|
||||||
|
"WHERE eveMac = 'ff:00:00:00:00:03' AND eveEventType = 'Disconnected'"
|
||||||
|
)
|
||||||
|
assert cur.fetchone()["cnt"] == 0, (
|
||||||
|
"forced-online device must never generate a 'Disconnected' event"
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_forced_online_uppercase_no_device_down_event(self):
|
||||||
|
"""devForceStatus='ONLINE' (uppercase) must also be suppressed."""
|
||||||
|
conn = _make_db()
|
||||||
|
cur = conn.cursor()
|
||||||
|
_insert_device(cur, "ff:00:00:00:00:04", alert_down=1, present_last_scan=1,
|
||||||
|
force_status="ONLINE")
|
||||||
|
conn.commit()
|
||||||
|
|
||||||
|
insert_events(DummyDB(conn))
|
||||||
|
|
||||||
|
assert "ff:00:00:00:00:04" not in _down_event_macs(cur), (
|
||||||
|
"forced-online device (uppercase) must never generate a 'Device Down' event"
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_dont_force_still_fires_device_down(self):
|
||||||
|
"""devForceStatus='dont_force' must behave normally — event fires."""
|
||||||
|
conn = _make_db()
|
||||||
|
cur = conn.cursor()
|
||||||
|
_insert_device(cur, "ff:00:00:00:00:05", alert_down=1, present_last_scan=1,
|
||||||
|
force_status="dont_force")
|
||||||
|
conn.commit()
|
||||||
|
|
||||||
|
insert_events(DummyDB(conn))
|
||||||
|
|
||||||
|
assert "ff:00:00:00:00:05" in _down_event_macs(cur), (
|
||||||
|
"dont_force device must still generate 'Device Down' when absent"
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_forced_offline_still_fires_device_down(self):
|
||||||
|
"""devForceStatus='offline' suppresses nothing — event fires."""
|
||||||
|
conn = _make_db()
|
||||||
|
cur = conn.cursor()
|
||||||
|
_insert_device(cur, "ff:00:00:00:00:06", alert_down=1, present_last_scan=1,
|
||||||
|
force_status="offline")
|
||||||
|
conn.commit()
|
||||||
|
|
||||||
|
insert_events(DummyDB(conn))
|
||||||
|
|
||||||
|
assert "ff:00:00:00:00:06" in _down_event_macs(cur), (
|
||||||
|
"forced-offline device must still generate 'Device Down' when absent"
|
||||||
|
)
|
||||||
|
|||||||
Reference in New Issue
Block a user