MQTT timestamp normalization for HomeAssistant

This commit is contained in:
jokob-sk
2024-10-02 15:51:08 +10:00
2 changed files with 132 additions and 95 deletions

View File

@@ -38,7 +38,6 @@ conf.tz = timezone(get_setting_value('TIMEZONE'))
CUR_PATH = str(pathlib.Path(__file__).parent.resolve())
RESULT_FILE = os.path.join(CUR_PATH, 'last_result.log')
# Initialize the Plugin obj output file
plugin_objects = Plugin_Objects(RESULT_FILE)
# Create an MD5 hash object
@@ -47,7 +46,6 @@ md5_hash = hashlib.md5()
pluginName = 'MQTT'
# globals
mqtt_sensors = []
mqtt_connected_to_broker = False
mqtt_client = None # mqtt client
@@ -87,115 +85,144 @@ def check_config():
# Sensor configs are tracking which sensors in NetAlertX exist and if a config has changed
class sensor_config:
def __init__(self, deviceId, deviceName, sensorType, sensorName, icon, mac):
"""
Initialize the sensor_config object with provided parameters. Sets up sensor configuration
and generates necessary MQTT topics and messages based on the sensor type.
"""
# Assign initial attributes
self.deviceId = deviceId
self.deviceName = deviceName
self.sensorType = sensorType
self.sensorName = sensorName
self.icon = icon
self.mac = mac
self.mac = mac
self.model = deviceName
self.hash = ''
self.state_topic = ''
self.json_attr_topic = ''
self.topic = ''
self.message = ''
self.message = {} # Initialize message as an empty dictionary
self.unique_id = ''
# handle sensors of type "binary_sensor" or "sensor"
if self.sensorType == 'binary_sensor' or self.sensorType == 'sensor':
# Call helper functions to initialize the message, generate a hash, and handle plugin object
self.initialize_message()
self.generate_hash()
self.handle_plugin_object()
def initialize_message(self):
"""
Initialize the MQTT message payload based on the sensor type. This method handles sensors of types:
- 'timestamp'
- 'binary_sensor'
- 'sensor'
- 'device_tracker'
"""
# Ensure self.message is initialized as a dictionary if not already done
if not isinstance(self.message, dict):
self.message = {}
# Handle sensors with a 'timestamp' device class
if self.sensorName in ['last_connection', 'first_connection']:
self.message.update({
"device_class": "timestamp"
})
# Handle 'binary_sensor' or 'sensor' types
if self.sensorType in ['binary_sensor', 'sensor']:
self.topic = f'homeassistant/{self.sensorType}/{self.deviceId}/{self.sensorName}/config'
self.state_topic = f'system-sensors/{self.sensorType}/{self.deviceId}/state'
self.unique_id = self.deviceId+'_sensor_'+self.sensorName
self.unique_id = f'{self.deviceId}_sensor_{self.sensorName}'
self.message = {
"name" : self.sensorName,
"state_topic" : self.state_topic,
"value_template" : "{{value_json."+self.sensorName+"}}",
"unique_id" : self.unique_id,
"device":
{
"identifiers" : [self.deviceId+"_sensor"],
"manufacturer" : "NetAlertX",
"name" : self.deviceName
},
"icon": f'mdi:{self.icon}'
}
# handle sensors of type "device_tracker"
# Update the message dictionary, expanding it without overwriting
self.message.update({
"name": self.sensorName,
"state_topic": self.state_topic,
"value_template": f"{{{{value_json.{self.sensorName}}}}}",
"unique_id": self.unique_id,
"device": {
"identifiers": [f"{self.deviceId}_sensor"],
"manufacturer": "NetAlertX",
"name": self.deviceName
},
"icon": f'mdi:{self.icon}'
})
# Handle 'device_tracker' sensor type
elif self.sensorType == 'device_tracker':
self.topic = f'homeassistant/device_tracker/{self.deviceId}/config'
self.state_topic = f'system-sensors/device_tracker/{self.deviceId}/state'
self.json_attr_topic = f'system-sensors/device_tracker/{self.deviceId}/attributes'
self.unique_id = f'{self.deviceId}_{self.sensorType}_{self.sensorName}'
payload_home = 'home'
payload_away = 'away'
# Construct the message dictionary for device_tracker
self.message = {
"state_topic": self.state_topic,
"json_attributes_topic": self.json_attr_topic,
"name": self.sensorName,
"payload_home": 'home',
"payload_not_home": 'away',
"unique_id": self.unique_id,
"icon": f'mdi:{self.icon}',
"device": {
"identifiers": [f"{self.deviceId}_sensor", self.unique_id],
"manufacturer": "NetAlertX",
"model": self.model or "Unknown", # Use model if available, else set to 'Unknown'
"name": self.deviceName
}
}
self.message = {
"state_topic": self.state_topic,
"json_attributes_topic": self.json_attr_topic,
"name": self.sensorName,
"payload_home": payload_home,
"payload_not_home": payload_away,
"unique_id" : self.unique_id,
"icon": f'mdi:{self.icon}',
"device":
{
"identifiers" : [self.deviceId+"_sensor", self.unique_id],
"manufacturer" : "NetAlertX",
"name" : self.deviceName
},
}
# handle sensors of type "timestamp"
elif self.sensorName in ['last_connection', 'first_connection']:
self.message["device_class"] = "timestamp"
def generate_hash(self):
"""
Generate an MD5 hash based on the combined string of deviceId, deviceName, sensorType, sensorName, and icon.
This hash will uniquely identify the sensor configuration.
"""
# Concatenate all relevant attributes into a single string
input_string = f"{self.deviceId}{self.deviceName}{self.sensorType}{self.sensorName}{self.icon}"
md5_hash = hashlib.md5() # Initialize the MD5 hash object
md5_hash.update(input_string.encode('utf-8')) # Update hash with input string
self.hash = md5_hash.hexdigest() # Store the hex representation of the hash
# Define your input string
input_string = str(self.deviceId) + str(self.deviceName) + str(self.sensorType) + str(self.sensorName) + str(self.icon)
def handle_plugin_object(self):
"""
Fetch the plugin object from the system based on the generated hash. If the object exists, it logs that the sensor is
already known. If not, it marks the sensor as new and logs relevant information.
"""
# Retrieve the plugin object based on the sensor's hash
plugObj = getPluginObject({"Plugin": "MQTT", "Watched_Value3": self.hash})
# Hash the input string and convert the hash to a string
# Update the hash object with the bytes of the input string
md5_hash.update(input_string.encode('utf-8'))
# Get the hexadecimal representation of the MD5 hash
md5_hash_hex = md5_hash.hexdigest()
hash_value = str(md5_hash_hex)
self.hash = hash_value
plugObj = getPluginObject({"Plugin":"MQTT", "Watched_Value3":hash_value})
# mylog('verbose', [f"[{pluginName}] Previous plugin object entry: {json.dumps(plugObj)}"])
if plugObj == {}:
# Check if the plugin object is new
if not plugObj:
self.isNew = True
mylog('verbose', [f"[{pluginName}] New sensor entry name : {self.deviceName}"])
mylog('verbose', [f"[{pluginName}] New sensor entry mac : {self.mac}"])
mylog('verbose', [f"[{pluginName}] New sensor entry hash_value : {hash_value}"])
mylog('verbose', [f"[{pluginName}] New sensor entry (name|mac|hash) : ({self.deviceName}|{self.mac}|{self.hash}"])
else:
device_name = plugObj.get("Watched_Value1", "Unknown")
mylog('verbose', [f"[{pluginName}] Existing, skip Device Name : {device_name}"])
mylog('verbose', [f"[{pluginName}] Existing, skip Device Name: {device_name}"])
self.isNew = False
# Store the sensor configuration in global plugin_objects
self.store_plugin_object()
# Log sensor
def store_plugin_object(self):
"""
Store the sensor configuration in the global plugin_objects, which tracks sensors based on a unique combination
of attributes including deviceId, sensorName, hash, and MAC.
"""
global plugin_objects
if mac == '':
mac = "N/A"
# Add the sensor to the global plugin_objects
plugin_objects.add_object(
primaryId = deviceId,
secondaryId = sensorName,
watched1 = deviceName,
watched2 = sensorType,
watched3 = hash_value,
watched4 = mac,
extra = input_string,
foreignKey = mac
primaryId=self.deviceId,
secondaryId=self.sensorName,
watched1=self.deviceName,
watched2=self.sensorType,
watched3=self.hash,
watched4=self.mac,
extra=f"{self.deviceId}{self.deviceName}{self.sensorType}{self.sensorName}{self.icon}",
foreignKey=self.mac
)
#-------------------------------------------------------------------------------
def publish_mqtt(mqtt_client, topic, message):
@@ -414,8 +441,10 @@ def mqtt_start(db):
for device in devices:
# debug statement
# if 'Moto' in device["dev_Name"]:
# debug statement START 🔻
if 'Moto' not in device["dev_Name"]:
continue
# debug statement END 🔺
# Create devices in Home Assistant - send config messages
deviceId = 'mac_' + device["dev_MAC"].replace(" ", "").replace(":", "_").lower()
@@ -427,7 +456,7 @@ def mqtt_start(db):
sensorConfig = create_sensor(mqtt_client, deviceId, devDisplayName, 'sensor', 'is_new', 'bell-alert-outline', device["dev_MAC"])
sensorConfig = create_sensor(mqtt_client, deviceId, devDisplayName, 'sensor', 'vendor', 'cog', device["dev_MAC"])
sensorConfig = create_sensor(mqtt_client, deviceId, devDisplayName, 'sensor', 'first_connection', 'calendar-start', device["dev_MAC"])
sensorConfig = create_sensor(mqtt_client, deviceId, devDisplayName, 'sensor', 'last_connection', 'calendar-end', device["dev_MAC"])
sensorConfig = create_sensor(mqtt_client, deviceId, devDisplayName, 'sensor', 'last_connection', 'calendar-end', device["dev_MAC"])
devJson = {
@@ -435,9 +464,9 @@ def mqtt_start(db):
"is_new": str(device["dev_NewDevice"]),
"vendor": sanitize_string(device["dev_Vendor"]),
"mac_address": str(device["dev_MAC"]),
"model": devDisplayName,
"last_connection": prepTimeStamp(str(device["dev_LastConnection"])),
"first_connection": prepTimeStamp(str(device["dev_FirstConnection"]))
}
"first_connection": prepTimeStamp(str(device["dev_FirstConnection"])) }
# bulk update device sensors in home assistant
publish_mqtt(mqtt_client, sensorConfig.state_topic, devJson)
@@ -490,16 +519,21 @@ def to_binary_sensor(input):
# -------------------------------------
# Convert to format that is interpretable by Home Assistant
def prepTimeStamp(datetime_str):
try:
# Attempt to parse the input string to ensure it's a valid datetime
parsed_datetime = datetime.fromisoformat(datetime_str)
except ValueError:
mylog('verbose', [f"[{pluginName}] Timestamp conversion failed of string '{datetime_str}'" ])
# Use the current time if the input format is invalid
parsed_datetime = timeNowTZ()
# Convert to the required format with 'T' between date and time
return parsed_datetime.isoformat()
try:
# Attempt to parse the input string to ensure it's a valid datetime
parsed_datetime = datetime.fromisoformat(datetime_str)
# If the parsed datetime is naive (i.e., does not contain timezone info), add UTC timezone
if parsed_datetime.tzinfo is None:
parsed_datetime = parsed_datetime.replace(tzinfo=conf.tz)
except ValueError:
mylog('verbose', [f"[{pluginName}] Timestamp conversion failed of string '{datetime_str}'"])
# Use the current time if the input format is invalid
parsed_datetime = timeNowTZ() # Assuming this function returns the current time with timezone
# Convert to the required format with 'T' between date and time and ensure the timezone is included
return parsed_datetime.isoformat() # This will include the timezone offset
# -------------INIT---------------------
if __name__ == '__main__':