This commit is contained in:
jokob-sk
2024-09-27 16:15:36 +10:00

View File

@@ -273,29 +273,54 @@ def create_sensor(mqtt_client, deviceId, deviceName, sensorType, sensorName, ico
#-------------------------------------------------------------------------------
def mqtt_create_client():
# attempt reconnections on failure, ref https://www.emqx.com/en/blog/how-to-use-mqtt-in-python
FIRST_RECONNECT_DELAY = 1
RECONNECT_RATE = 2
MAX_RECONNECT_COUNT = 12
MAX_RECONNECT_DELAY = 60
mytransport = 'tcp' # or 'websockets'
def on_disconnect(mqtt_client, userdata, reason_code):
def on_disconnect(mqtt_client, userdata, rc):
global mqtt_connected_to_broker
# REF: If we wanted a auto reconnect, a good source is here: https://www.emqx.com/en/blog/how-to-use-mqtt-in-python
mqtt_connected_to_broker = False
mylog('debug', [f"[{pluginName}] Connection terminated, reason_code: {reason_code}"])
mylog('debug', [f"[{pluginName}] Connection terminated, reason_code: {rc}"])
reconnect_count, reconnect_delay = 0, FIRST_RECONNECT_DELAY
while reconnect_count < MAX_RECONNECT_COUNT:
mylog('debug', [f"[{pluginName}] Reconnecting in {reconnect_delay} seconds..."])
time.sleep(reconnect_delay)
def on_connect(mqtt_client, userdata, flags, reason_code, properties):
try:
mqtt_client.reconnect()
mqtt_connected_to_broker = True # Signal connection
mylog('debug', [f"[{pluginName}] Reconnected successfully"])
return
except Exception as err:
mylog('debug', [f"[{pluginName}] {err} Reconnect failed. Retrying..."])
pass
reconnect_delay *= RECONNECT_RATE
reconnect_delay = min(reconnect_delay, MAX_RECONNECT_DELAY)
reconnect_count += 1
mqtt_connected_to_broker = False
def on_connect(mqtt_client, userdata, flags, rc, properties):
global mqtt_connected_to_broker
# REF: Good docu on reason codes: https://www.emqx.com/en/blog/mqtt5-new-features-reason-code-and-ack
if reason_code == 0:
if rc == 0:
mylog('verbose', [f"[{pluginName}] Connected to broker"])
mqtt_connected_to_broker = True # Signal connection
else:
mylog('verbose', [f"[{pluginName}] Connection failed, reason_code: {reason_code}"])
mylog('verbose', [f"[{pluginName}] Connection failed, reason_code: {rc}"])
mqtt_connected_to_broker = False
global mqtt_client
global mqtt_connected_to_broker
# Paho will be soon not supporting V1 anymore, so this really should not be a user choice to start with
# This code now uses V2 by default
@@ -306,10 +331,13 @@ def mqtt_create_client():
else:
version = mqtt.MQTTv5
# we now hardcode the client id into here.
# TODO: Add config ffor client id
mqtt_client = mqtt.Client(
client_id='netalertx',
callback_api_version = mqtt.CallbackAPIVersion.VERSION2,
transport=mytransport,
protocol=mqtt.MQTTv5)
protocol=version)
mqtt_client.on_connect = on_connect
mqtt_client.on_disconnect = on_disconnect
@@ -317,7 +345,15 @@ def mqtt_create_client():
mqtt_client.tls_set()
mqtt_client.username_pw_set(username = get_setting_value('MQTT_USER'), password = get_setting_value('MQTT_PASSWORD'))
mqtt_client.connect(host = get_setting_value('MQTT_BROKER'), port = get_setting_value('MQTT_PORT'))
err_code = mqtt_client.connect(host = get_setting_value('MQTT_BROKER'), port = get_setting_value('MQTT_PORT'))
if (err_code == mqtt.MQTT_ERR_SUCCESS):
# We (prematurely) set the connection state to connected
# the callback may be delayed
mqtt_connected_to_broker = True
# the client connects but connect callbacks will be called async and there may be a waiting time
# Mosquitto works straight away
# EMQX has a delay and does not update in loop below, so we cannot rely on it, we wait 1 sec
time.sleep(1)
mqtt_client.loop_start()
return mqtt_client