🔃 Sync hub 2.0.1 #788

jokob-sk
2024-09-18 08:48:08 +10:00
parent e9e45c34ae
commit d2fe53bc81
3 changed files with 148 additions and 129 deletions


@@ -48,24 +48,65 @@ def main():
    node_name = get_setting_value('SYNC_node_name')
    send_devices = get_setting_value('SYNC_devices')
    pull_nodes = get_setting_value('SYNC_nodes')

    # variables to determine operation mode
    is_hub = False
    is_node = False

    # Check if api_token set
    if not api_token:
        mylog('verbose', [f'[{pluginName}] ⚠ ERROR api_token not defined - quitting.'])
        return -1

    # Get all plugin configurations
    all_plugins = get_plugins_configs()

    # check if this is a hub or a node
    if len(hub_url) > 0 and (send_devices or plugins_to_sync):
        is_node = True
        mylog('verbose', [f'[{pluginName}] Mode 1: PUSH (NODE) - This is a NODE as SYNC_hub_url, SYNC_devices or SYNC_plugins are set'])

    if len(pull_nodes) > 0:
        is_hub = True
        mylog('verbose', [f'[{pluginName}] Mode 2: PULL (HUB) - This is a HUB as SYNC_nodes is set'])

    mylog('verbose', [f'[{pluginName}] plugins_to_sync {plugins_to_sync}'])

    # Mode 1: PUSH/SEND (NODE)
    if is_node:

        # PUSHING/SENDING Plugins
        index = 0
        for plugin in all_plugins:
            pref = plugin["unique_prefix"]

            if pref in plugins_to_sync:
                index += 1
                mylog('verbose', [f'[{pluginName}] synching "{pref}" ({index}/{len(plugins_to_sync)})'])

                # Construct the file path for the plugin's last_result.log file
                plugin_folder = plugin["code_name"]
                file_path = f"{INSTALL_PATH}/front/plugins/{plugin_folder}/last_result.log"

                if os.path.exists(file_path):
                    # Read the content of the log file
                    with open(file_path, 'r') as f:
                        file_content = f.read()

                    mylog('verbose', [f'[{pluginName}] Sending file_content: "{file_content}"'])

                    # encrypt and send data to the hub
                    send_data(api_token, file_content, encryption_key, plugin_folder, node_name, pref, hub_url)
                else:
                    mylog('verbose', [f'[{pluginName}] {plugin_folder}/last_result.log not found'])

        # PUSHING/SENDING devices
        if send_devices:
            file_path = f"{INSTALL_PATH}/front/api/table_devices.json"
            plugin_folder = 'sync'
            pref = 'SYNC'

            if os.path.exists(file_path):
                # Read the content of the log file
@@ -73,39 +114,21 @@ def main():
                    file_content = f.read()

                mylog('verbose', [f'[{pluginName}] Sending file_content: "{file_content}"'])

                # encrypt and send data to the hub
                send_data(api_token, file_content, encryption_key, plugin_folder, node_name, pref, hub_url)
            else:
                mylog('verbose', [f'[{pluginName}] {plugin_folder}/last_result.log not found'])

    # Mode 2: PULL/GET (HUB)
    # PULLING DEVICES
    file_dir = os.path.join(pluginsPath, 'sync')
    file_prefix = 'last_result'

    # pull data from nodes if specified
    if is_hub:
        for node_url in pull_nodes:
            response_json = get_data(api_token, node_url)
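The body of the pull loop is cut off by the next hunk, but the receive step further down expects encoded payload files such as last_result.encoded.Node_1.1.log to appear in the sync plugin folder. A minimal sketch of how a pulled payload could be persisted for that step; the helper name save_encoded_payload and the numbering scheme are illustrative assumptions, not code from this commit:

import os

def save_encoded_payload(file_dir, node_name, index, payload):
    # Hypothetical helper: keep the still-encoded payload on disk so the
    # receive step can decode it later. The node name is embedded in the
    # file name, which main() later recovers via file_name.split('.')[2].
    file_name = f"last_result.encoded.{node_name}.{index}.log"
    file_path = os.path.join(file_dir, file_name)
    with open(file_path, 'w') as f:
        f.write(payload)
    return file_path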
@@ -128,104 +151,105 @@ def main():
write_notification(message, 'info', timeNowTZ())
    # Process any received data for the Device DB table

    # Decode files, rename them, and get the list of files
    files_to_process = decode_and_rename_files(file_dir, file_prefix)

    if len(files_to_process) > 0:

        mylog('verbose', [f'[{pluginName}] Mode 3: RECEIVE (HUB) - This is a HUB as received data found'])
        # Connect to the App database
        conn = sqlite3.connect(fullDbPath)
        cursor = conn.cursor()

        # Collect all unique dev_MAC values from the JSON files
        unique_mac_addresses = set()
        device_data = []

        mylog('verbose', [f'[{pluginName}] Devices files to process: "{files_to_process}"'])

        for file_name in files_to_process:

            # only process received .log files, skipping the one logging the progress of this plugin
            if file_name != 'last_result.log':

                mylog('verbose', [f'[{pluginName}] Processing: "{file_name}"'])

                # Store e.g. Node_1 from last_result.encoded.Node_1.1.log
                tmp_SyncHubNodeName = ''
                if len(file_name.split('.')) > 3:
                    tmp_SyncHubNodeName = file_name.split('.')[2]

                file_path = f"{INSTALL_PATH}/front/plugins/sync/{file_name}"

                with open(file_path, 'r') as f:
                    data = json.load(f)
                    for device in data['data']:
                        if device['dev_MAC'] not in unique_mac_addresses:
                            device['dev_SyncHubNodeName'] = tmp_SyncHubNodeName
                            unique_mac_addresses.add(device['dev_MAC'])
                            device_data.append(device)

        if len(device_data) > 0:

            # Retrieve existing dev_MAC values from the Devices table
            placeholders = ', '.join('?' for _ in unique_mac_addresses)
            cursor.execute(f'SELECT dev_MAC FROM Devices WHERE dev_MAC IN ({placeholders})', tuple(unique_mac_addresses))
            existing_mac_addresses = set(row[0] for row in cursor.fetchall())
            # insert devices into the last_result.log to manage state
            for device in device_data:
                if device['dev_PresentLastScan'] == 1:
                    plugin_objects.add_object(
                        primaryId = device['dev_MAC'],
                        secondaryId = device['dev_LastIP'],
                        watched1 = device['dev_Name'],
                        watched2 = device['dev_Vendor'],
                        watched3 = device['dev_SyncHubNodeName'],
                        watched4 = device['dev_GUID'],
                        extra = '',
                        foreignKey = device['dev_GUID'])

            # Filter out existing devices
            new_devices = [device for device in device_data if device['dev_MAC'] not in existing_mac_addresses]

            # Remove 'rowid' key if it exists
            for device in new_devices:
                device.pop('rowid', None)

            mylog('verbose', [f'[{pluginName}] All devices: "{len(device_data)}"'])
            mylog('verbose', [f'[{pluginName}] New devices: "{len(new_devices)}"'])

            # Prepare the insert statement
            if new_devices:
                columns = ', '.join(k for k in new_devices[0].keys() if k != 'rowid')
                placeholders = ', '.join('?' for k in new_devices[0] if k != 'rowid')
                sql = f'INSERT INTO Devices ({columns}) VALUES ({placeholders})'

                # Extract values for the new devices
                values = [tuple(device.values()) for device in new_devices]

                mylog('verbose', [f'[{pluginName}] Inserting Devices SQL : "{sql}"'])
                mylog('verbose', [f'[{pluginName}] Inserting Devices VALUES: "{values}"'])

                # Use executemany for batch insertion
                cursor.executemany(sql, values)

                message = f'[{pluginName}] Inserted "{len(new_devices)}" new devices'
                mylog('verbose', [message])
                write_notification(message, 'info', timeNowTZ())

        # Commit and close the connection
        conn.commit()
        conn.close()

    # log result
    plugin_objects.write_result_file()

    return 0
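main() leans on decode_and_rename_files() to turn the received payloads into plain JSON .log files whose names still carry the node name. That helper is defined outside this diff; the sketch below only illustrates the behaviour main() appears to rely on: base64 decoding is an assumption (the plugin may use its own encryption scheme), the renaming of freshly received uploads is omitted, and the signature beyond (file_dir, file_prefix) is a guess.

import base64
import os

def decode_and_rename_files(file_dir, file_prefix):
    # Rough sketch, not the plugin's actual implementation.
    # Assumed behaviour: every received '<prefix>.encoded.<Node>.<n>.log' file
    # is decoded in place and its name (still containing the node name) is
    # returned, so main() can recover the node via file_name.split('.')[2].
    decoded_files = []
    for file_name in sorted(os.listdir(file_dir)):
        if file_name.startswith(f"{file_prefix}.encoded.") and file_name.endswith(".log"):
            file_path = os.path.join(file_dir, file_name)
            with open(file_path, "rb") as f:
                payload = base64.b64decode(f.read())  # decoding scheme assumed
            with open(file_path, "wb") as f:
                f.write(payload)
            decoded_files.append(file_name)
    return decoded_files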
@@ -284,7 +308,7 @@ def get_data(api_token, node_url):
return ""
else:
message = f'[{pluginName}] Failed to send data for "{plugin_folder}" (Status code: {response.status_code})'
message = f'[{pluginName}] Failed to send data for "{node_url}" (Status code: {response.status_code})'
mylog('verbose', [message])
write_notification(message, 'alert', timeNowTZ())
return ""