Merge pull request #1205 from ingoratsdorf/mqtt-optimisations
Some checks failed
Code checks / check-url-paths (push) Has been cancelled
docker / docker_dev (push) Has been cancelled
Deploy MkDocs / deploy (push) Has been cancelled

Mqtt optimisations and TZ fixes
This commit is contained in:
Jokob @NetAlertX
2025-09-28 20:22:50 +10:00
committed by GitHub

View File

@@ -1,36 +1,32 @@
#!/usr/bin/env python #!/usr/bin/env python
import json import json
import subprocess
import argparse
import os import os
import pathlib
import sys import sys
from datetime import datetime from datetime import datetime
import time import time
import re import re
import unicodedata
import paho.mqtt.client as mqtt import paho.mqtt.client as mqtt
# from paho.mqtt import client as mqtt_client # from paho.mqtt import client as mqtt_client
# from paho.mqtt import CallbackAPIVersion as mqtt_CallbackAPIVersion # from paho.mqtt import CallbackAPIVersion as mqtt_CallbackAPIVersion
import hashlib import hashlib
import sqlite3 from pytz import timezone
# Register NetAlertX directories # Register NetAlertX directories
INSTALL_PATH="/app" INSTALL_PATH = "/app"
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"]) sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
# NetAlertX modules # NetAlertX modules
import conf import conf
from const import apiPath, confFileName, logPath from const import confFileName, logPath
from plugin_utils import getPluginObject from plugin_utils import getPluginObject
from plugin_helper import Plugin_Objects from plugin_helper import Plugin_Objects
from logger import mylog, Logger, append_line_to_file from logger import mylog, Logger
from helper import timeNowTZ, get_setting_value, bytes_to_string, sanitize_string, normalize_string from helper import timeNowTZ, get_setting_value, bytes_to_string, \
from models.notification_instance import NotificationInstance sanitize_string, normalize_string
from database import DB, get_device_stats from database import DB, get_device_stats
from pytz import timezone
# Make sure the TIMEZONE for logging is correct # Make sure the TIMEZONE for logging is correct
conf.tz = timezone(get_setting_value('TIMEZONE')) conf.tz = timezone(get_setting_value('TIMEZONE'))
@@ -49,20 +45,19 @@ plugin_objects = Plugin_Objects(RESULT_FILE)
md5_hash = hashlib.md5() md5_hash = hashlib.md5()
# globals # globals
mqtt_sensors = [] mqtt_sensors = []
mqtt_connected_to_broker = False mqtt_connected_to_broker = False
mqtt_client = None # mqtt client mqtt_client = None # mqtt client
topic_root = get_setting_value('MQTT_topic_root') topic_root = get_setting_value('MQTT_topic_root')
def main(): def main():
mylog('verbose', [f'[{pluginName}](publisher) In script']) mylog('verbose', [f'[{pluginName}](publisher) In script'])
# Check if basic config settings supplied # Check if basic config settings supplied
if check_config() == False: if not check_config():
mylog('verbose', [f'[{pluginName}] ⚠ ERROR: Publisher notification gateway not set up correctly. Check your {confFileName} {pluginName}_* variables.'])
return return
# Create a database connection # Create a database connection
@@ -70,30 +65,52 @@ def main():
db.open() db.open()
mqtt_start(db) mqtt_start(db)
mqtt_client.disconnect()
plugin_objects.write_result_file() plugin_objects.write_result_file()
# -----------------------------------------------------------------------------
#-------------------------------------------------------------------------------
# MQTT # MQTT
#------------------------------------------------------------------------------- # -----------------------------------------------------------------------------
#------------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def check_config(): def check_config():
if get_setting_value('MQTT_BROKER') == '' or get_setting_value('MQTT_PORT') == '' or get_setting_value('MQTT_USER') == '' or get_setting_value('MQTT_PASSWORD') == '': """
mylog('verbose', [f'[Check Config] ⚠ ERROR: MQTT service not set up correctly. Check your {confFileName} MQTT_* variables.']) Checks whether the MQTT configuration settings are properly set.
Returns:
bool: True if all required MQTT settings
('MQTT_BROKER', 'MQTT_PORT', 'MQTT_USER', 'MQTT_PASSWORD')
are non-empty;
False otherwise. Logs a verbose error message
if any setting is missing.
"""
if get_setting_value('MQTT_BROKER') == '' \
or get_setting_value('MQTT_PORT') == '' \
or get_setting_value('MQTT_USER') == '' \
or get_setting_value('MQTT_PASSWORD') == '':
mylog('verbose', [f'[Check Config] ⚠ ERROR: MQTT service not set up \
correctly. Check your {confFileName} MQTT_* variables.'])
return False return False
else: else:
return True return True
#------------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Sensor configs are tracking which sensors in NetAlertX exist and if a config has changed # Sensor configs are tracking which sensors in NetAlertX exist
# and if a config has changed
class sensor_config: class sensor_config:
def __init__(self, deviceId, deviceName, sensorType, sensorName, icon, mac): def __init__(self,
deviceId,
deviceName,
sensorType,
sensorName,
icon,
mac):
""" """
Initialize the sensor_config object with provided parameters. Sets up sensor configuration Initialize the sensor_config object with provided parameters.
and generates necessary MQTT topics and messages based on the sensor type. Sets up sensor configuration and generates necessary MQTT topics
and messages based on the sensor type.
""" """
# Assign initial attributes # Assign initial attributes
self.deviceId = deviceId self.deviceId = deviceId
@@ -110,20 +127,23 @@ class sensor_config:
self.message = {} # Initialize message as an empty dictionary self.message = {} # Initialize message as an empty dictionary
self.unique_id = '' self.unique_id = ''
# Call helper functions to initialize the message, generate a hash, and handle plugin object # Call helper functions to initialize the message, generate a hash,
# and handle plugin object
self.initialize_message() self.initialize_message()
self.generate_hash() self.generate_hash()
self.handle_plugin_object() self.handle_plugin_object()
def initialize_message(self): def initialize_message(self):
""" """
Initialize the MQTT message payload based on the sensor type. This method handles sensors of types: Initialize the MQTT message payload based on the sensor type.
This method handles sensors of types:
- 'timestamp' - 'timestamp'
- 'binary_sensor' - 'binary_sensor'
- 'sensor' - 'sensor'
- 'device_tracker' - 'device_tracker'
""" """
# Ensure self.message is initialized as a dictionary if not already done # Ensure self.message is initialized as a dictionary
# if not already done
if not isinstance(self.message, dict): if not isinstance(self.message, dict):
self.message = {} self.message = {}
@@ -153,7 +173,6 @@ class sensor_config:
"icon": f'mdi:{self.icon}' "icon": f'mdi:{self.icon}'
}) })
# Handle 'device_tracker' sensor type # Handle 'device_tracker' sensor type
elif self.sensorType == 'device_tracker': elif self.sensorType == 'device_tracker':
self.topic = f'homeassistant/device_tracker/{self.deviceId}/config' self.topic = f'homeassistant/device_tracker/{self.deviceId}/config'
@@ -229,25 +248,36 @@ class sensor_config:
) )
#------------------------------------------------------------------------------- # -------------------------------------------------------------------------------
def publish_mqtt(mqtt_client, topic, message): def publish_mqtt(mqtt_client, topic, message):
"""
Publishes a message to an MQTT topic using the provided MQTT client.
If the message is not a string, it is converted to a JSON-formatted string.
The function retrieves the desired QoS level from settings and logs the publishing process.
If the client is not connected to the broker, the function logs an error and aborts.
It attempts to publish the message, retrying until the publish status indicates success.
Args:
mqtt_client: The MQTT client instance used to publish the message.
topic (str): The MQTT topic to publish to.
message (Any): The message payload to send. Non-string messages are converted to JSON.
Returns:
bool: True if the message was published successfully, False if not connected to the broker.
"""
status = 1 status = 1
# convert anything but a simple string to json # convert anything but a simple string to json
if not isinstance(message, str): if not isinstance(message, str):
message = json.dumps(message).replace("'",'"') message = json.dumps(message).replace("'", '"')
qos = get_setting_value('MQTT_QOS') qos = get_setting_value('MQTT_QOS')
mylog('verbose', [f"[{pluginName}] Sending MQTT topic: {topic}"]) mylog('debug', [f"[{pluginName}] Sending MQTT topic: {topic}",
mylog('verbose', [f"[{pluginName}] Sending MQTT message: {message}"]) f"[{pluginName}] Sending MQTT message: {message}"])
# mylog('verbose', [f"[{pluginName}] get_setting_value('MQTT_QOS'): {qos}"]) # mylog('verbose', [f"[{pluginName}] get_setting_value('MQTT_QOS'): {qos}"])
if mqtt_connected_to_broker == False: if not mqtt_connected_to_broker:
mylog('minimal', [f"[{pluginName}] ⚠ ERROR: Not connected to broker, aborting."])
mylog('verbose', [f"[{pluginName}] ⚠ ERROR: Not connected to broker, aborting."])
return False return False
while status != 0: while status != 0:
@@ -268,11 +298,11 @@ def publish_mqtt(mqtt_client, topic, message):
# mylog('verbose', [f"[{pluginName}] result: {result}"]) # mylog('verbose', [f"[{pluginName}] result: {result}"])
if status != 0: if status != 0:
mylog('verbose', [f"[{pluginName}] Waiting to reconnect to MQTT broker"]) mylog('debug', [f"[{pluginName}] Waiting to reconnect to MQTT broker"])
time.sleep(0.1) time.sleep(0.1)
return True return True
#------------------------------------------------------------------------------- # ------------------------------------------------------------------------------
# Create a generic device for overal stats # Create a generic device for overal stats
def create_generic_device(mqtt_client, deviceId, deviceName): def create_generic_device(mqtt_client, deviceId, deviceName):
@@ -284,28 +314,29 @@ def create_generic_device(mqtt_client, deviceId, deviceName):
create_sensor(mqtt_client, deviceId, deviceName, 'sensor', 'unknown', 'wifi-alert') create_sensor(mqtt_client, deviceId, deviceName, 'sensor', 'unknown', 'wifi-alert')
#------------------------------------------------------------------------------- # ------------------------------------------------------------------------------
# Register sensor config on the broker # Register sensor config on the broker
def create_sensor(mqtt_client, deviceId, deviceName, sensorType, sensorName, icon, mac=""): def create_sensor(mqtt_client, deviceId, deviceName, sensorType, sensorName, icon, mac=""):
global mqtt_sensors global mqtt_sensors
# check previous configs # check previous configs
sensorConfig = sensor_config(deviceId, deviceName, sensorType, sensorName, icon, mac) sensorConfig = sensor_config(deviceId, deviceName, sensorType, sensorName, icon, mac)
# send if new # Create the HA sensor config if a new device is discovered
if sensorConfig.isNew: if sensorConfig.isNew:
# add the sensor to the global list to keep track of succesfully added sensors # add the sensor to the global list to keep track of succesfully added sensors
if publish_mqtt(mqtt_client, sensorConfig.topic, sensorConfig.message): if publish_mqtt(mqtt_client, sensorConfig.topic, sensorConfig.message):
# hack - delay adding to the queue in case the process is # hack - delay adding to the queue in case the process is
time.sleep(get_setting_value('MQTT_DELAY_SEC')) # restarted and previous publish processes aborted # restarted and previous publish processes aborted
# (it takes ~2s to update a sensor config on the broker) # (it takes ~2s to update a sensor config on the broker)
time.sleep(get_setting_value('MQTT_DELAY_SEC'))
mqtt_sensors.append(sensorConfig) mqtt_sensors.append(sensorConfig)
return sensorConfig return sensorConfig
#-------------------------------------------------------------------------------
# -----------------------------------------------------------------------------
def mqtt_create_client(): def mqtt_create_client():
# attempt reconnections on failure, ref https://www.emqx.com/en/blog/how-to-use-mqtt-in-python # attempt reconnections on failure, ref https://www.emqx.com/en/blog/how-to-use-mqtt-in-python
@@ -341,7 +372,6 @@ def mqtt_create_client():
mqtt_connected_to_broker = False mqtt_connected_to_broker = False
def on_connect(mqtt_client, userdata, flags, rc, properties): def on_connect(mqtt_client, userdata, flags, rc, properties):
global mqtt_connected_to_broker global mqtt_connected_to_broker
@@ -367,10 +397,12 @@ def mqtt_create_client():
version = mqtt.MQTTv5 version = mqtt.MQTTv5
# we now hardcode the client id into here. # we now hardcode the client id into here.
# TODO: Add config ffor client id # TODO: Add config for client id (atm, we use a fixed client id,
# so only one instance of NetAlertX can connect to the broker at any given time)
# If you intend to run multiple instances simultaneously, make sure to set unique client IDs for each instance.
mqtt_client = mqtt.Client( mqtt_client = mqtt.Client(
client_id='netalertx', client_id='netalertx',
callback_api_version = mqtt.CallbackAPIVersion.VERSION2, callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
transport=mytransport, transport=mytransport,
protocol=version) protocol=version)
mqtt_client.on_connect = on_connect mqtt_client.on_connect = on_connect
@@ -379,8 +411,8 @@ def mqtt_create_client():
if get_setting_value('MQTT_TLS'): if get_setting_value('MQTT_TLS'):
mqtt_client.tls_set() mqtt_client.tls_set()
mqtt_client.username_pw_set(username = get_setting_value('MQTT_USER'), password = get_setting_value('MQTT_PASSWORD')) mqtt_client.username_pw_set(username=get_setting_value('MQTT_USER'), password=get_setting_value('MQTT_PASSWORD'))
err_code = mqtt_client.connect(host = get_setting_value('MQTT_BROKER'), port = get_setting_value('MQTT_PORT')) err_code = mqtt_client.connect(host=get_setting_value('MQTT_BROKER'), port=get_setting_value('MQTT_PORT'))
if (err_code == mqtt.MQTT_ERR_SUCCESS): if (err_code == mqtt.MQTT_ERR_SUCCESS):
# We (prematurely) set the connection state to connected # We (prematurely) set the connection state to connected
# the callback may be delayed # the callback may be delayed
@@ -393,13 +425,13 @@ def mqtt_create_client():
return mqtt_client return mqtt_client
#-------------------------------------------------------------------------------
# -----------------------------------------------------------------------------
def mqtt_start(db): def mqtt_start(db):
global mqtt_client, mqtt_connected_to_broker global mqtt_client, mqtt_connected_to_broker
if mqtt_connected_to_broker == False: if not mqtt_connected_to_broker:
mqtt_connected_to_broker = True
mqtt_client = mqtt_create_client() mqtt_client = mqtt_create_client()
@@ -409,7 +441,7 @@ def mqtt_start(db):
# General stats # General stats
# Create a generic device for overal stats # Create a generic device for overal stats
if get_setting_value('MQTT_SEND_STATS') == True: if get_setting_value('MQTT_SEND_STATS'):
# Create a new device representing overall stats # Create a new device representing overall stats
create_generic_device(mqtt_client, deviceId, deviceName) create_generic_device(mqtt_client, deviceId, deviceName)
@@ -429,7 +461,7 @@ def mqtt_start(db):
) )
# Generate device-specific MQTT messages if enabled # Generate device-specific MQTT messages if enabled
if get_setting_value('MQTT_SEND_DEVICES') == True: if get_setting_value('MQTT_SEND_DEVICES'):
# Specific devices processing # Specific devices processing
@@ -438,9 +470,7 @@ def mqtt_start(db):
sec_delay = len(devices) * int(get_setting_value('MQTT_DELAY_SEC'))*5 sec_delay = len(devices) * int(get_setting_value('MQTT_DELAY_SEC'))*5
mylog('verbose', [f"[{pluginName}] Estimated delay: ", (sec_delay), 's ', '(', round(sec_delay/60,1) , 'min)' ]) mylog('verbose', [f"[{pluginName}] Estimated delay: ", (sec_delay), 's ', '(', round(sec_delay/60, 1), 'min)'])
debug_index = 0
for device in devices: for device in devices:
@@ -505,28 +535,23 @@ def mqtt_start(db):
publish_mqtt(mqtt_client, sensorConfig.json_attr_topic, devJson) publish_mqtt(mqtt_client, sensorConfig.json_attr_topic, devJson)
# =============================================================================
#===============================================================================
# Home Assistant UTILs # Home Assistant UTILs
#=============================================================================== # =============================================================================
def to_binary_sensor(input): def to_binary_sensor(input):
# In HA a binary sensor returns ON or OFF """
result = "OFF" Converts various input types to a binary sensor state ("ON" or "OFF") for Home Assistant.
"""
if isinstance(input, (int, float)) and input >= 1:
return "ON"
elif isinstance(input, bool) and input:
return "ON"
elif isinstance(input, str) and input == "1":
return "ON"
elif isinstance(input, bytes) and bytes_to_string(input) == "1":
return "ON"
return "OFF"
# bytestring
if isinstance(input, str):
if input == "1":
result = "ON"
elif isinstance(input, int):
if input == 1:
result = "ON"
elif isinstance(input, bool):
if input == True:
result = "ON"
elif isinstance(input, bytes):
if bytes_to_string(input) == "1":
result = "ON"
return result
# ------------------------------------- # -------------------------------------
# Convert to format that is interpretable by Home Assistant # Convert to format that is interpretable by Home Assistant
@@ -537,7 +562,7 @@ def prepTimeStamp(datetime_str):
# If the parsed datetime is naive (i.e., does not contain timezone info), add UTC timezone # If the parsed datetime is naive (i.e., does not contain timezone info), add UTC timezone
if parsed_datetime.tzinfo is None: if parsed_datetime.tzinfo is None:
parsed_datetime = parsed_datetime.replace(tzinfo=conf.tz) parsed_datetime = conf.tz.localize(parsed_datetime)
except ValueError: except ValueError:
mylog('verbose', [f"[{pluginName}] Timestamp conversion failed of string '{datetime_str}'"]) mylog('verbose', [f"[{pluginName}] Timestamp conversion failed of string '{datetime_str}'"])
@@ -547,9 +572,7 @@ def prepTimeStamp(datetime_str):
# Convert to the required format with 'T' between date and time and ensure the timezone is included # Convert to the required format with 'T' between date and time and ensure the timezone is included
return parsed_datetime.isoformat() # This will include the timezone offset return parsed_datetime.isoformat() # This will include the timezone offset
# -------------INIT--------------------- # -------------INIT---------------------
if __name__ == '__main__': if __name__ == '__main__':
sys.exit(main()) sys.exit(main())