Merge pull request #1604 from netalertx/next_release
Some checks are pending
✅ Code checks / check-url-paths (push) Waiting to run
✅ Code checks / lint (push) Waiting to run
✅ Code checks / docker-tests (push) Waiting to run
🐳 👩‍💻 docker dev / docker_dev (push) Waiting to run
📘 Deploy MkDocs / deploy (push) Waiting to run

Next release
This commit is contained in:
Jokob @NetAlertX
2026-04-11 08:45:33 +10:00
committed by GitHub
5 changed files with 157 additions and 36 deletions

View File

@@ -166,16 +166,16 @@ def check_guest_wifi_status(fc, guest_service_num):
return guest_info
def create_guest_wifi_device(fc, host):
def create_guest_wifi_device(fc):
"""
Create a synthetic device entry for guest WiFi.
Derives a deterministic fake MAC from the Fritz!Box hardware MAC address.
Falls back to the configured host string if the MAC cannot be retrieved.
Falls back to a fixed sentinel string if the MAC cannot be retrieved.
Returns: Device dictionary
"""
try:
fritzbox_mac = fc.call_action('DeviceInfo:1', 'GetInfo').get('NewMACAddress', '')
guest_mac = string_to_fake_mac(normalize_mac(fritzbox_mac) if fritzbox_mac else host)
guest_mac = string_to_fake_mac(normalize_mac(fritzbox_mac) if fritzbox_mac else 'FRITZBOX_GUEST')
device = {
'mac_address': guest_mac,
@@ -224,7 +224,7 @@ def main():
if report_guest:
guest_status = check_guest_wifi_status(fc, guest_service)
if guest_status['active']:
guest_device = create_guest_wifi_device(fc, host)
guest_device = create_guest_wifi_device(fc)
if guest_device:
device_data.append(guest_device)

View File

@@ -20,6 +20,10 @@ from messaging.reporting import skip_repeated_notifications
from messaging.in_app import update_unread_notifications_count
from const import NULL_EQUIVALENTS_SQL
# Predicate used in every negative-event INSERT to skip forced-online devices.
# Centralised here so all three event paths stay in sync.
_SQL_NOT_FORCED_ONLINE = "LOWER(COALESCE(devForceStatus, '')) != 'online'"
# Make sure log level is initialized correctly
Logger(get_setting_value("LOG_LEVEL"))
@@ -179,6 +183,7 @@ def insert_events(db):
WHERE devAlertDown != 0
AND devCanSleep = 0
AND devPresentLastScan = 1
AND {_SQL_NOT_FORCED_ONLINE}
AND NOT EXISTS (SELECT 1 FROM CurrentScan
WHERE devMac = scanMac
) """)
@@ -194,6 +199,7 @@ def insert_events(db):
AND devCanSleep = 1
AND devIsSleeping = 0
AND devPresentLastScan = 0
AND {_SQL_NOT_FORCED_ONLINE}
AND NOT EXISTS (SELECT 1 FROM CurrentScan
WHERE devMac = scanMac)
AND NOT EXISTS (SELECT 1 FROM Events
@@ -229,6 +235,7 @@ def insert_events(db):
FROM Devices
WHERE devAlertDown = 0
AND devPresentLastScan = 1
AND {_SQL_NOT_FORCED_ONLINE}
AND NOT EXISTS (SELECT 1 FROM CurrentScan
WHERE devMac = scanMac
) """)

View File

@@ -171,6 +171,7 @@ def insert_device(
can_sleep: int = 0,
last_connection: str | None = None,
last_ip: str = "192.168.1.1",
force_status: str | None = None,
) -> None:
"""
Insert a minimal Devices row.
@@ -189,16 +190,19 @@ def insert_device(
ISO-8601 UTC string; defaults to 60 minutes ago when omitted.
last_ip:
Value stored in devLastIP.
force_status:
Value for devForceStatus (``'online'``, ``'offline'``, or ``None``/
``'dont_force'``).
"""
cur.execute(
"""
INSERT INTO Devices
(devMac, devAlertDown, devPresentLastScan, devCanSleep,
devLastConnection, devLastIP, devIsArchived, devIsNew)
VALUES (?, ?, ?, ?, ?, ?, 0, 0)
devLastConnection, devLastIP, devIsArchived, devIsNew, devForceStatus)
VALUES (?, ?, ?, ?, ?, ?, 0, 0, ?)
""",
(mac, alert_down, present_last_scan, can_sleep,
last_connection or minutes_ago(60), last_ip),
last_connection or minutes_ago(60), last_ip, force_status),
)

View File

@@ -7,11 +7,12 @@ first import so no live config reads, log files, or result files are
created during tests.
"""
import hashlib
import sys
import os
from unittest.mock import patch, MagicMock
from utils.crypto_utils import string_to_fake_mac
import pytest
# ---------------------------------------------------------------------------
@@ -59,19 +60,12 @@ def _make_host_entry(mac="AA:BB:CC:DD:EE:FF", ip="192.168.1.10",
@pytest.fixture
def mock_fritz_hosts():
"""
Patches fritzconnection.lib.fritzhosts in sys.modules so that
fritzbox.get_connected_devices() uses a controllable FritzHosts mock.
Yields the FritzHosts *instance* (what FritzHosts(fc) returns).
Patches fritzbox.FritzHosts so that get_connected_devices() uses a
controllable mock. Yields the FritzHosts *instance* (what FritzHosts(fc)
returns).
"""
hosts_instance = MagicMock()
fritz_hosts_module = MagicMock()
fritz_hosts_module.FritzHosts = MagicMock(return_value=hosts_instance)
with patch.dict("sys.modules", {
"fritzconnection": MagicMock(),
"fritzconnection.lib": MagicMock(),
"fritzconnection.lib.fritzhosts": fritz_hosts_module,
}):
with patch("fritzbox.FritzHosts", return_value=hosts_instance):
yield hosts_instance
@@ -229,12 +223,15 @@ class TestCreateGuestWifiDevice:
assert device["active_status"] == "Active"
assert device["interface_type"] == "Access Point"
assert device["ip_address"] == ""
# MAC must match string_to_fake_mac output (fa:ce: prefix)
assert device["mac_address"].startswith("fa:ce:")
def test_guest_mac_has_locally_administered_bit(self):
"""First byte must be 0x02 — locally-administered, unicast."""
"""The locally-administered bit (0x02) must be set in the first byte.
string_to_fake_mac uses the 'fa:ce:' prefix; 0xFA & 0x02 == 0x02."""
device = fritzbox.create_guest_wifi_device(self._fc_with_mac("AA:BB:CC:DD:EE:FF"))
first_byte = int(device["mac_address"].split(":")[0], 16)
assert first_byte == 0x02
assert first_byte & 0x02 != 0
def test_guest_mac_format_is_valid(self):
"""MAC must be 6 colon-separated lowercase hex pairs."""
@@ -258,11 +255,11 @@ class TestCreateGuestWifiDevice:
assert mac_a != mac_b
def test_no_fritzbox_mac_uses_fallback(self):
"""When DeviceInfo returns no MAC, fall back to 02:00:00:00:00:01."""
"""When DeviceInfo returns no MAC, fall back to a sentinel-derived MAC."""
fc = MagicMock()
fc.call_action.return_value = {"NewMACAddress": ""}
device = fritzbox.create_guest_wifi_device(fc)
assert device["mac_address"] == "02:00:00:00:00:01"
assert device["mac_address"] == string_to_fake_mac("FRITZBOX_GUEST")
def test_device_info_exception_returns_none(self):
"""If DeviceInfo call raises, create_guest_wifi_device must return None."""
@@ -274,12 +271,11 @@ class TestCreateGuestWifiDevice:
def test_known_mac_produces_known_guest_mac(self):
"""
Regression anchor: for a fixed Fritz!Box MAC, the expected guest MAC
is precomputed here independently. If the hashing logic in
fritzbox.py changes, this test fails immediately.
is derived via string_to_fake_mac(normalize_mac(...)). If the hashing
logic in fritzbox.py or string_to_fake_mac changes, this test fails.
"""
fritzbox_mac = "aa:bb:cc:dd:ee:ff" # normalize_mac output of "AA:BB:CC:DD:EE:FF"
digest = hashlib.md5(f"GUEST:{fritzbox_mac}".encode()).digest()
expected = "02:" + ":".join(f"{b:02x}" for b in digest[:5])
fritzbox_mac = normalize_mac("AA:BB:CC:DD:EE:FF")
expected = string_to_fake_mac(fritzbox_mac)
device = fritzbox.create_guest_wifi_device(self._fc_with_mac("AA:BB:CC:DD:EE:FF"))
assert device["mac_address"] == expected
@@ -296,10 +292,8 @@ class TestGetFritzboxConnection:
fc_instance.modelname = "FRITZ!Box 7590"
fc_instance.system_version = "7.57"
fc_class = MagicMock(return_value=fc_instance)
fc_module = MagicMock()
fc_module.FritzConnection = fc_class
with patch.dict("sys.modules", {"fritzconnection": fc_module}):
with patch("fritzbox.FritzConnection", fc_class):
result = fritzbox.get_fritzbox_connection("fritz.box", 49443, "admin", "pass", True)
assert result is fc_instance
@@ -308,16 +302,13 @@ class TestGetFritzboxConnection:
)
def test_import_error_returns_none(self):
with patch.dict("sys.modules", {"fritzconnection": None}):
with patch("fritzbox.FritzConnection", side_effect=ImportError("fritzconnection not found")):
result = fritzbox.get_fritzbox_connection("fritz.box", 49443, "admin", "pass", True)
assert result is None
def test_connection_exception_returns_none(self):
fc_module = MagicMock()
fc_module.FritzConnection.side_effect = Exception("Connection refused")
with patch.dict("sys.modules", {"fritzconnection": fc_module}):
with patch("fritzbox.FritzConnection", side_effect=Exception("Connection refused")):
result = fritzbox.get_fritzbox_connection("fritz.box", 49443, "admin", "pass", True)
assert result is None

View File

@@ -444,3 +444,122 @@ class TestDownCountSleepingSuppression:
assert count == 1, (
f"Expected 1 down device (sleeping device must not be counted), got {count}"
)
# ---------------------------------------------------------------------------
# Layer 1c: insert_events() — forced-online device suppression
#
# Devices with devForceStatus='online' are always considered present by the
# operator. Generating 'Device Down' or 'Disconnected' events for them causes
# spurious flapping detection (devFlapping counts these events in DevicesView).
#
# Affected queries in insert_events():
# 1a Device Down (non-sleeping) — DevicesView query
# 1b Device Down (sleep-expired) — DevicesView query
# 3 Disconnected — Devices table query
# ---------------------------------------------------------------------------
class TestInsertEventsForceOnline:
"""
Regression tests: forced-online devices must never generate
'Device Down' or 'Disconnected' events.
"""
def test_forced_online_no_device_down_event(self):
"""
devForceStatus='online', devAlertDown=1, absent from CurrentScan.
Must NOT produce a 'Device Down' event (regression: used to fire and
cause devFlapping=1 after the threshold was reached).
"""
conn = _make_db()
cur = conn.cursor()
_insert_device(cur, "ff:00:00:00:00:01", alert_down=1, present_last_scan=1,
force_status="online")
conn.commit()
insert_events(DummyDB(conn))
assert "ff:00:00:00:00:01" not in _down_event_macs(cur), (
"forced-online device must never generate a 'Device Down' event"
)
def test_forced_online_sleep_expired_no_device_down_event(self):
"""
devForceStatus='online', devCanSleep=1, sleep window expired.
Must NOT produce a 'Device Down' event via the sleep-expired path.
"""
conn = _make_db(sleep_minutes=30)
cur = conn.cursor()
_insert_device(cur, "ff:00:00:00:00:02", alert_down=1, present_last_scan=0,
can_sleep=1, last_connection=_minutes_ago(45),
force_status="online")
conn.commit()
insert_events(DummyDB(conn))
assert "ff:00:00:00:00:02" not in _down_event_macs(cur), (
"forced-online sleeping device must not get 'Device Down' after sleep expires"
)
def test_forced_online_no_disconnected_event(self):
"""
devForceStatus='online', devAlertDown=0 (Disconnected path), absent.
Must NOT produce a 'Disconnected' event.
"""
conn = _make_db()
cur = conn.cursor()
_insert_device(cur, "ff:00:00:00:00:03", alert_down=0, present_last_scan=1,
force_status="online")
conn.commit()
insert_events(DummyDB(conn))
cur.execute(
"SELECT COUNT(*) AS cnt FROM Events "
"WHERE eveMac = 'ff:00:00:00:00:03' AND eveEventType = 'Disconnected'"
)
assert cur.fetchone()["cnt"] == 0, (
"forced-online device must never generate a 'Disconnected' event"
)
def test_forced_online_uppercase_no_device_down_event(self):
"""devForceStatus='ONLINE' (uppercase) must also be suppressed."""
conn = _make_db()
cur = conn.cursor()
_insert_device(cur, "ff:00:00:00:00:04", alert_down=1, present_last_scan=1,
force_status="ONLINE")
conn.commit()
insert_events(DummyDB(conn))
assert "ff:00:00:00:00:04" not in _down_event_macs(cur), (
"forced-online device (uppercase) must never generate a 'Device Down' event"
)
def test_dont_force_still_fires_device_down(self):
"""devForceStatus='dont_force' must behave normally — event fires."""
conn = _make_db()
cur = conn.cursor()
_insert_device(cur, "ff:00:00:00:00:05", alert_down=1, present_last_scan=1,
force_status="dont_force")
conn.commit()
insert_events(DummyDB(conn))
assert "ff:00:00:00:00:05" in _down_event_macs(cur), (
"dont_force device must still generate 'Device Down' when absent"
)
def test_forced_offline_still_fires_device_down(self):
"""devForceStatus='offline' suppresses nothing — event fires."""
conn = _make_db()
cur = conn.cursor()
_insert_device(cur, "ff:00:00:00:00:06", alert_down=1, present_last_scan=1,
force_status="offline")
conn.commit()
insert_events(DummyDB(conn))
assert "ff:00:00:00:00:06" in _down_event_macs(cur), (
"forced-offline device must still generate 'Device Down' when absent"
)