mirror of
https://github.com/jokob-sk/NetAlertX.git
synced 2026-04-07 02:31:27 -07:00
The fritzconnection imports were originally placed inside the function bodies as a defensive pattern: by catching ImportError locally, get_fritzbox_connection() and get_connected_devices() could each return None or an empty list with a user-friendly log message instead of crashing at import time. This kept the plugin runnable even when the dependency was missing. Requested by reviewer jokob-sk in PR #1592: move all imports to the top of the module, treating fritzconnection as a required dependency that is assumed to be installed via requirements.txt. Changes: - Add top-level imports for FritzConnection and FritzHosts (fritzbox.py:16-17) - Remove inline import and ImportError handler from get_fritzbox_connection() (fritzbox.py:48, 64-67) - Remove inline import and ImportError handler from get_connected_devices() (fritzbox.py:79, 133-134) Functional behavior of the plugin is unchanged.
267 lines
9.5 KiB
Python
Executable File
267 lines
9.5 KiB
Python
Executable File
#!/usr/bin/env python
|
|
|
|
import hashlib
|
|
import os
|
|
import sys
|
|
from pytz import timezone
|
|
|
|
# Define the installation path and extend the system path for plugin imports
|
|
INSTALL_PATH = os.getenv('NETALERTX_APP', '/app')
|
|
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
|
|
|
|
from const import logPath # noqa: E402, E261 [flake8 lint suppression]
|
|
from plugin_helper import Plugin_Objects, normalize_mac # noqa: E402, E261 [flake8 lint suppression]
|
|
from logger import mylog, Logger # noqa: E402, E261 [flake8 lint suppression]
|
|
from helper import get_setting_value # noqa: E402, E261 [flake8 lint suppression]
|
|
from fritzconnection import FritzConnection # noqa: E402, E261 [flake8 lint suppression]
|
|
from fritzconnection.lib.fritzhosts import FritzHosts # noqa: E402, E261 [flake8 lint suppression]
|
|
|
|
import conf # noqa: E402, E261 [flake8 lint suppression]
|
|
|
|
# Make sure the TIMEZONE for logging is correct
|
|
conf.tz = timezone(get_setting_value('TIMEZONE'))
|
|
|
|
# Make sure log level is initialized correctly
|
|
Logger(get_setting_value('LOG_LEVEL'))
|
|
|
|
pluginName = 'FRITZBOX'
|
|
|
|
INTERFACE_MAP = {
|
|
'802.11': 'WiFi',
|
|
'Ethernet': 'LAN',
|
|
}
|
|
|
|
# Define the current path and log file paths
|
|
LOG_PATH = logPath + '/plugins'
|
|
LOG_FILE = os.path.join(LOG_PATH, f'script.{pluginName}.log')
|
|
RESULT_FILE = os.path.join(LOG_PATH, f'last_result.{pluginName}.log')
|
|
|
|
# Initialize the Plugin obj output file
|
|
plugin_objects = Plugin_Objects(RESULT_FILE)
|
|
|
|
|
|
def get_fritzbox_connection(host, port, user, password, use_tls):
|
|
"""
|
|
Create FritzConnection with error handling.
|
|
Returns: FritzConnection object or None on failure
|
|
"""
|
|
try:
|
|
mylog('verbose', [f'[{pluginName}] Attempting connection to {host}:{port} (TLS: {use_tls})'])
|
|
|
|
fc = FritzConnection(
|
|
address=host,
|
|
port=port,
|
|
user=user,
|
|
password=password,
|
|
use_tls=use_tls,
|
|
timeout=10,
|
|
)
|
|
|
|
mylog('verbose', [f'[{pluginName}] Successfully connected to Fritz!Box'])
|
|
mylog('verbose', [f'[{pluginName}] Model: {fc.modelname}, Software: {fc.system_version}'])
|
|
|
|
return fc
|
|
except Exception as e:
|
|
mylog('none', [f'[{pluginName}] ⚠ ERROR: Failed to connect to Fritz!Box: {e}'])
|
|
mylog('none', [f'[{pluginName}] Check host ({host}), port ({port}), and credentials'])
|
|
mylog('none', [f'[{pluginName}] Ensure TR-064 is enabled in Fritz!Box settings'])
|
|
return None
|
|
|
|
|
|
def get_connected_devices(fc, active_only):
|
|
"""
|
|
Query all hosts from Fritz!Box via FritzHosts service.
|
|
Use get_hosts_info() for count, then get_generic_host_entry(index) for each.
|
|
Filter by NewActive status if active_only=True.
|
|
Returns: List of device dictionaries
|
|
"""
|
|
devices = []
|
|
|
|
try:
|
|
hosts = FritzHosts(fc)
|
|
host_count = hosts.host_numbers
|
|
|
|
mylog('verbose', [f'[{pluginName}] Found {host_count} total hosts in Fritz!Box'])
|
|
|
|
for index in range(host_count):
|
|
try:
|
|
host_info = hosts.get_generic_host_entry(index)
|
|
|
|
# Extract relevant fields
|
|
mac_address = host_info.get('NewMACAddress', '')
|
|
ip_address = host_info.get('NewIPAddress', '')
|
|
hostname = host_info.get('NewHostName', '')
|
|
active = host_info.get('NewActive', 0)
|
|
interface_type = host_info.get('NewInterfaceType', 'Unknown')
|
|
|
|
# Skip if active_only and device is not active
|
|
if active_only and not active:
|
|
continue
|
|
|
|
# Skip entries without MAC address
|
|
if not mac_address:
|
|
continue
|
|
|
|
# Normalize MAC address
|
|
mac_address = normalize_mac(mac_address)
|
|
|
|
# Map interface type to readable format
|
|
interface_display = interface_type
|
|
for key, value in INTERFACE_MAP.items():
|
|
if key in interface_type:
|
|
interface_display = value
|
|
break
|
|
|
|
# Build device dictionary
|
|
device = {
|
|
'mac_address': mac_address,
|
|
'ip_address': ip_address if ip_address else '',
|
|
'hostname': hostname if hostname else 'Unknown',
|
|
'active_status': 'Active' if active else 'Inactive',
|
|
'interface_type': interface_display
|
|
}
|
|
|
|
devices.append(device)
|
|
mylog('verbose', [f'[{pluginName}] Device: {mac_address} ({hostname}) - {ip_address} - {interface_display}'])
|
|
|
|
except Exception as e:
|
|
mylog('minimal', [f'[{pluginName}] Warning: Failed to get host entry {index}: {e}'])
|
|
continue
|
|
|
|
mylog('verbose', [f'[{pluginName}] Processed {len(devices)} devices'])
|
|
|
|
except Exception as e:
|
|
mylog('none', [f'[{pluginName}] ⚠ ERROR: Failed to query devices: {e}'])
|
|
|
|
return devices
|
|
|
|
|
|
def check_guest_wifi_status(fc, guest_service_num):
|
|
"""
|
|
Query a specific WLANConfiguration service for guest network status.
|
|
Returns: Dict with active status and interface info
|
|
"""
|
|
guest_info = {
|
|
'active': False,
|
|
'ssid': 'Guest WiFi',
|
|
'interface': 'Guest Network'
|
|
}
|
|
|
|
try:
|
|
service = f'WLANConfiguration{guest_service_num}'
|
|
result = fc.call_action(service, 'GetInfo')
|
|
status = result.get('NewEnable', False)
|
|
ssid = result.get('NewSSID', '')
|
|
|
|
if status:
|
|
guest_info['active'] = True
|
|
guest_info['ssid'] = ssid if ssid else 'Guest WiFi'
|
|
mylog('verbose', [f'[{pluginName}] Guest WiFi active on service {guest_service_num}: {guest_info["ssid"]}'])
|
|
else:
|
|
mylog('verbose', [f'[{pluginName}] Guest WiFi service {guest_service_num} is disabled'])
|
|
|
|
except Exception as e:
|
|
mylog('minimal', [f'[{pluginName}] Warning: Failed to query WLANConfiguration{guest_service_num}: {e}'])
|
|
|
|
return guest_info
|
|
|
|
|
|
def create_guest_wifi_device(fc):
|
|
"""
|
|
Create a synthetic device entry for guest WiFi.
|
|
Derives a locally-administered MAC (02:xx:xx:xx:xx:xx) from the Fritz!Box MAC.
|
|
Returns: Device dictionary
|
|
"""
|
|
try:
|
|
# Get Fritz!Box MAC address
|
|
fritzbox_mac = fc.call_action('DeviceInfo:1', 'GetInfo').get('NewMACAddress', '')
|
|
|
|
if fritzbox_mac:
|
|
# Derive a deterministic locally-administered MAC (02:xx:xx:xx:xx:xx).
|
|
# The 02 prefix sets the locally-administered bit, ensuring no collision
|
|
# with real OUI-assigned MACs. The remaining 5 bytes come from an MD5
|
|
# hash of the Fritz!Box MAC so the guest MAC is stable across runs.
|
|
digest = hashlib.md5(f'GUEST:{normalize_mac(fritzbox_mac)}'.encode(), usedforsecurity=False).digest()
|
|
guest_mac = '02:' + ':'.join(f'{b:02x}' for b in digest[:5])
|
|
else:
|
|
# Fallback if we can't get Fritz!Box MAC
|
|
guest_mac = '02:00:00:00:00:01'
|
|
|
|
device = {
|
|
'mac_address': guest_mac,
|
|
'ip_address': '',
|
|
'hostname': 'Guest WiFi Network',
|
|
'active_status': 'Active',
|
|
'interface_type': 'Access Point'
|
|
}
|
|
|
|
mylog('verbose', [f'[{pluginName}] Created guest WiFi device: {guest_mac}'])
|
|
return device
|
|
|
|
except Exception as e:
|
|
mylog('minimal', [f'[{pluginName}] Warning: Failed to create guest WiFi device: {e}'])
|
|
return None
|
|
|
|
|
|
def main():
|
|
mylog('verbose', [f'[{pluginName}] In script'])
|
|
|
|
# Retrieve configuration settings
|
|
host = get_setting_value('FRITZBOX_HOST')
|
|
port = get_setting_value('FRITZBOX_PORT')
|
|
user = get_setting_value('FRITZBOX_USER')
|
|
password = get_setting_value('FRITZBOX_PASS')
|
|
use_tls = get_setting_value('FRITZBOX_USE_TLS')
|
|
report_guest = get_setting_value('FRITZBOX_REPORT_GUEST')
|
|
guest_service = get_setting_value('FRITZBOX_GUEST_SERVICE')
|
|
active_only = get_setting_value('FRITZBOX_ACTIVE_ONLY')
|
|
|
|
mylog('verbose', [f'[{pluginName}] Settings: host={host}, port={port}, use_tls={use_tls}, active_only={active_only}'])
|
|
|
|
# Create Fritz!Box connection
|
|
fc = get_fritzbox_connection(host, port, user, password, use_tls)
|
|
|
|
if not fc:
|
|
mylog('none', [f'[{pluginName}] ⚠ ERROR: Could not establish connection to Fritz!Box'])
|
|
mylog('none', [f'[{pluginName}] Plugin will return empty results'])
|
|
plugin_objects.write_result_file()
|
|
return 1
|
|
|
|
# Retrieve device data
|
|
device_data = get_connected_devices(fc, active_only)
|
|
|
|
# Check guest WiFi if enabled
|
|
if report_guest:
|
|
guest_status = check_guest_wifi_status(fc, guest_service)
|
|
if guest_status['active']:
|
|
guest_device = create_guest_wifi_device(fc)
|
|
if guest_device:
|
|
device_data.append(guest_device)
|
|
|
|
# Process the data into native application tables
|
|
if device_data:
|
|
for device in device_data:
|
|
plugin_objects.add_object(
|
|
primaryId=device['mac_address'],
|
|
secondaryId=device['ip_address'],
|
|
watched1=device['hostname'],
|
|
watched2=device['active_status'],
|
|
watched3=device['interface_type'],
|
|
watched4='',
|
|
extra='',
|
|
foreignKey=device['mac_address']
|
|
)
|
|
|
|
mylog('verbose', [f'[{pluginName}] Successfully processed {len(device_data)} devices'])
|
|
else:
|
|
mylog('minimal', [f'[{pluginName}] No devices found'])
|
|
|
|
# Log result
|
|
plugin_objects.write_result_file()
|
|
|
|
return 0
|
|
|
|
|
|
if __name__ == '__main__':
|
|
sys.exit(main())
|