Fixes and tidy-ups

Some Flak8 fixes, some adjustments to logging levels, ie warnings and errors
This commit is contained in:
Ingo Ratsdorf
2025-09-13 18:19:10 +12:00
parent 1601c10025
commit e6daa33bca

View File

@@ -1,34 +1,29 @@
#!/usr/bin/env python
import json
import subprocess
import argparse
import os
import pathlib
import sys
from datetime import datetime
import time
import re
import unicodedata
import paho.mqtt.client as mqtt
# from paho.mqtt import client as mqtt_client
# from paho.mqtt import CallbackAPIVersion as mqtt_CallbackAPIVersion
import hashlib
import sqlite3
# Register NetAlertX directories
INSTALL_PATH="/app"
INSTALL_PATH = "/app"
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
# NetAlertX modules
import conf
from const import apiPath, confFileName, logPath
from const import confFileName, logPath
from plugin_utils import getPluginObject
from plugin_helper import Plugin_Objects
from logger import mylog, Logger, append_line_to_file
from helper import timeNowTZ, get_setting_value, bytes_to_string, sanitize_string, normalize_string
from models.notification_instance import NotificationInstance
from logger import mylog, Logger
from helper import timeNowTZ, get_setting_value, bytes_to_string, \
sanitize_string, normalize_string
from database import DB, get_device_stats
from pytz import timezone
@@ -49,20 +44,22 @@ plugin_objects = Plugin_Objects(RESULT_FILE)
md5_hash = hashlib.md5()
# globals
mqtt_sensors = []
mqtt_connected_to_broker = False
mqtt_client = None # mqtt client
topic_root = get_setting_value('MQTT_topic_root')
def main():
mylog('verbose', [f'[{pluginName}](publisher) In script'])
mylog('verbose', [f'[{pluginName}](publisher) In script'])
# Check if basic config settings supplied
if check_config() == False:
mylog('verbose', [f'[{pluginName}] ⚠ ERROR: Publisher notification gateway not set up correctly. Check your {confFileName} {pluginName}_* variables.'])
if not check_config():
mylog('verbose', [f'[{pluginName}] ⚠ ERROR: Publisher notification \
gateway not set up correctly. Check your {confFileName} \
{pluginName}_* variables.'])
return
# Create a database connection
@@ -74,56 +71,80 @@ def main():
plugin_objects.write_result_file()
#-------------------------------------------------------------------------------
# -----------------------------------------------------------------------------
# MQTT
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
def check_config():
if get_setting_value('MQTT_BROKER') == '' or get_setting_value('MQTT_PORT') == '' or get_setting_value('MQTT_USER') == '' or get_setting_value('MQTT_PASSWORD') == '':
mylog('verbose', [f'[Check Config] ⚠ ERROR: MQTT service not set up correctly. Check your {confFileName} MQTT_* variables.'])
return False
else:
return True
"""
Checks whether the MQTT configuration settings are properly set.
Returns:
bool: True if all required MQTT settings
('MQTT_BROKER', 'MQTT_PORT', 'MQTT_USER', 'MQTT_PASSWORD')
are non-empty;
False otherwise. Logs a verbose error message
if any setting is missing.
"""
if get_setting_value('MQTT_BROKER') == '' \
or get_setting_value('MQTT_PORT') == '' \
or get_setting_value('MQTT_USER') == '' \
or get_setting_value('MQTT_PASSWORD') == '':
mylog('verbose', [f'[Check Config] ⚠ ERROR: MQTT service not set up \
correctly. Check your {confFileName} MQTT_* variables.'])
return False
else:
return True
#-------------------------------------------------------------------------------
# Sensor configs are tracking which sensors in NetAlertX exist and if a config has changed
# -----------------------------------------------------------------------------
# Sensor configs are tracking which sensors in NetAlertX exist
# and if a config has changed
class sensor_config:
def __init__(self, deviceId, deviceName, sensorType, sensorName, icon, mac):
def __init__(self,
deviceId,
deviceName,
sensorType,
sensorName,
icon,
mac):
"""
Initialize the sensor_config object with provided parameters. Sets up sensor configuration
and generates necessary MQTT topics and messages based on the sensor type.
Initialize the sensor_config object with provided parameters.
Sets up sensor configuration and generates necessary MQTT topics
and messages based on the sensor type.
"""
# Assign initial attributes
self.deviceId = deviceId
self.deviceName = deviceName
self.sensorType = sensorType
self.sensorName = sensorName
self.icon = icon
self.icon = icon
self.mac = mac
self.model = deviceName
self.hash = ''
self.model = deviceName
self.hash = ''
self.state_topic = ''
self.json_attr_topic = ''
self.topic = ''
self.message = {} # Initialize message as an empty dictionary
self.unique_id = ''
# Call helper functions to initialize the message, generate a hash, and handle plugin object
# Call helper functions to initialize the message, generate a hash,
# and handle plugin object
self.initialize_message()
self.generate_hash()
self.handle_plugin_object()
def initialize_message(self):
"""
Initialize the MQTT message payload based on the sensor type. This method handles sensors of types:
Initialize the MQTT message payload based on the sensor type.
This method handles sensors of types:
- 'timestamp'
- 'binary_sensor'
- 'sensor'
- 'device_tracker'
"""
# Ensure self.message is initialized as a dictionary if not already done
# Ensure self.message is initialized as a dictionary
# if not already done
if not isinstance(self.message, dict):
self.message = {}
@@ -153,7 +174,6 @@ class sensor_config:
"icon": f'mdi:{self.icon}'
})
# Handle 'device_tracker' sensor type
elif self.sensorType == 'device_tracker':
self.topic = f'homeassistant/device_tracker/{self.deviceId}/config'
@@ -229,25 +249,36 @@ class sensor_config:
)
#-------------------------------------------------------------------------------
# -------------------------------------------------------------------------------
def publish_mqtt(mqtt_client, topic, message):
"""
Publishes a message to an MQTT topic using the provided MQTT client.
If the message is not a string, it is converted to a JSON-formatted string.
The function retrieves the desired QoS level from settings and logs the publishing process.
If the client is not connected to the broker, the function logs an error and aborts.
It attempts to publish the message, retrying until the publish status indicates success.
Args:
mqtt_client: The MQTT client instance used to publish the message.
topic (str): The MQTT topic to publish to.
message (Any): The message payload to send. Non-string messages are converted to JSON.
Returns:
bool: True if the message was published successfully, False if not connected to the broker.
"""
status = 1
# convert anything but a simple string to json
if not isinstance(message, str):
message = json.dumps(message).replace("'",'"')
message = json.dumps(message).replace("'", '"')
qos = get_setting_value('MQTT_QOS')
mylog('verbose', [f"[{pluginName}] Sending MQTT topic: {topic}"])
mylog('verbose', [f"[{pluginName}] Sending MQTT message: {message}"])
mylog('debug', [f"[{pluginName}] Sending MQTT topic: {topic}"])
mylog('debug', [f"[{pluginName}] Sending MQTT message: {message}"])
# mylog('verbose', [f"[{pluginName}] get_setting_value('MQTT_QOS'): {qos}"])
if mqtt_connected_to_broker == False:
mylog('verbose', [f"[{pluginName}] ⚠ ERROR: Not connected to broker, aborting."])
if not mqtt_connected_to_broker:
mylog('minimal', [f"[{pluginName}] ⚠ ERROR: Not connected to broker, aborting."])
return False
while status != 0:
@@ -267,45 +298,46 @@ def publish_mqtt(mqtt_client, topic, message):
# mylog('verbose', [f"[{pluginName}] status: {status}"])
# mylog('verbose', [f"[{pluginName}] result: {result}"])
if status != 0:
mylog('verbose', [f"[{pluginName}] Waiting to reconnect to MQTT broker"])
time.sleep(0.1)
if status != 0:
mylog('debug', [f"[{pluginName}] Waiting to reconnect to MQTT broker"])
time.sleep(0.1)
return True
#-------------------------------------------------------------------------------
# ------------------------------------------------------------------------------
# Create a generic device for overal stats
def create_generic_device(mqtt_client, deviceId, deviceName):
create_sensor(mqtt_client, deviceId, deviceName, 'sensor', 'online', 'wifi-check')
create_sensor(mqtt_client, deviceId, deviceName, 'sensor', 'down', 'wifi-cancel')
def create_generic_device(mqtt_client, deviceId, deviceName):
create_sensor(mqtt_client, deviceId, deviceName, 'sensor', 'online', 'wifi-check')
create_sensor(mqtt_client, deviceId, deviceName, 'sensor', 'down', 'wifi-cancel')
create_sensor(mqtt_client, deviceId, deviceName, 'sensor', 'all', 'wifi')
create_sensor(mqtt_client, deviceId, deviceName, 'sensor', 'archived', 'wifi-lock')
create_sensor(mqtt_client, deviceId, deviceName, 'sensor', 'new', 'wifi-plus')
create_sensor(mqtt_client, deviceId, deviceName, 'sensor', 'unknown', 'wifi-alert')
#-------------------------------------------------------------------------------
# ------------------------------------------------------------------------------
# Register sensor config on the broker
def create_sensor(mqtt_client, deviceId, deviceName, sensorType, sensorName, icon, mac=""):
global mqtt_sensors
def create_sensor(mqtt_client, deviceId, deviceName, sensorType, sensorName, icon, mac=""):
global mqtt_sensors
# check previous configs
sensorConfig = sensor_config(deviceId, deviceName, sensorType, sensorName, icon, mac)
sensorConfig = sensor_config(deviceId, deviceName, sensorType, sensorName, icon, mac)
# send if new
if sensorConfig.isNew:
# Create the HA sensor config if a new device is discovered
if sensorConfig.isNew:
# add the sensor to the global list to keep track of succesfully added sensors
if publish_mqtt(mqtt_client, sensorConfig.topic, sensorConfig.message):
# hack - delay adding to the queue in case the process is
time.sleep(get_setting_value('MQTT_DELAY_SEC')) # restarted and previous publish processes aborted
# (it takes ~2s to update a sensor config on the broker)
mqtt_sensors.append(sensorConfig)
if publish_mqtt(mqtt_client, sensorConfig.topic, sensorConfig.message):
# hack - delay adding to the queue in case the process is
# restarted and previous publish processes aborted
# (it takes ~2s to update a sensor config on the broker)
time.sleep(get_setting_value('MQTT_DELAY_SEC'))
mqtt_sensors.append(sensorConfig)
return sensorConfig
#-------------------------------------------------------------------------------
# -----------------------------------------------------------------------------
def mqtt_create_client():
# attempt reconnections on failure, ref https://www.emqx.com/en/blog/how-to-use-mqtt-in-python
@@ -313,11 +345,11 @@ def mqtt_create_client():
RECONNECT_RATE = 2
MAX_RECONNECT_COUNT = 12
MAX_RECONNECT_DELAY = 60
mytransport = 'tcp' # or 'websockets'
mytransport = 'tcp' # or 'websockets'
def on_disconnect(mqtt_client, userdata, rc):
global mqtt_connected_to_broker
mylog('verbose', [f"[{pluginName}] Connection terminated, reason_code: {rc}"])
@@ -328,7 +360,7 @@ def mqtt_create_client():
try:
mqtt_client.reconnect()
mqtt_connected_to_broker = True # Signal connection
mqtt_connected_to_broker = True # Signal connection
mylog('verbose', [f"[{pluginName}] Reconnected successfully"])
return
except Exception as err:
@@ -338,19 +370,18 @@ def mqtt_create_client():
reconnect_delay *= RECONNECT_RATE
reconnect_delay = min(reconnect_delay, MAX_RECONNECT_DELAY)
reconnect_count += 1
mqtt_connected_to_broker = False
def on_connect(mqtt_client, userdata, flags, rc, properties):
global mqtt_connected_to_broker
# REF: Good docu on reason codes: https://www.emqx.com/en/blog/mqtt5-new-features-reason-code-and-ack
if rc == 0:
mylog('verbose', [f"[{pluginName}] Connected to broker"])
mqtt_connected_to_broker = True # Signal connection
else:
if rc == 0:
mylog('verbose', [f"[{pluginName}] Connected to broker"])
mqtt_connected_to_broker = True # Signal connection
else:
mylog('verbose', [f"[{pluginName}] Connection failed, reason_code: {rc}"])
mqtt_connected_to_broker = False
@@ -367,10 +398,12 @@ def mqtt_create_client():
version = mqtt.MQTTv5
# we now hardcode the client id into here.
# TODO: Add config ffor client id
# TODO: Add config for client id (atm, we use a fixed client id,
# so only one instance of NetAlertX can connect to the broker at any given time)
# If you intend to run multiple instances simultaneously, make sure to set unique client IDs for each instance.
mqtt_client = mqtt.Client(
client_id='netalertx',
callback_api_version = mqtt.CallbackAPIVersion.VERSION2,
callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
transport=mytransport,
protocol=version)
mqtt_client.on_connect = on_connect
@@ -379,8 +412,8 @@ def mqtt_create_client():
if get_setting_value('MQTT_TLS'):
mqtt_client.tls_set()
mqtt_client.username_pw_set(username = get_setting_value('MQTT_USER'), password = get_setting_value('MQTT_PASSWORD'))
err_code = mqtt_client.connect(host = get_setting_value('MQTT_BROKER'), port = get_setting_value('MQTT_PORT'))
mqtt_client.username_pw_set(username=get_setting_value('MQTT_USER'), password=get_setting_value('MQTT_PASSWORD'))
err_code = mqtt_client.connect(host=get_setting_value('MQTT_BROKER'), port=get_setting_value('MQTT_PORT'))
if (err_code == mqtt.MQTT_ERR_SUCCESS):
# We (prematurely) set the connection state to connected
# the callback may be delayed
@@ -389,36 +422,37 @@ def mqtt_create_client():
# Mosquitto works straight away
# EMQX has a delay and does not update in loop below, so we cannot rely on it, we wait 1 sec
time.sleep(1)
mqtt_client.loop_start()
mqtt_client.loop_start()
return mqtt_client
#-------------------------------------------------------------------------------
def mqtt_start(db):
# -----------------------------------------------------------------------------
def mqtt_start(db):
global mqtt_client, mqtt_connected_to_broker
if mqtt_connected_to_broker == False:
mqtt_connected_to_broker = True
mqtt_client = mqtt_create_client()
if not mqtt_connected_to_broker:
mqtt_connected_to_broker = True
mqtt_client = mqtt_create_client()
deviceName = get_setting_value('MQTT_DEVICE_NAME')
deviceId = get_setting_value('MQTT_DEVICE_ID')
# General stats
deviceId = get_setting_value('MQTT_DEVICE_ID')
# General stats
# Create a generic device for overal stats
if get_setting_value('MQTT_SEND_STATS') == True:
# Create a new device representing overall stats
if get_setting_value('MQTT_SEND_STATS') == True:
# Create a new device representing overall stats
create_generic_device(mqtt_client, deviceId, deviceName)
# Get the data
row = get_device_stats(db)
row = get_device_stats(db)
# Publish (wrap into {} and remove last ',' from above)
publish_mqtt(mqtt_client, f"{topic_root}/sensor/{deviceId}/state",
{
publish_mqtt(mqtt_client, f"{topic_root}/sensor/{deviceId}/state",
{
"online": row[0],
"down": row[1],
"all": row[2],
@@ -429,7 +463,7 @@ def mqtt_start(db):
)
# Generate device-specific MQTT messages if enabled
if get_setting_value('MQTT_SEND_DEVICES') == True:
if get_setting_value('MQTT_SEND_DEVICES'):
# Specific devices processing
@@ -438,37 +472,37 @@ def mqtt_start(db):
sec_delay = len(devices) * int(get_setting_value('MQTT_DELAY_SEC'))*5
mylog('verbose', [f"[{pluginName}] Estimated delay: ", (sec_delay), 's ', '(', round(sec_delay/60,1) , 'min)' ])
mylog('verbose', [f"[{pluginName}] Estimated delay: ", (sec_delay), 's ', '(', round(sec_delay/60, 1), 'min)'])
debug_index = 0
for device in devices:
for device in devices:
# # debug statement START 🔻
# if 'Moto' not in device["devName"]:
# mylog('none', [f"[{pluginName}] ALERT - ⚠⚠⚠⚠ DEBUGGING ⚠⚠⚠⚠ - this should not be in uncommented in production"])
# mylog('none', [f"[{pluginName}] ALERT - ⚠⚠⚠⚠ DEBUGGING ⚠⚠⚠⚠ - this should not be in uncommented in production"])
# continue
# # debug statement END 🔺
# Create devices in Home Assistant - send config messages
deviceId = 'mac_' + device["devMac"].replace(" ", "").replace(":", "_").lower()
# Normalize the string and remove unwanted characters
devDisplayName = re.sub('[^a-zA-Z0-9-_\\s]', '', normalize_string(device["devName"]))
devDisplayName = re.sub('[^a-zA-Z0-9-_\\s]', '', normalize_string(device["devName"]))
sensorConfig = create_sensor(mqtt_client, deviceId, devDisplayName, 'sensor', 'last_ip', 'ip-network', device["devMac"])
sensorConfig = create_sensor(mqtt_client, deviceId, devDisplayName, 'sensor', 'mac_address', 'folder-key-network', device["devMac"])
sensorConfig = create_sensor(mqtt_client, deviceId, devDisplayName, 'sensor', 'is_new', 'bell-alert-outline', device["devMac"])
sensorConfig = create_sensor(mqtt_client, deviceId, devDisplayName, 'sensor', 'vendor', 'cog', device["devMac"])
sensorConfig = create_sensor(mqtt_client, deviceId, devDisplayName, 'sensor', 'vendor', 'cog', device["devMac"])
sensorConfig = create_sensor(mqtt_client, deviceId, devDisplayName, 'sensor', 'first_connection', 'calendar-start', device["devMac"])
sensorConfig = create_sensor(mqtt_client, deviceId, devDisplayName, 'sensor', 'last_connection', 'calendar-end', device["devMac"])
# handle device_tracker
# IMPORTANT: shared payload - device_tracker attributes and individual sensors
devJson = {
"last_ip": device["devLastIP"],
"is_new": str(device["devIsNew"]),
"alert_down": str(device["devAlertDown"]),
"vendor": sanitize_string(device["devVendor"]),
devJson = {
"last_ip": device["devLastIP"],
"is_new": str(device["devIsNew"]),
"alert_down": str(device["devAlertDown"]),
"vendor": sanitize_string(device["devVendor"]),
"mac_address": str(device["devMac"]),
"model": devDisplayName,
"last_connection": prepTimeStamp(str(device["devLastConnection"])),
@@ -480,37 +514,36 @@ def mqtt_start(db):
"network_parent_name": next((dev["devName"] for dev in devices if dev["devMAC"] == device["devParentMAC"]), "")
}
# bulk update device sensors in home assistant
# bulk update device sensors in home assistant
publish_mqtt(mqtt_client, sensorConfig.state_topic, devJson) # REQUIRED, DON'T DELETE
# create and update is_present sensor
sensorConfig = create_sensor(mqtt_client, deviceId, devDisplayName, 'binary_sensor', 'is_present', 'wifi', device["devMac"])
publish_mqtt(mqtt_client, sensorConfig.state_topic,
{
publish_mqtt(mqtt_client, sensorConfig.state_topic,
{
"is_present": to_binary_sensor(str(device["devPresentLastScan"]))
}
)
)
# handle device_tracker
sensorConfig = create_sensor(mqtt_client, deviceId, devDisplayName, 'device_tracker', 'is_home', 'home', device["devMac"])
sensorConfig = create_sensor(mqtt_client, deviceId, devDisplayName, 'device_tracker', 'is_home', 'home', device["devMac"])
# <away|home> are only valid states
state = 'away'
if to_binary_sensor(str(device["devPresentLastScan"])) == "ON":
state = 'home'
publish_mqtt(mqtt_client, sensorConfig.state_topic, state)
publish_mqtt(mqtt_client, sensorConfig.state_topic, state)
# publish device_tracker attributes
publish_mqtt(mqtt_client, sensorConfig.json_attr_topic, devJson)
publish_mqtt(mqtt_client, sensorConfig.json_attr_topic, devJson)
#===============================================================================
# =============================================================================
# Home Assistant UTILs
#===============================================================================
# =============================================================================
def to_binary_sensor(input):
# In HA a binary sensor returns ON or OFF
# In HA a binary sensor returns ON or OFF
result = "OFF"
# bytestring
@@ -528,6 +561,7 @@ def to_binary_sensor(input):
result = "ON"
return result
# -------------------------------------
# Convert to format that is interpretable by Home Assistant
def prepTimeStamp(datetime_str):
@@ -547,9 +581,7 @@ def prepTimeStamp(datetime_str):
# Convert to the required format with 'T' between date and time and ensure the timezone is included
return parsed_datetime.isoformat() # This will include the timezone offset
# -------------INIT---------------------
if __name__ == '__main__':
sys.exit(main())