mirror of
https://github.com/jokob-sk/NetAlertX.git
synced 2025-12-07 01:26:11 -08:00
Merge pull request #810 from ingoratsdorf/contrib
Some checks are pending
docker / docker_dev (push) Waiting to run
Some checks are pending
docker / docker_dev (push) Waiting to run
fixes to MQTT publisher
This commit is contained in:
@@ -273,29 +273,54 @@ def create_sensor(mqtt_client, deviceId, deviceName, sensorType, sensorName, ico
|
||||
#-------------------------------------------------------------------------------
|
||||
def mqtt_create_client():
|
||||
|
||||
# attempt reconnections on failure, ref https://www.emqx.com/en/blog/how-to-use-mqtt-in-python
|
||||
FIRST_RECONNECT_DELAY = 1
|
||||
RECONNECT_RATE = 2
|
||||
MAX_RECONNECT_COUNT = 12
|
||||
MAX_RECONNECT_DELAY = 60
|
||||
|
||||
mytransport = 'tcp' # or 'websockets'
|
||||
|
||||
def on_disconnect(mqtt_client, userdata, reason_code):
|
||||
def on_disconnect(mqtt_client, userdata, rc):
|
||||
|
||||
global mqtt_connected_to_broker
|
||||
|
||||
# REF: If we wanted a auto reconnect, a good source is here: https://www.emqx.com/en/blog/how-to-use-mqtt-in-python
|
||||
mqtt_connected_to_broker = False
|
||||
mylog('debug', [f"[{pluginName}] Connection terminated, reason_code: {reason_code}"])
|
||||
mylog('debug', [f"[{pluginName}] Connection terminated, reason_code: {rc}"])
|
||||
reconnect_count, reconnect_delay = 0, FIRST_RECONNECT_DELAY
|
||||
while reconnect_count < MAX_RECONNECT_COUNT:
|
||||
mylog('debug', [f"[{pluginName}] Reconnecting in {reconnect_delay} seconds..."])
|
||||
time.sleep(reconnect_delay)
|
||||
|
||||
def on_connect(mqtt_client, userdata, flags, reason_code, properties):
|
||||
try:
|
||||
mqtt_client.reconnect()
|
||||
mqtt_connected_to_broker = True # Signal connection
|
||||
mylog('debug', [f"[{pluginName}] Reconnected successfully"])
|
||||
return
|
||||
except Exception as err:
|
||||
mylog('debug', [f"[{pluginName}] {err} Reconnect failed. Retrying..."])
|
||||
pass
|
||||
|
||||
reconnect_delay *= RECONNECT_RATE
|
||||
reconnect_delay = min(reconnect_delay, MAX_RECONNECT_DELAY)
|
||||
reconnect_count += 1
|
||||
|
||||
mqtt_connected_to_broker = False
|
||||
|
||||
|
||||
def on_connect(mqtt_client, userdata, flags, rc, properties):
|
||||
|
||||
global mqtt_connected_to_broker
|
||||
|
||||
# REF: Good docu on reason codes: https://www.emqx.com/en/blog/mqtt5-new-features-reason-code-and-ack
|
||||
if reason_code == 0:
|
||||
if rc == 0:
|
||||
mylog('verbose', [f"[{pluginName}] Connected to broker"])
|
||||
mqtt_connected_to_broker = True # Signal connection
|
||||
else:
|
||||
mylog('verbose', [f"[{pluginName}] Connection failed, reason_code: {reason_code}"])
|
||||
mylog('verbose', [f"[{pluginName}] Connection failed, reason_code: {rc}"])
|
||||
mqtt_connected_to_broker = False
|
||||
|
||||
global mqtt_client
|
||||
global mqtt_connected_to_broker
|
||||
|
||||
# Paho will be soon not supporting V1 anymore, so this really should not be a user choice to start with
|
||||
# This code now uses V2 by default
|
||||
@@ -306,10 +331,13 @@ def mqtt_create_client():
|
||||
else:
|
||||
version = mqtt.MQTTv5
|
||||
|
||||
# we now hardcode the client id into here.
|
||||
# TODO: Add config ffor client id
|
||||
mqtt_client = mqtt.Client(
|
||||
client_id='netalertx',
|
||||
callback_api_version = mqtt.CallbackAPIVersion.VERSION2,
|
||||
transport=mytransport,
|
||||
protocol=mqtt.MQTTv5)
|
||||
protocol=version)
|
||||
mqtt_client.on_connect = on_connect
|
||||
mqtt_client.on_disconnect = on_disconnect
|
||||
|
||||
@@ -317,7 +345,15 @@ def mqtt_create_client():
|
||||
mqtt_client.tls_set()
|
||||
|
||||
mqtt_client.username_pw_set(username = get_setting_value('MQTT_USER'), password = get_setting_value('MQTT_PASSWORD'))
|
||||
mqtt_client.connect(host = get_setting_value('MQTT_BROKER'), port = get_setting_value('MQTT_PORT'))
|
||||
err_code = mqtt_client.connect(host = get_setting_value('MQTT_BROKER'), port = get_setting_value('MQTT_PORT'))
|
||||
if (err_code == mqtt.MQTT_ERR_SUCCESS):
|
||||
# We (prematurely) set the connection state to connected
|
||||
# the callback may be delayed
|
||||
mqtt_connected_to_broker = True
|
||||
# the client connects but connect callbacks will be called async and there may be a waiting time
|
||||
# Mosquitto works straight away
|
||||
# EMQX has a delay and does not update in loop below, so we cannot rely on it, we wait 1 sec
|
||||
time.sleep(1)
|
||||
mqtt_client.loop_start()
|
||||
|
||||
return mqtt_client
|
||||
|
||||
Reference in New Issue
Block a user