mirror of
https://github.com/jokob-sk/NetAlertX.git
synced 2026-04-06 18:21:46 -07:00
Add Fritz!Box device scanner plugin via TR-064 protocol
NetAlertX had no native support for discovering devices connected to Fritz!Box routers. Users relying on Fritz!Box as their primary home router had to use generic network scanning (ARP/ICMP), missing Fritz!Box-specific details like interface type (WiFi/LAN) and connection status per device. Changes: - Add plugin implementation (front/plugins/fritzbox/fritzbox.py) Queries all hosts via FritzHosts TR-064 service, normalizes MACs, maps interface types (802.11→WiFi, Ethernet→LAN), and writes results to CurrentScan via Plugin_Objects. Supports filtering to active-only devices and optional guest WiFi monitoring via a synthetic AP device with a deterministic locally-administered MAC (02:xx derived from Fritz!Box MAC via MD5). - Add plugin configuration (front/plugins/fritzbox/config.json) Defines plugin_type "device_scanner" with settings for host, port, credentials, guest WiFi reporting, and active-only filtering. Maps scan columns to CurrentScan fields (scanMac, scanLastIP, scanName, scanType). Default schedule: every 5 minutes. - Add plugin documentation (front/plugins/fritzbox/README.md) Covers TR-064 protocol basics, quick setup guide, all settings with defaults, troubleshooting for common issues (connection refused, auth failures, no devices found), and technical details. - Add fritzconnection>=1.15.1 dependency (requirements.txt) Required Python library for TR-064 communication with Fritz!Box. - Add test suite (test/plugins/test_fritzbox.py:1-298) 298 lines covering get_connected_devices (active filtering, MAC normalization, interface mapping, error resilience), check_guest_wifi_status (service detection, SSID-based guest detection, fallback behavior), and create_guest_wifi_device (deterministic MAC generation, locally-administered bit, fallback MAC, regression anchor with precomputed hash). Users can now scan Fritz!Box-connected devices natively, seeing per-device connection status and interface type directly in NetAlertX. Guest WiFi monitoring provides visibility into guest network state. The plugin defaults to HTTPS on port 49443 with active-only filtering enabled.
This commit is contained in:
273
front/plugins/fritzbox/fritzbox.py
Executable file
273
front/plugins/fritzbox/fritzbox.py
Executable file
@@ -0,0 +1,273 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
import hashlib
|
||||
import os
|
||||
import sys
|
||||
from pytz import timezone
|
||||
|
||||
# Define the installation path and extend the system path for plugin imports
|
||||
INSTALL_PATH = os.getenv('NETALERTX_APP', '/app')
|
||||
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
|
||||
|
||||
from const import logPath # noqa: E402, E261 [flake8 lint suppression]
|
||||
from plugin_helper import Plugin_Objects, normalize_mac # noqa: E402, E261 [flake8 lint suppression]
|
||||
from logger import mylog, Logger # noqa: E402, E261 [flake8 lint suppression]
|
||||
from helper import get_setting_value # noqa: E402, E261 [flake8 lint suppression]
|
||||
|
||||
import conf # noqa: E402, E261 [flake8 lint suppression]
|
||||
|
||||
# Make sure the TIMEZONE for logging is correct
|
||||
conf.tz = timezone(get_setting_value('TIMEZONE'))
|
||||
|
||||
# Make sure log level is initialized correctly
|
||||
Logger(get_setting_value('LOG_LEVEL'))
|
||||
|
||||
pluginName = 'FRITZBOX'
|
||||
|
||||
INTERFACE_MAP = {
|
||||
'802.11': 'WiFi',
|
||||
'Ethernet': 'LAN',
|
||||
}
|
||||
|
||||
# Define the current path and log file paths
|
||||
LOG_PATH = logPath + '/plugins'
|
||||
LOG_FILE = os.path.join(LOG_PATH, f'script.{pluginName}.log')
|
||||
RESULT_FILE = os.path.join(LOG_PATH, f'last_result.{pluginName}.log')
|
||||
|
||||
# Initialize the Plugin obj output file
|
||||
plugin_objects = Plugin_Objects(RESULT_FILE)
|
||||
|
||||
|
||||
def get_fritzbox_connection(host, port, user, password, use_tls):
|
||||
"""
|
||||
Create FritzConnection with error handling.
|
||||
Returns: FritzConnection object or None on failure
|
||||
"""
|
||||
try:
|
||||
from fritzconnection import FritzConnection
|
||||
|
||||
mylog('verbose', [f'[{pluginName}] Attempting connection to {host}:{port} (TLS: {use_tls})'])
|
||||
|
||||
fc = FritzConnection(
|
||||
address=host,
|
||||
port=port,
|
||||
user=user,
|
||||
password=password,
|
||||
use_tls=use_tls,
|
||||
)
|
||||
|
||||
mylog('verbose', [f'[{pluginName}] Successfully connected to Fritz!Box'])
|
||||
mylog('verbose', [f'[{pluginName}] Model: {fc.modelname}, Software: {fc.system_version}'])
|
||||
|
||||
return fc
|
||||
except ImportError as e:
|
||||
mylog('none', [f'[{pluginName}] ⚠ ERROR: fritzconnection library not installed: {e}'])
|
||||
mylog('none', [f'[{pluginName}] Please install with: pip install fritzconnection'])
|
||||
return None
|
||||
except Exception as e:
|
||||
mylog('none', [f'[{pluginName}] ⚠ ERROR: Failed to connect to Fritz!Box: {e}'])
|
||||
mylog('none', [f'[{pluginName}] Check host ({host}), port ({port}), and credentials'])
|
||||
mylog('none', [f'[{pluginName}] Ensure TR-064 is enabled in Fritz!Box settings'])
|
||||
return None
|
||||
|
||||
|
||||
def get_connected_devices(fc, active_only):
|
||||
"""
|
||||
Query all hosts from Fritz!Box via FritzHosts service.
|
||||
Use get_hosts_info() for count, then get_generic_host_entry(index) for each.
|
||||
Filter by NewActive status if active_only=True.
|
||||
Returns: List of device dictionaries
|
||||
"""
|
||||
devices = []
|
||||
|
||||
try:
|
||||
from fritzconnection.lib.fritzhosts import FritzHosts
|
||||
|
||||
hosts = FritzHosts(fc)
|
||||
host_count = hosts.host_numbers
|
||||
|
||||
mylog('verbose', [f'[{pluginName}] Found {host_count} total hosts in Fritz!Box'])
|
||||
|
||||
for index in range(host_count):
|
||||
try:
|
||||
host_info = hosts.get_generic_host_entry(index)
|
||||
|
||||
# Extract relevant fields
|
||||
mac_address = host_info.get('NewMACAddress', '')
|
||||
ip_address = host_info.get('NewIPAddress', '')
|
||||
hostname = host_info.get('NewHostName', '')
|
||||
active = host_info.get('NewActive', 0)
|
||||
interface_type = host_info.get('NewInterfaceType', 'Unknown')
|
||||
|
||||
# Skip if active_only and device is not active
|
||||
if active_only and not active:
|
||||
continue
|
||||
|
||||
# Skip entries without MAC address
|
||||
if not mac_address:
|
||||
continue
|
||||
|
||||
# Normalize MAC address
|
||||
mac_address = normalize_mac(mac_address)
|
||||
|
||||
# Map interface type to readable format
|
||||
interface_display = interface_type
|
||||
for key, value in INTERFACE_MAP.items():
|
||||
if key in interface_type:
|
||||
interface_display = value
|
||||
break
|
||||
|
||||
# Build device dictionary
|
||||
device = {
|
||||
'mac_address': mac_address,
|
||||
'ip_address': ip_address if ip_address else '',
|
||||
'hostname': hostname if hostname else 'Unknown',
|
||||
'active_status': 'Active' if active else 'Inactive',
|
||||
'interface_type': interface_display
|
||||
}
|
||||
|
||||
devices.append(device)
|
||||
mylog('verbose', [f'[{pluginName}] Device: {mac_address} ({hostname}) - {ip_address} - {interface_display}'])
|
||||
|
||||
except Exception as e:
|
||||
mylog('minimal', [f'[{pluginName}] Warning: Failed to get host entry {index}: {e}'])
|
||||
continue
|
||||
|
||||
mylog('verbose', [f'[{pluginName}] Processed {len(devices)} devices'])
|
||||
|
||||
except ImportError as e:
|
||||
mylog('none', [f'[{pluginName}] ⚠ ERROR: fritzconnection library not properly installed: {e}'])
|
||||
except Exception as e:
|
||||
mylog('none', [f'[{pluginName}] ⚠ ERROR: Failed to query devices: {e}'])
|
||||
|
||||
return devices
|
||||
|
||||
|
||||
def check_guest_wifi_status(fc, guest_service_num):
|
||||
"""
|
||||
Query a specific WLANConfiguration service for guest network status.
|
||||
Returns: Dict with active status and interface info
|
||||
"""
|
||||
guest_info = {
|
||||
'active': False,
|
||||
'ssid': 'Guest WiFi',
|
||||
'interface': 'Guest Network'
|
||||
}
|
||||
|
||||
try:
|
||||
service = f'WLANConfiguration{guest_service_num}'
|
||||
result = fc.call_action(service, 'GetInfo')
|
||||
status = result.get('NewEnable', False)
|
||||
ssid = result.get('NewSSID', '')
|
||||
|
||||
if status:
|
||||
guest_info['active'] = True
|
||||
guest_info['ssid'] = ssid if ssid else 'Guest WiFi'
|
||||
mylog('verbose', [f'[{pluginName}] Guest WiFi active on service {guest_service_num}: {guest_info["ssid"]}'])
|
||||
else:
|
||||
mylog('verbose', [f'[{pluginName}] Guest WiFi service {guest_service_num} is disabled'])
|
||||
|
||||
except Exception as e:
|
||||
mylog('minimal', [f'[{pluginName}] Warning: Failed to query WLANConfiguration{guest_service_num}: {e}'])
|
||||
|
||||
return guest_info
|
||||
|
||||
|
||||
def create_guest_wifi_device(fc):
|
||||
"""
|
||||
Create a synthetic device entry for guest WiFi.
|
||||
Derives a locally-administered MAC (02:xx:xx:xx:xx:xx) from the Fritz!Box MAC.
|
||||
Returns: Device dictionary
|
||||
"""
|
||||
try:
|
||||
# Get Fritz!Box MAC address
|
||||
fritzbox_mac = fc.call_action('DeviceInfo:1', 'GetInfo').get('NewMACAddress', '')
|
||||
|
||||
if fritzbox_mac:
|
||||
# Derive a deterministic locally-administered MAC (02:xx:xx:xx:xx:xx).
|
||||
# The 02 prefix sets the locally-administered bit, ensuring no collision
|
||||
# with real OUI-assigned MACs. The remaining 5 bytes come from an MD5
|
||||
# hash of the Fritz!Box MAC so the guest MAC is stable across runs.
|
||||
digest = hashlib.md5(f'GUEST:{normalize_mac(fritzbox_mac)}'.encode()).digest()
|
||||
guest_mac = '02:' + ':'.join(f'{b:02x}' for b in digest[:5])
|
||||
else:
|
||||
# Fallback if we can't get Fritz!Box MAC
|
||||
guest_mac = '02:00:00:00:00:01'
|
||||
|
||||
device = {
|
||||
'mac_address': guest_mac,
|
||||
'ip_address': '',
|
||||
'hostname': 'Guest WiFi Network',
|
||||
'active_status': 'Active',
|
||||
'interface_type': 'Access Point'
|
||||
}
|
||||
|
||||
mylog('verbose', [f'[{pluginName}] Created guest WiFi device: {guest_mac}'])
|
||||
return device
|
||||
|
||||
except Exception as e:
|
||||
mylog('minimal', [f'[{pluginName}] Warning: Failed to create guest WiFi device: {e}'])
|
||||
return None
|
||||
|
||||
|
||||
def main():
|
||||
mylog('verbose', [f'[{pluginName}] In script'])
|
||||
|
||||
# Retrieve configuration settings
|
||||
host = get_setting_value('FRITZBOX_HOST')
|
||||
port = get_setting_value('FRITZBOX_PORT')
|
||||
user = get_setting_value('FRITZBOX_USER')
|
||||
password = get_setting_value('FRITZBOX_PASS')
|
||||
use_tls = get_setting_value('FRITZBOX_USE_TLS')
|
||||
report_guest = get_setting_value('FRITZBOX_REPORT_GUEST')
|
||||
guest_service = get_setting_value('FRITZBOX_GUEST_SERVICE')
|
||||
active_only = get_setting_value('FRITZBOX_ACTIVE_ONLY')
|
||||
|
||||
mylog('verbose', [f'[{pluginName}] Settings: host={host}, port={port}, use_tls={use_tls}, active_only={active_only}'])
|
||||
|
||||
# Create Fritz!Box connection
|
||||
fc = get_fritzbox_connection(host, port, user, password, use_tls)
|
||||
|
||||
if not fc:
|
||||
mylog('none', [f'[{pluginName}] ⚠ ERROR: Could not establish connection to Fritz!Box'])
|
||||
mylog('none', [f'[{pluginName}] Plugin will return empty results'])
|
||||
plugin_objects.write_result_file()
|
||||
return 1
|
||||
|
||||
# Retrieve device data
|
||||
device_data = get_connected_devices(fc, active_only)
|
||||
|
||||
# Check guest WiFi if enabled
|
||||
if report_guest:
|
||||
guest_status = check_guest_wifi_status(fc, guest_service)
|
||||
if guest_status['active']:
|
||||
guest_device = create_guest_wifi_device(fc)
|
||||
if guest_device:
|
||||
device_data.append(guest_device)
|
||||
|
||||
# Process the data into native application tables
|
||||
if device_data:
|
||||
for device in device_data:
|
||||
plugin_objects.add_object(
|
||||
primaryId=device['mac_address'],
|
||||
secondaryId=device['ip_address'],
|
||||
watched1=device['hostname'],
|
||||
watched2=device['active_status'],
|
||||
watched3=device['interface_type'],
|
||||
watched4='',
|
||||
extra='',
|
||||
foreignKey=device['mac_address']
|
||||
)
|
||||
|
||||
mylog('verbose', [f'[{pluginName}] Successfully processed {len(device_data)} devices'])
|
||||
else:
|
||||
mylog('minimal', [f'[{pluginName}] No devices found'])
|
||||
|
||||
# Log result
|
||||
plugin_objects.write_result_file()
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
sys.exit(main())
|
||||
Reference in New Issue
Block a user