MQTT rework v0.3 📩

This commit is contained in:
Jokob-sk
2023-10-14 23:02:43 +11:00
parent 7da9bf03a3
commit 31e1116483
5 changed files with 180 additions and 75 deletions

View File

@@ -79,19 +79,15 @@
{
"column": "Object_SecondaryID",
"css_classes": "col-sm-2",
"show": false,
"show": true,
"type": "label",
"default_value":"",
"options": [],
"localized": ["name"],
"name":[{
"language_code": "en_us",
"string" : "N/A"
},
{
"language_code": "es_es",
"string" : "N/A"
}]
"string" : "Sensor ID"
}]
},
{
"column": "DateTimeCreated",
@@ -109,7 +105,7 @@
{
"column": "DateTimeChanged",
"css_classes": "col-sm-2",
"show": false,
"show": true,
"type": "label",
"default_value":"",
"options": [],
@@ -133,55 +129,48 @@
"localized": ["name"],
"name":[{
"language_code": "en_us",
"string" : "Notification GUID"
"string" : "Device name"
}]
},
{
"column": "Watched_Value2",
"css_classes": "col-sm-8",
"show": true,
"type": "textarea_readonly",
"type": "label",
"default_value":"",
"options": [],
"localized": ["name"],
"name":[{
"language_code": "en_us",
"string" : "Result"
"string" : "Sensor type"
}]
},
{
"column": "Watched_Value3",
"css_classes": "col-sm-2",
"show": false,
"show": true,
"type": "label",
"default_value":"",
"options": [],
"localized": ["name"],
"name":[{
"language_code": "en_us",
"string" : "N/A"
},
{
"language_code": "es_es",
"string" : "N/A"
}]
"string" : "Sensor name"
}]
},
{
"column": "Watched_Value4",
"css_classes": "col-sm-2",
"show": false,
"show": true,
"type": "label",
"default_value":"",
"options": [],
"localized": ["name"],
"name":[{
"language_code": "en_us",
"string" : "N/A"
},
{
"language_code": "es_es",
"string" : "N/A"
}]
"string" : "Hash"
}
]
},
{
"column": "UserData",
@@ -203,7 +192,7 @@
{
"column": "Status",
"css_classes": "col-sm-1",
"show": false,
"show": true,
"type": "replace",
"default_value":"",
"options": [

View File

@@ -17,7 +17,9 @@ sys.path.extend(["/home/pi/pialert/front/plugins", "/home/pi/pialert/pialert"])
# PiAlert modules
import conf
from plugin_helper import Plugin_Objects
from const import apiPath
from plugin_helper import getPluginObject
from plugin_utils import Plugin_Objects
from logger import mylog, append_line_to_file
from helper import timeNowTZ, noti_obj, get_setting_value, bytes_to_string, sanitize_string
from notification import Notification_obj
@@ -27,8 +29,18 @@ from database import DB, get_all_devices, get_device_stats
CUR_PATH = str(pathlib.Path(__file__).parent.resolve())
RESULT_FILE = os.path.join(CUR_PATH, 'last_result.log')
# Initialize the Plugin obj output file
plugin_objects = Plugin_Objects(RESULT_FILE)
pluginName = 'MQTT'
# globals
mqtt_sensors = []
mqtt_connected_to_broker = False
client = None # mqtt client
def main():
mylog('verbose', [f'[{pluginName}](publisher) In script'])
@@ -44,6 +56,8 @@ def main():
mqtt_start(db)
plugin_objects.write_result_file()
#-------------------------------------------------------------------------------
@@ -68,6 +82,21 @@ class sensor_config:
self.sensorName = sensorName
self.icon = icon
self.hash = str(hash(str(deviceId) + str(deviceName)+ str(sensorType)+ str(sensorName)+ str(icon)))
self.isNew = getPluginObject({"Plugin":"MQTT", "Watched_Value4":hash}) is None
# Log sensor
global plugin_objects
plugin_objects.add_object(
primaryId = pluginName,
secondaryId = deviceId,
watched1 = deviceName,
watched2 = sensorType,
watched3 = sensorName,
watched4 = hash,
extra = 'null',
foreignKey = deviceId
)
#-------------------------------------------------------------------------------
@@ -105,69 +134,71 @@ def create_generic_device(client):
#-------------------------------------------------------------------------------
def create_sensor(client, deviceId, deviceName, sensorType, sensorName, icon):
new_sensor_config = sensor_config(deviceId, deviceName, sensorType, sensorName, icon)
global mqtt_sensors
# check if config already in list and if not, add it, otherwise skip
is_unique = True
mylog('minimal', [f"[{pluginName}] Already previously published sensors: {len(conf.mqtt_sensors)}"])
for sensor in conf.mqtt_sensors:
if sensor.hash == new_sensor_config.hash:
is_unique = False
break
new_sensor_config = sensor_config(deviceId, deviceName, sensorType, sensorName, icon)
# save if unique
if is_unique:
# save if new
if new_sensor_config.isNew:
mylog('minimal', [f"[{pluginName}] Publishing sensor number {len(mqtt_sensors)}"])
publish_sensor(client, new_sensor_config)
#-------------------------------------------------------------------------------
def publish_sensor(client, sensorConf):
def publish_sensor(client, sensorConfig):
global mqtt_sensors
message = '{ \
"name":"'+ sensorConf.deviceName +' '+sensorConf.sensorName+'", \
"state_topic":"system-sensors/'+sensorConf.sensorType+'/'+sensorConf.deviceId+'/state", \
"value_template":"{{value_json.'+sensorConf.sensorName+'}}", \
"unique_id":"'+sensorConf.deviceId+'_sensor_'+sensorConf.sensorName+'", \
"name":"'+ sensorConfig.deviceName +' '+sensorConfig.sensorName+'", \
"state_topic":"system-sensors/'+sensorConfig.sensorType+'/'+sensorConfig.deviceId+'/state", \
"value_template":"{{value_json.'+sensorConfig.sensorName+'}}", \
"unique_id":"'+sensorConfig.deviceId+'_sensor_'+sensorConfig.sensorName+'", \
"device": \
{ \
"identifiers": ["'+sensorConf.deviceId+'_sensor"], \
"identifiers": ["'+sensorConfig.deviceId+'_sensor"], \
"manufacturer": "PiAlert", \
"name":"'+sensorConf.deviceName+'" \
"name":"'+sensorConfig.deviceName+'" \
}, \
"icon":"mdi:'+sensorConf.icon+'" \
"icon":"mdi:'+sensorConfig.icon+'" \
}'
topic='homeassistant/'+sensorConf.sensorType+'/'+sensorConf.deviceId+'/'+sensorConf.sensorName+'/config'
topic='homeassistant/'+sensorConfig.sensorType+'/'+sensorConfig.deviceId+'/'+sensorConfig.sensorName+'/config'
# add the sensor to the global list to keep track of succesfully added sensors
if publish_mqtt(client, topic, message):
# hack - delay adding to the queue in case the process is
time.sleep(get_setting_value('MQTT_DELAY_SEC')) # restarted and previous publish processes aborted
# (it takes ~2s to update a sensor config on the broker)
conf.mqtt_sensors.append(sensorConf)
mqtt_sensors.append(sensorConfig)
#-------------------------------------------------------------------------------
def mqtt_create_client():
def mqtt_create_client():
def on_disconnect(client, userdata, rc):
conf.mqtt_connected_to_broker = False
global mqtt_connected_to_broker
mqtt_connected_to_broker = False
# not sure is below line is correct / necessary
# client = mqtt_create_client()
def on_connect(client, userdata, flags, rc):
global mqtt_connected_to_broker
if rc == 0:
mylog('verbose', [f"[{pluginName}] Connected to broker"])
conf.mqtt_connected_to_broker = True # Signal connection
mqtt_connected_to_broker = True # Signal connection
else:
mylog('none', [f"[{pluginName}] Connection failed"])
conf.mqtt_connected_to_broker = False
mqtt_connected_to_broker = False
global client
client = mqtt_client.Client('PiAlert') # Set Connecting Client ID
client.username_pw_set(get_setting_value('MQTT_USER'), get_setting_value('MQTT_PASSWORD'))
client.on_connect = on_connect
@@ -180,13 +211,12 @@ def mqtt_create_client():
#-------------------------------------------------------------------------------
def mqtt_start(db):
#global client
global client, mqtt_connected_to_broker
if conf.mqtt_connected_to_broker == False:
conf.mqtt_connected_to_broker = True
conf.client = mqtt_create_client()
if mqtt_connected_to_broker == False:
mqtt_connected_to_broker = True
client = mqtt_create_client()
client = conf.client
# General stats
# Create a generic device for overal stats

View File

@@ -123,3 +123,63 @@ class AppEvent_obj:
# Commit changes
self.db.commitDB()
def getPluginObject(**kwargs):
# Check if nothing, end
if not any(kwargs.values()):
return None
# Optional parameters
GUID = kwargs.get("GUID", "")
Plugin = kwargs.get("Plugin", "")
MAC = kwargs.get("MAC", "")
IP = kwargs.get("IP", "")
PrimaryID = kwargs.get("PrimaryID", "")
SecondaryID = kwargs.get("SecondaryID", "")
ForeignKey = kwargs.get("ForeignKey", "")
Index = kwargs.get("Index", "")
RowID = kwargs.get("RowID", "")
# we need the plugin
if Plugin == "":
return None
plugins_objects = apiPath + 'table_plugins_objects.json'
try:
with open(plugins_objects, 'r') as json_file:
data = json.load(json_file)
for item in data.get("data",[]):
if item.get("Index") == Index:
return item
for item in data.get("data",[]):
if item.get("ObjectPrimaryID") == PrimaryID and item.get("ObjectSecondaryID") == SecondaryID:
return item
for item in data.get("data",[]):
if item.get("ObjectPrimaryID") == MAC and item.get("ObjectSecondaryID") == IP:
return item
for item in data.get("data",[]):
if item.get("ObjectPrimaryID") == PrimaryID and item.get("ObjectSecondaryID") == IP:
return item
for item in data.get("data",[]):
if item.get("ObjectPrimaryID") == MAC and item.get("ObjectSecondaryID") == IP:
return item
mylog('debug', [f'[{module_name}] Error - Object not found - GUID:{GUID} | Plugin:{Plugin} | MAC:{MAC} | IP:{IP} | PrimaryID:{PrimaryID} | SecondaryID:{SecondaryID} | ForeignKey:{ForeignKey} | Index:{Index} | RowID:{RowID} '])
return None
except (FileNotFoundError, json.JSONDecodeError, ValueError) as e:
# Handle the case when the file is not found, JSON decoding fails, or data is not in the expected format
mylog('none', [f'[{module_name}] Error - JSONDecodeError or FileNotFoundError for file {plugins_objects}'])
return None

View File

@@ -24,12 +24,6 @@ last_scan_run = ''
last_version_check = ''
arpscan_devices = []
# for MQTT
mqtt_connected_to_broker = False
mqtt_sensors = []
client = None # mqtt client
# ACTUAL CONFIGRATION ITEMS set to defaults
# -------------------------------------------

View File

@@ -6,6 +6,7 @@ from logger import mylog
from const import pluginsPath, logPath
from helper import timeNowTZ, updateState, get_file_content, write_file, get_setting, get_setting_value
module_name = 'Plugin utils'
#-------------------------------------------------------------------------------
def logEventStatusCounts(objName, pluginEvents):
@@ -19,17 +20,17 @@ def logEventStatusCounts(objName, pluginEvents):
status_counts[status] = 1
for status, count in status_counts.items():
mylog('debug', [f'[Plugins] In {objName} there are {count} events with the status "{status}" '])
mylog('debug', [f'[{module_name}] In {objName} there are {count} events with the status "{status}" '])
#-------------------------------------------------------------------------------
def print_plugin_info(plugin, elements = ['display_name']):
mylog('verbose', ['[Plugins] ---------------------------------------------'])
mylog('verbose', [f'[{module_name}] ---------------------------------------------'])
for el in elements:
res = get_plugin_string(plugin, el)
mylog('verbose', ['[Plugins] ', el ,': ', res])
mylog('verbose', [f'[{module_name}] ', el ,': ', res])
#-------------------------------------------------------------------------------
@@ -41,9 +42,9 @@ def get_plugin_setting(plugin, function_key):
for set in plugin['settings']:
if set["function"] == function_key:
result = set
if result == None:
mylog('debug', ['[Plugins] Setting with "function":"', function_key, '" is missing in plugin: ', get_plugin_string(plugin, 'display_name')])
# if result == None:
# mylog('debug', [f'[{module_name}] Setting with "function":"', function_key, '" is missing in plugin: ', get_plugin_string(plugin, 'display_name')])
return result
@@ -76,9 +77,9 @@ def list_to_csv(arr):
tmp = ''
arrayItemStr = ''
mylog('debug', '[Plugins] Flattening the below array')
mylog('debug', f'[{module_name}] Flattening the below array')
mylog('debug', arr)
mylog('debug', f'[Plugins] isinstance(arr, list) : {isinstance(arr, list)} | isinstance(arr, str) : {isinstance(arr, str)}')
mylog('debug', f'[{module_name}] isinstance(arr, list) : {isinstance(arr, list)} | isinstance(arr, str) : {isinstance(arr, str)}')
if isinstance(arr, str):
return arr.replace('[','').replace(']','').replace("'", '') # removing brackets and single quotes (not allowed)
@@ -97,12 +98,12 @@ def list_to_csv(arr):
tmp = tmp[:-1] # Remove last comma ','
mylog('debug', f'[Plugins] Flattened array: {tmp}')
mylog('debug', f'[{module_name}] Flattened array: {tmp}')
return tmp
else:
mylog('none', f'[Plugins] ERROR Could not convert array: {arr}')
mylog('none', f'[{module_name}] ERROR Could not convert array: {arr}')
@@ -128,7 +129,7 @@ def combine_plugin_objects(old, new):
# Replace {wildcars} with parameters
def resolve_wildcards_arr(commandArr, params):
mylog('debug', ['[Plugins] Pre-Resolved CMD: '] + commandArr)
mylog('debug', [f'[{module_name}] Pre-Resolved CMD: '] + commandArr)
for param in params:
# mylog('debug', ['[Plugins] key : {', param[0], '}'])
@@ -181,5 +182,36 @@ def handle_empty(value):
return value
#-------------------------------------------------------------------------------
# Get and return a plugin object based on key-value pairs
# keyValues example: getPluginObject({"Plugin":"MQTT", "Watched_Value4":"someValue"})
def getPluginObject(keyValues):
plugins_objects = apiPath + 'table_plugins_objects.json'
try:
with open(plugins_objects, 'r') as json_file:
data = json.load(json_file)
for item in data.get("data", []):
# Initialize a flag to check if all key-value pairs match
all_match = True
for key, value in keyValues.items():
if item.get(key) != value:
all_match = False
break # No need to continue checking if one pair doesn't match
if all_match:
return item
mylog('debug', [f'[{module_name}] Error - Object not found {json.dumps(keyValues)} '])
return None
except (FileNotFoundError, json.JSONDecodeError, ValueError) as e:
# Handle the case when the file is not found, JSON decoding fails, or data is not in the expected format
mylog('none', [f'[{module_name}] Error - JSONDecodeError or FileNotFoundError for file {plugins_objects}'])
return None