From b7b2e0bc6572fc66c9c13079b71c6aa9abbd7bac Mon Sep 17 00:00:00 2001 From: Ingo Ratsdorf Date: Fri, 27 Sep 2024 12:24:46 +1200 Subject: [PATCH] fixes to MQTT publisher This wasn't working for EMQX due to callback trigger delays it never connected. Also added a reconnect feature and a client id so it looks better in the EMQX connection dashboard. No confirmed to be working with Mosquitto and EMQX --- front/plugins/_publisher_mqtt/mqtt.py | 54 ++++++++++++++++++++++----- 1 file changed, 45 insertions(+), 9 deletions(-) diff --git a/front/plugins/_publisher_mqtt/mqtt.py b/front/plugins/_publisher_mqtt/mqtt.py index 20df275c..b29b10f3 100755 --- a/front/plugins/_publisher_mqtt/mqtt.py +++ b/front/plugins/_publisher_mqtt/mqtt.py @@ -273,29 +273,54 @@ def create_sensor(mqtt_client, deviceId, deviceName, sensorType, sensorName, ico #------------------------------------------------------------------------------- def mqtt_create_client(): + # attempt reconnections on failure, ref https://www.emqx.com/en/blog/how-to-use-mqtt-in-python + FIRST_RECONNECT_DELAY = 1 + RECONNECT_RATE = 2 + MAX_RECONNECT_COUNT = 12 + MAX_RECONNECT_DELAY = 60 + mytransport = 'tcp' # or 'websockets' - def on_disconnect(mqtt_client, userdata, reason_code): + def on_disconnect(mqtt_client, userdata, rc): global mqtt_connected_to_broker - # REF: If we wanted a auto reconnect, a good source is here: https://www.emqx.com/en/blog/how-to-use-mqtt-in-python - mqtt_connected_to_broker = False - mylog('debug', [f"[{pluginName}] Connection terminated, reason_code: {reason_code}"]) + mylog('debug', [f"[{pluginName}] Connection terminated, reason_code: {rc}"]) + reconnect_count, reconnect_delay = 0, FIRST_RECONNECT_DELAY + while reconnect_count < MAX_RECONNECT_COUNT: + mylog('debug', [f"[{pluginName}] Reconnecting in {reconnect_delay} seconds..."]) + time.sleep(reconnect_delay) - def on_connect(mqtt_client, userdata, flags, reason_code, properties): + try: + mqtt_client.reconnect() + mqtt_connected_to_broker = True # Signal connection + mylog('debug', [f"[{pluginName}] Reconnected successfully"]) + return + except Exception as err: + mylog('debug', [f"[{pluginName}] {err} Reconnect failed. Retrying..."]) + pass + + reconnect_delay *= RECONNECT_RATE + reconnect_delay = min(reconnect_delay, MAX_RECONNECT_DELAY) + reconnect_count += 1 + + mqtt_connected_to_broker = False + + + def on_connect(mqtt_client, userdata, flags, rc, properties): global mqtt_connected_to_broker # REF: Good docu on reason codes: https://www.emqx.com/en/blog/mqtt5-new-features-reason-code-and-ack - if reason_code == 0: + if rc == 0: mylog('verbose', [f"[{pluginName}] Connected to broker"]) mqtt_connected_to_broker = True # Signal connection else: - mylog('verbose', [f"[{pluginName}] Connection failed, reason_code: {reason_code}"]) + mylog('verbose', [f"[{pluginName}] Connection failed, reason_code: {rc}"]) mqtt_connected_to_broker = False global mqtt_client + global mqtt_connected_to_broker # Paho will be soon not supporting V1 anymore, so this really should not be a user choice to start with # This code now uses V2 by default @@ -306,10 +331,13 @@ def mqtt_create_client(): else: version = mqtt.MQTTv5 + # we now hardcode the client id into here. + # TODO: Add config ffor client id mqtt_client = mqtt.Client( + client_id='netalertx', callback_api_version = mqtt.CallbackAPIVersion.VERSION2, transport=mytransport, - protocol=mqtt.MQTTv5) + protocol=version) mqtt_client.on_connect = on_connect mqtt_client.on_disconnect = on_disconnect @@ -317,7 +345,15 @@ def mqtt_create_client(): mqtt_client.tls_set() mqtt_client.username_pw_set(username = get_setting_value('MQTT_USER'), password = get_setting_value('MQTT_PASSWORD')) - mqtt_client.connect(host = get_setting_value('MQTT_BROKER'), port = get_setting_value('MQTT_PORT')) + err_code = mqtt_client.connect(host = get_setting_value('MQTT_BROKER'), port = get_setting_value('MQTT_PORT')) + if (err_code == mqtt.MQTT_ERR_SUCCESS): + # We (prematurely) set the connection state to connected + # the callback may be delayed + mqtt_connected_to_broker = True + # the client connects but connect callbacks will be called async and there may be a waiting time + # Mosquitto works straight away + # EMQX has a delay and does not update in loop below, so we cannot rely on it, we wait 1 sec + time.sleep(1) mqtt_client.loop_start() return mqtt_client