Merge pull request #1604 from netalertx/next_release
Some checks are pending
✅ Code checks / check-url-paths (push) Waiting to run
✅ Code checks / lint (push) Waiting to run
✅ Code checks / docker-tests (push) Waiting to run
🐳 👩‍💻 docker dev / docker_dev (push) Waiting to run
📘 Deploy MkDocs / deploy (push) Waiting to run

Next release
This commit is contained in:
Jokob @NetAlertX
2026-04-11 08:45:33 +10:00
committed by GitHub
5 changed files with 157 additions and 36 deletions

View File

@@ -166,16 +166,16 @@ def check_guest_wifi_status(fc, guest_service_num):
return guest_info return guest_info
def create_guest_wifi_device(fc, host): def create_guest_wifi_device(fc):
""" """
Create a synthetic device entry for guest WiFi. Create a synthetic device entry for guest WiFi.
Derives a deterministic fake MAC from the Fritz!Box hardware MAC address. Derives a deterministic fake MAC from the Fritz!Box hardware MAC address.
Falls back to the configured host string if the MAC cannot be retrieved. Falls back to a fixed sentinel string if the MAC cannot be retrieved.
Returns: Device dictionary Returns: Device dictionary
""" """
try: try:
fritzbox_mac = fc.call_action('DeviceInfo:1', 'GetInfo').get('NewMACAddress', '') fritzbox_mac = fc.call_action('DeviceInfo:1', 'GetInfo').get('NewMACAddress', '')
guest_mac = string_to_fake_mac(normalize_mac(fritzbox_mac) if fritzbox_mac else host) guest_mac = string_to_fake_mac(normalize_mac(fritzbox_mac) if fritzbox_mac else 'FRITZBOX_GUEST')
device = { device = {
'mac_address': guest_mac, 'mac_address': guest_mac,
@@ -224,7 +224,7 @@ def main():
if report_guest: if report_guest:
guest_status = check_guest_wifi_status(fc, guest_service) guest_status = check_guest_wifi_status(fc, guest_service)
if guest_status['active']: if guest_status['active']:
guest_device = create_guest_wifi_device(fc, host) guest_device = create_guest_wifi_device(fc)
if guest_device: if guest_device:
device_data.append(guest_device) device_data.append(guest_device)

View File

@@ -20,6 +20,10 @@ from messaging.reporting import skip_repeated_notifications
from messaging.in_app import update_unread_notifications_count from messaging.in_app import update_unread_notifications_count
from const import NULL_EQUIVALENTS_SQL from const import NULL_EQUIVALENTS_SQL
# Predicate used in every negative-event INSERT to skip forced-online devices.
# Centralised here so all three event paths stay in sync.
_SQL_NOT_FORCED_ONLINE = "LOWER(COALESCE(devForceStatus, '')) != 'online'"
# Make sure log level is initialized correctly # Make sure log level is initialized correctly
Logger(get_setting_value("LOG_LEVEL")) Logger(get_setting_value("LOG_LEVEL"))
@@ -179,6 +183,7 @@ def insert_events(db):
WHERE devAlertDown != 0 WHERE devAlertDown != 0
AND devCanSleep = 0 AND devCanSleep = 0
AND devPresentLastScan = 1 AND devPresentLastScan = 1
AND {_SQL_NOT_FORCED_ONLINE}
AND NOT EXISTS (SELECT 1 FROM CurrentScan AND NOT EXISTS (SELECT 1 FROM CurrentScan
WHERE devMac = scanMac WHERE devMac = scanMac
) """) ) """)
@@ -194,6 +199,7 @@ def insert_events(db):
AND devCanSleep = 1 AND devCanSleep = 1
AND devIsSleeping = 0 AND devIsSleeping = 0
AND devPresentLastScan = 0 AND devPresentLastScan = 0
AND {_SQL_NOT_FORCED_ONLINE}
AND NOT EXISTS (SELECT 1 FROM CurrentScan AND NOT EXISTS (SELECT 1 FROM CurrentScan
WHERE devMac = scanMac) WHERE devMac = scanMac)
AND NOT EXISTS (SELECT 1 FROM Events AND NOT EXISTS (SELECT 1 FROM Events
@@ -229,6 +235,7 @@ def insert_events(db):
FROM Devices FROM Devices
WHERE devAlertDown = 0 WHERE devAlertDown = 0
AND devPresentLastScan = 1 AND devPresentLastScan = 1
AND {_SQL_NOT_FORCED_ONLINE}
AND NOT EXISTS (SELECT 1 FROM CurrentScan AND NOT EXISTS (SELECT 1 FROM CurrentScan
WHERE devMac = scanMac WHERE devMac = scanMac
) """) ) """)

View File

@@ -171,6 +171,7 @@ def insert_device(
can_sleep: int = 0, can_sleep: int = 0,
last_connection: str | None = None, last_connection: str | None = None,
last_ip: str = "192.168.1.1", last_ip: str = "192.168.1.1",
force_status: str | None = None,
) -> None: ) -> None:
""" """
Insert a minimal Devices row. Insert a minimal Devices row.
@@ -189,16 +190,19 @@ def insert_device(
ISO-8601 UTC string; defaults to 60 minutes ago when omitted. ISO-8601 UTC string; defaults to 60 minutes ago when omitted.
last_ip: last_ip:
Value stored in devLastIP. Value stored in devLastIP.
force_status:
Value for devForceStatus (``'online'``, ``'offline'``, or ``None``/
``'dont_force'``).
""" """
cur.execute( cur.execute(
""" """
INSERT INTO Devices INSERT INTO Devices
(devMac, devAlertDown, devPresentLastScan, devCanSleep, (devMac, devAlertDown, devPresentLastScan, devCanSleep,
devLastConnection, devLastIP, devIsArchived, devIsNew) devLastConnection, devLastIP, devIsArchived, devIsNew, devForceStatus)
VALUES (?, ?, ?, ?, ?, ?, 0, 0) VALUES (?, ?, ?, ?, ?, ?, 0, 0, ?)
""", """,
(mac, alert_down, present_last_scan, can_sleep, (mac, alert_down, present_last_scan, can_sleep,
last_connection or minutes_ago(60), last_ip), last_connection or minutes_ago(60), last_ip, force_status),
) )

View File

@@ -7,11 +7,12 @@ first import so no live config reads, log files, or result files are
created during tests. created during tests.
""" """
import hashlib
import sys import sys
import os import os
from unittest.mock import patch, MagicMock from unittest.mock import patch, MagicMock
from utils.crypto_utils import string_to_fake_mac
import pytest import pytest
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -59,19 +60,12 @@ def _make_host_entry(mac="AA:BB:CC:DD:EE:FF", ip="192.168.1.10",
@pytest.fixture @pytest.fixture
def mock_fritz_hosts(): def mock_fritz_hosts():
""" """
Patches fritzconnection.lib.fritzhosts in sys.modules so that Patches fritzbox.FritzHosts so that get_connected_devices() uses a
fritzbox.get_connected_devices() uses a controllable FritzHosts mock. controllable mock. Yields the FritzHosts *instance* (what FritzHosts(fc)
Yields the FritzHosts *instance* (what FritzHosts(fc) returns). returns).
""" """
hosts_instance = MagicMock() hosts_instance = MagicMock()
fritz_hosts_module = MagicMock() with patch("fritzbox.FritzHosts", return_value=hosts_instance):
fritz_hosts_module.FritzHosts = MagicMock(return_value=hosts_instance)
with patch.dict("sys.modules", {
"fritzconnection": MagicMock(),
"fritzconnection.lib": MagicMock(),
"fritzconnection.lib.fritzhosts": fritz_hosts_module,
}):
yield hosts_instance yield hosts_instance
@@ -229,12 +223,15 @@ class TestCreateGuestWifiDevice:
assert device["active_status"] == "Active" assert device["active_status"] == "Active"
assert device["interface_type"] == "Access Point" assert device["interface_type"] == "Access Point"
assert device["ip_address"] == "" assert device["ip_address"] == ""
# MAC must match string_to_fake_mac output (fa:ce: prefix)
assert device["mac_address"].startswith("fa:ce:")
def test_guest_mac_has_locally_administered_bit(self): def test_guest_mac_has_locally_administered_bit(self):
"""First byte must be 0x02 — locally-administered, unicast.""" """The locally-administered bit (0x02) must be set in the first byte.
string_to_fake_mac uses the 'fa:ce:' prefix; 0xFA & 0x02 == 0x02."""
device = fritzbox.create_guest_wifi_device(self._fc_with_mac("AA:BB:CC:DD:EE:FF")) device = fritzbox.create_guest_wifi_device(self._fc_with_mac("AA:BB:CC:DD:EE:FF"))
first_byte = int(device["mac_address"].split(":")[0], 16) first_byte = int(device["mac_address"].split(":")[0], 16)
assert first_byte == 0x02 assert first_byte & 0x02 != 0
def test_guest_mac_format_is_valid(self): def test_guest_mac_format_is_valid(self):
"""MAC must be 6 colon-separated lowercase hex pairs.""" """MAC must be 6 colon-separated lowercase hex pairs."""
@@ -258,11 +255,11 @@ class TestCreateGuestWifiDevice:
assert mac_a != mac_b assert mac_a != mac_b
def test_no_fritzbox_mac_uses_fallback(self): def test_no_fritzbox_mac_uses_fallback(self):
"""When DeviceInfo returns no MAC, fall back to 02:00:00:00:00:01.""" """When DeviceInfo returns no MAC, fall back to a sentinel-derived MAC."""
fc = MagicMock() fc = MagicMock()
fc.call_action.return_value = {"NewMACAddress": ""} fc.call_action.return_value = {"NewMACAddress": ""}
device = fritzbox.create_guest_wifi_device(fc) device = fritzbox.create_guest_wifi_device(fc)
assert device["mac_address"] == "02:00:00:00:00:01" assert device["mac_address"] == string_to_fake_mac("FRITZBOX_GUEST")
def test_device_info_exception_returns_none(self): def test_device_info_exception_returns_none(self):
"""If DeviceInfo call raises, create_guest_wifi_device must return None.""" """If DeviceInfo call raises, create_guest_wifi_device must return None."""
@@ -274,12 +271,11 @@ class TestCreateGuestWifiDevice:
def test_known_mac_produces_known_guest_mac(self): def test_known_mac_produces_known_guest_mac(self):
""" """
Regression anchor: for a fixed Fritz!Box MAC, the expected guest MAC Regression anchor: for a fixed Fritz!Box MAC, the expected guest MAC
is precomputed here independently. If the hashing logic in is derived via string_to_fake_mac(normalize_mac(...)). If the hashing
fritzbox.py changes, this test fails immediately. logic in fritzbox.py or string_to_fake_mac changes, this test fails.
""" """
fritzbox_mac = "aa:bb:cc:dd:ee:ff" # normalize_mac output of "AA:BB:CC:DD:EE:FF" fritzbox_mac = normalize_mac("AA:BB:CC:DD:EE:FF")
digest = hashlib.md5(f"GUEST:{fritzbox_mac}".encode()).digest() expected = string_to_fake_mac(fritzbox_mac)
expected = "02:" + ":".join(f"{b:02x}" for b in digest[:5])
device = fritzbox.create_guest_wifi_device(self._fc_with_mac("AA:BB:CC:DD:EE:FF")) device = fritzbox.create_guest_wifi_device(self._fc_with_mac("AA:BB:CC:DD:EE:FF"))
assert device["mac_address"] == expected assert device["mac_address"] == expected
@@ -296,10 +292,8 @@ class TestGetFritzboxConnection:
fc_instance.modelname = "FRITZ!Box 7590" fc_instance.modelname = "FRITZ!Box 7590"
fc_instance.system_version = "7.57" fc_instance.system_version = "7.57"
fc_class = MagicMock(return_value=fc_instance) fc_class = MagicMock(return_value=fc_instance)
fc_module = MagicMock()
fc_module.FritzConnection = fc_class
with patch.dict("sys.modules", {"fritzconnection": fc_module}): with patch("fritzbox.FritzConnection", fc_class):
result = fritzbox.get_fritzbox_connection("fritz.box", 49443, "admin", "pass", True) result = fritzbox.get_fritzbox_connection("fritz.box", 49443, "admin", "pass", True)
assert result is fc_instance assert result is fc_instance
@@ -308,16 +302,13 @@ class TestGetFritzboxConnection:
) )
def test_import_error_returns_none(self): def test_import_error_returns_none(self):
with patch.dict("sys.modules", {"fritzconnection": None}): with patch("fritzbox.FritzConnection", side_effect=ImportError("fritzconnection not found")):
result = fritzbox.get_fritzbox_connection("fritz.box", 49443, "admin", "pass", True) result = fritzbox.get_fritzbox_connection("fritz.box", 49443, "admin", "pass", True)
assert result is None assert result is None
def test_connection_exception_returns_none(self): def test_connection_exception_returns_none(self):
fc_module = MagicMock() with patch("fritzbox.FritzConnection", side_effect=Exception("Connection refused")):
fc_module.FritzConnection.side_effect = Exception("Connection refused")
with patch.dict("sys.modules", {"fritzconnection": fc_module}):
result = fritzbox.get_fritzbox_connection("fritz.box", 49443, "admin", "pass", True) result = fritzbox.get_fritzbox_connection("fritz.box", 49443, "admin", "pass", True)
assert result is None assert result is None

View File

@@ -444,3 +444,122 @@ class TestDownCountSleepingSuppression:
assert count == 1, ( assert count == 1, (
f"Expected 1 down device (sleeping device must not be counted), got {count}" f"Expected 1 down device (sleeping device must not be counted), got {count}"
) )
# ---------------------------------------------------------------------------
# Layer 1c: insert_events() — forced-online device suppression
#
# Devices with devForceStatus='online' are always considered present by the
# operator. Generating 'Device Down' or 'Disconnected' events for them causes
# spurious flapping detection (devFlapping counts these events in DevicesView).
#
# Affected queries in insert_events():
# 1a Device Down (non-sleeping) — DevicesView query
# 1b Device Down (sleep-expired) — DevicesView query
# 3 Disconnected — Devices table query
# ---------------------------------------------------------------------------
class TestInsertEventsForceOnline:
"""
Regression tests: forced-online devices must never generate
'Device Down' or 'Disconnected' events.
"""
def test_forced_online_no_device_down_event(self):
"""
devForceStatus='online', devAlertDown=1, absent from CurrentScan.
Must NOT produce a 'Device Down' event (regression: used to fire and
cause devFlapping=1 after the threshold was reached).
"""
conn = _make_db()
cur = conn.cursor()
_insert_device(cur, "ff:00:00:00:00:01", alert_down=1, present_last_scan=1,
force_status="online")
conn.commit()
insert_events(DummyDB(conn))
assert "ff:00:00:00:00:01" not in _down_event_macs(cur), (
"forced-online device must never generate a 'Device Down' event"
)
def test_forced_online_sleep_expired_no_device_down_event(self):
"""
devForceStatus='online', devCanSleep=1, sleep window expired.
Must NOT produce a 'Device Down' event via the sleep-expired path.
"""
conn = _make_db(sleep_minutes=30)
cur = conn.cursor()
_insert_device(cur, "ff:00:00:00:00:02", alert_down=1, present_last_scan=0,
can_sleep=1, last_connection=_minutes_ago(45),
force_status="online")
conn.commit()
insert_events(DummyDB(conn))
assert "ff:00:00:00:00:02" not in _down_event_macs(cur), (
"forced-online sleeping device must not get 'Device Down' after sleep expires"
)
def test_forced_online_no_disconnected_event(self):
"""
devForceStatus='online', devAlertDown=0 (Disconnected path), absent.
Must NOT produce a 'Disconnected' event.
"""
conn = _make_db()
cur = conn.cursor()
_insert_device(cur, "ff:00:00:00:00:03", alert_down=0, present_last_scan=1,
force_status="online")
conn.commit()
insert_events(DummyDB(conn))
cur.execute(
"SELECT COUNT(*) AS cnt FROM Events "
"WHERE eveMac = 'ff:00:00:00:00:03' AND eveEventType = 'Disconnected'"
)
assert cur.fetchone()["cnt"] == 0, (
"forced-online device must never generate a 'Disconnected' event"
)
def test_forced_online_uppercase_no_device_down_event(self):
"""devForceStatus='ONLINE' (uppercase) must also be suppressed."""
conn = _make_db()
cur = conn.cursor()
_insert_device(cur, "ff:00:00:00:00:04", alert_down=1, present_last_scan=1,
force_status="ONLINE")
conn.commit()
insert_events(DummyDB(conn))
assert "ff:00:00:00:00:04" not in _down_event_macs(cur), (
"forced-online device (uppercase) must never generate a 'Device Down' event"
)
def test_dont_force_still_fires_device_down(self):
"""devForceStatus='dont_force' must behave normally — event fires."""
conn = _make_db()
cur = conn.cursor()
_insert_device(cur, "ff:00:00:00:00:05", alert_down=1, present_last_scan=1,
force_status="dont_force")
conn.commit()
insert_events(DummyDB(conn))
assert "ff:00:00:00:00:05" in _down_event_macs(cur), (
"dont_force device must still generate 'Device Down' when absent"
)
def test_forced_offline_still_fires_device_down(self):
"""devForceStatus='offline' suppresses nothing — event fires."""
conn = _make_db()
cur = conn.cursor()
_insert_device(cur, "ff:00:00:00:00:06", alert_down=1, present_last_scan=1,
force_status="offline")
conn.commit()
insert_events(DummyDB(conn))
assert "ff:00:00:00:00:06" in _down_event_macs(cur), (
"forced-offline device must still generate 'Device Down' when absent"
)