🔃 Sync Hub v0.6.5 - Devices table + Permissions fixes

This commit is contained in:
jokob-sk
2024-06-07 20:10:05 +10:00
parent b2d38c1c55
commit 1e3cea0f7f
5 changed files with 242 additions and 125 deletions

View File

@@ -93,10 +93,10 @@ fi
# Create an empty log files # Create an empty log files
# Create the execution_queue.log and app_front.log files if they don't exist # Create the execution_queue.log and app_front.log files if they don't exist
touch "${INSTALL_DIR}"/front/log/{app.log,execution_queue.log,app_front.log,app.php_errors.log,stderr.log,stdout.log,db_is_locked.log} touch "${INSTALL_DIR}"/front/log/{app.log,execution_queue.log,app_front.log,app.php_errors.log,stderr.log,stdout.log,db_is_locked.log}
touch "${INSTALL_DIR}"/api/{user_notifications.json} touch "${INSTALL_DIR}"/front/api/{user_notifications.json}
echo "[INSTALL] Fixing permissions after copied starter config & DB" echo "[INSTALL] Fixing permissions after copied starter config & DB"
chown -R nginx:www-data "${INSTALL_DIR}"/{config,front/log,db} chown -R nginx:www-data "${INSTALL_DIR}"/{config,front/log,db,front/api}
chmod 750 "${INSTALL_DIR}"/{config,front/log,db} chmod 750 "${INSTALL_DIR}"/{config,front/log,db}
find "${INSTALL_DIR}"/{config,front/log,db} -type f -exec chmod 640 {} \; find "${INSTALL_DIR}"/{config,front/log,db} -type f -exec chmod 640 {} \;

View File

@@ -379,6 +379,25 @@
} }
] ]
}, },
{
"function": "devices",
"type": "boolean",
"default_value": false,
"options": [],
"localized": ["name", "description"],
"name": [
{
"language_code": "en_us",
"string": "Send Devices"
}
],
"description": [
{
"language_code": "en_us",
"string": "When enabled the whole Devices table is sent over. Only new devices with new MACs are inserted in the target hub."
}
]
},
{ {
"function": "plugins", "function": "plugins",
"type": "text.multiselect", "type": "text.multiselect",
@@ -402,7 +421,7 @@
"name": [ "name": [
{ {
"language_code": "en_us", "language_code": "en_us",
"string": "Plugins" "string": "Send Plugins"
} }
], ],
"description": [ "description": [

View File

@@ -5,6 +5,8 @@ import pathlib
import sys import sys
import hashlib import hashlib
import requests import requests
import json
import sqlite3
# Define the installation path and extend the system path for plugin imports # Define the installation path and extend the system path for plugin imports
@@ -12,8 +14,9 @@ INSTALL_PATH = "/app"
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"]) sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
from plugin_helper import Plugin_Object, Plugin_Objects, decodeBase64 from plugin_helper import Plugin_Object, Plugin_Objects, decodeBase64
from plugin_utils import get_plugins_configs from plugin_utils import get_plugins_configs, decode_and_rename_files
from logger import mylog from logger import mylog
from const import pluginsPath, fullDbPath
from helper import timeNowTZ, get_setting_value from helper import timeNowTZ, get_setting_value
from cryptography import encrypt_data from cryptography import encrypt_data
@@ -22,21 +25,21 @@ CUR_PATH = str(pathlib.Path(__file__).parent.resolve())
LOG_FILE = os.path.join(CUR_PATH, 'script.log') LOG_FILE = os.path.join(CUR_PATH, 'script.log')
RESULT_FILE = os.path.join(CUR_PATH, 'last_result.log') RESULT_FILE = os.path.join(CUR_PATH, 'last_result.log')
# Initialize the Plugin obj output file
plugin_objects = Plugin_Objects(RESULT_FILE)
pluginName = 'SYNC' pluginName = 'SYNC'
def main(): def main():
mylog('verbose', [f'[{pluginName}] In script']) mylog('verbose', [f'[{pluginName}] In script'])
# Initialize the Plugin obj output file
plugin_objects = Plugin_Objects(RESULT_FILE)
# Retrieve configuration settings # Retrieve configuration settings
plugins_to_sync = get_setting_value('SYNC_plugins') plugins_to_sync = get_setting_value('SYNC_plugins')
api_token = get_setting_value('SYNC_api_token') api_token = get_setting_value('SYNC_api_token')
encryption_key = get_setting_value('SYNC_encryption_key') encryption_key = get_setting_value('SYNC_encryption_key')
hub_url = get_setting_value('SYNC_hub_url') hub_url = get_setting_value('SYNC_hub_url')
node_name = get_setting_value('SYNC_node_name') node_name = get_setting_value('SYNC_node_name')
send_devices = get_setting_value('SYNC_devices')
# Get all plugin configurations # Get all plugin configurations
all_plugins = get_plugins_configs() all_plugins = get_plugins_configs()
@@ -44,6 +47,7 @@ def main():
mylog('verbose', [f'[{pluginName}] DEBUG {len(all_plugins)}']) mylog('verbose', [f'[{pluginName}] DEBUG {len(all_plugins)}'])
mylog('verbose', [f'[{pluginName}] plugins_to_sync {plugins_to_sync}']) mylog('verbose', [f'[{pluginName}] plugins_to_sync {plugins_to_sync}'])
# Plugins processing
index = 0 index = 0
for plugin in all_plugins: for plugin in all_plugins:
pref = plugin["unique_prefix"] pref = plugin["unique_prefix"]
@@ -63,47 +67,130 @@ def main():
mylog('verbose', [f'[{pluginName}] Sending file_content: "{file_content}"']) mylog('verbose', [f'[{pluginName}] Sending file_content: "{file_content}"'])
# Encrypt the log data using the encryption_key # encrypt and send data to the hub
encrypted_data = encrypt_data(file_content, encryption_key) send_data(api_token, file_content, encryption_key, plugin_folder, node_name, pref, hub_url)
mylog('verbose', [f'[{pluginName}] Sending encrypted_data: "{encrypted_data}"'])
# Prepare the data payload for the POST request
data = {
'data': encrypted_data,
'plugin_folder': plugin_folder,
'node_name': node_name
}
# Set the authorization header with the API token
headers = {'Authorization': f'Bearer {api_token}'}
api_endpoint = f"{hub_url}/plugins/sync/hub.php"
response = requests.post(api_endpoint, data=data, headers=headers)
mylog('verbose', [f'[{pluginName}] response: "{response}"'])
if response.status_code == 200:
mylog('verbose', [f'[{pluginName}] Data for "{plugin_folder}" sent successfully'])
else:
mylog('verbose', [f'[{pluginName}] Failed to send data for "{plugin_folder}"'])
# log result
plugin_objects.add_object(
primaryId = pref,
secondaryId = timeNowTZ(),
watched1 = node_name,
watched2 = response.status_code,
watched3 = response,
watched4 = '',
extra = '',
foreignKey = '')
else: else:
mylog('verbose', [f'[{pluginName}] {plugin_folder}/last_result.log not found']) mylog('verbose', [f'[{pluginName}] {plugin_folder}/last_result.log not found'])
# Devices procesing
if send_devices:
file_path = f"{INSTALL_PATH}/front/api/table_devices.json"
plugin_folder = 'sync'
pref = 'SYNC'
if os.path.exists(file_path):
# Read the content of the log file
with open(file_path, 'r') as f:
file_content = f.read()
mylog('verbose', [f'[{pluginName}] Sending file_content: "{file_content}"'])
send_data(api_token, file_content, encryption_key, plugin_folder, node_name, pref, hub_url)
# process any received data for the Device DB table
# Create the file path
file_dir = os.path.join(pluginsPath, 'sync')
file_prefix = 'last_result'
# Decode files, rename them, and get the list of files
files_to_process = decode_and_rename_files(file_dir, file_prefix)
# Connect to the App database
conn = sqlite3.connect(fullDbPath)
cursor = conn.cursor()
# Collect all unique dev_MAC values from the JSON files
unique_mac_addresses = set()
device_data = []
for file_path in files_to_process:
# only process received .log files, skipping the one logging the progress of this plugin
if file_path != 'last_result.log':
mylog('verbose', [f'[{pluginName}] Processing: "{file_path}"'])
# Store e.g. Node_1 from last_result.encoded.Node_1.1.log
tmp_SyncHubNodeName = ''
if len(filename.split('.')) > 3:
tmp_SyncHubNodeName = filename.split('.')[2]
with open(file_path, 'r') as f:
data = json.load(f)
for device in data['data']:
device['dev_SyncHubNodeName'] = tmp_SyncHubNodeName
unique_mac_addresses.add(device['dev_MAC'])
device_data.append(device)
if len(device_data) > 0:
# Retrieve existing dev_MAC values from the Devices table
placeholders = ', '.join('?' for _ in unique_mac_addresses)
cursor.execute(f'SELECT dev_MAC FROM Devices WHERE dev_MAC IN ({placeholders})', tuple(unique_mac_addresses))
existing_mac_addresses = set(row[0] for row in cursor.fetchall())
# Filter out existing devices
new_devices = [device for device in device_data if device['dev_MAC'] not in existing_mac_addresses]
# Prepare the insert statement
if new_devices:
columns = ', '.join(new_devices[0].keys())
placeholders = ', '.join('?' for _ in new_devices[0])
sql = f'INSERT INTO Devices ({columns}) VALUES ({placeholders})'
# Extract values for the new devices
values = [tuple(device.values()) for device in new_devices]
mylog('verbose', [f'[{pluginName}] Inserting Devices SQL : "{sql}"'])
mylog('verbose', [f'[{pluginName}] Inserting Devices VALUES: "{values}"'])
# Use executemany for batch insertion
cursor.executemany(sql, values)
# Commit and close the connection
conn.commit()
conn.close()
# log result # log result
plugin_objects.write_result_file() plugin_objects.write_result_file()
return 0 return 0
def send_data(api_token, file_content, encryption_key, plugin_folder, node_name, pref, hub_url):
# Encrypt the log data using the encryption_key
encrypted_data = encrypt_data(file_content, encryption_key)
mylog('verbose', [f'[{pluginName}] Sending encrypted_data: "{encrypted_data}"'])
# Prepare the data payload for the POST request
data = {
'data': encrypted_data,
'plugin_folder': plugin_folder,
'node_name': node_name
}
# Set the authorization header with the API token
headers = {'Authorization': f'Bearer {api_token}'}
api_endpoint = f"{hub_url}/plugins/sync/hub.php"
response = requests.post(api_endpoint, data=data, headers=headers)
mylog('verbose', [f'[{pluginName}] response: "{response}"'])
if response.status_code == 200:
mylog('verbose', [f'[{pluginName}] Data for "{plugin_folder}" sent successfully'])
else:
mylog('verbose', [f'[{pluginName}] Failed to send data for "{plugin_folder}"'])
# log result
plugin_objects.add_object(
primaryId = pref,
secondaryId = timeNowTZ(),
watched1 = node_name,
watched2 = response.status_code,
watched3 = response,
watched4 = '',
extra = '',
foreignKey = '')
if __name__ == '__main__': if __name__ == '__main__':
main() main()

View File

@@ -13,7 +13,7 @@ from const import pluginsPath, logPath, applicationPath, reportTemplatesPath
from logger import mylog from logger import mylog
from helper import timeNowTZ, updateState, get_file_content, write_file, get_setting, get_setting_value from helper import timeNowTZ, updateState, get_file_content, write_file, get_setting, get_setting_value
from api import update_api from api import update_api
from plugin_utils import logEventStatusCounts, get_plugin_string, get_plugin_setting_obj, print_plugin_info, list_to_csv, combine_plugin_objects, resolve_wildcards_arr, handle_empty, custom_plugin_decoder from plugin_utils import logEventStatusCounts, get_plugin_string, get_plugin_setting_obj, print_plugin_info, list_to_csv, combine_plugin_objects, resolve_wildcards_arr, handle_empty, custom_plugin_decoder, decode_and_rename_files
from notification import Notification_obj from notification import Notification_obj
from cryptography import decrypt_data from cryptography import decrypt_data
@@ -211,17 +211,15 @@ def execute_plugin(db, all_plugins, plugin, pluginsState = plugins_state() ):
mylog('debug', ['[Plugins] Resolved : ', command]) mylog('debug', ['[Plugins] Resolved : ', command])
try: try:
# try runnning a subprocess with a forced timeout in case the subprocess hangs # try running a subprocess with a forced timeout in case the subprocess hangs
output = subprocess.check_output (command, universal_newlines=True, stderr=subprocess.STDOUT, timeout=(set_RUN_TIMEOUT)) output = subprocess.check_output(command, universal_newlines=True, stderr=subprocess.STDOUT, timeout=(set_RUN_TIMEOUT))
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:
# An error occured, handle it # An error occurred, handle it
mylog('none', [e.output]) mylog('none', [e.output])
mylog('none', ['[Plugins] ⚠ ERROR - enable LOG_LEVEL=debug and check logs']) mylog('none', ['[Plugins] ⚠ ERROR - enable LOG_LEVEL=debug and check logs'])
except subprocess.TimeoutExpired as timeErr: except subprocess.TimeoutExpired as timeErr:
mylog('none', [f'[Plugins] ⚠ ERROR - TIMEOUT - the plugin {plugin["unique_prefix"]} forcefully terminated as timeout reached. Increase TIMEOUT setting and scan interval.']) mylog('none', [f'[Plugins] ⚠ ERROR - TIMEOUT - the plugin {plugin["unique_prefix"]} forcefully terminated as timeout reached. Increase TIMEOUT setting and scan interval.'])
# check the last run output
# Initialize newLines # Initialize newLines
newLines = [] newLines = []
@@ -229,85 +227,52 @@ def execute_plugin(db, all_plugins, plugin, pluginsState = plugins_state() ):
file_dir = os.path.join(pluginsPath, plugin["code_name"]) file_dir = os.path.join(pluginsPath, plugin["code_name"])
file_prefix = 'last_result' file_prefix = 'last_result'
# key to decrypt data if available
encryption_key = get_setting_value('SYNC_encryption_key')
# Check for files starting with the specified prefix # Decode files, rename them, and get the list of files
matching_files = [f for f in os.listdir(file_dir) if f.startswith(file_prefix)] files_to_process = decode_and_rename_files(file_dir, file_prefix)
for filename in matching_files: for filename in files_to_process:
# Create the full file path # Open the decrypted file and process its contents
file_path = os.path.join(file_dir, filename) with open(os.path.join(file_dir, filename), 'r') as f:
newLines = f.read().split('\n')
# if the script produced some output, clean it up to ensure it's the correct format
# cleanup - select only lines containing a separator to filter out unnecessary data
newLines = list(filter(lambda x: '|' in x, newLines))
# Store e.g. Node_1 from last_result.encoded.Node_1.1.log
tmp_SyncHubNodeName = ''
if len(filename.split('.')) > 3:
tmp_SyncHubNodeName = filename.split('.')[2]
# Check if the file exists for line in newLines:
if os.path.exists(file_path): columns = line.split("|")
# There have to be always 9 columns
tmp_SyncHubNodeName = 'null' if len(columns) == 9:
# Create a tuple containing values to be inserted into the database.
# Check if the file name contains "encoded" # Each value corresponds to a column in the table in the order of the columns.
if '.encoded.' in filename and encryption_key != '': # must match the Plugins_Objects and Plugins_Events database tables and can be used as input for the plugin_object_class.
sqlParams.append(
# store e.g. Node_1 from last_result.encoded.Node_1.1.log (
tmp_SyncHubNodeName = filename.split('.')[2] 0, # "Index" placeholder
plugin["unique_prefix"], # "Plugin" column value from the plugin dictionary
# Decrypt the entire file columns[0], # "Object_PrimaryID" value from columns list
with open(file_path, 'r+') as f: columns[1], # "Object_SecondaryID" value from columns list
encrypted_data = f.read() 'null', # Placeholder for "DateTimeCreated" column
decrypted_data = decrypt_data(encrypted_data, encryption_key) columns[2], # "DateTimeChanged" value from columns list
columns[3], # "Watched_Value1" value from columns list
# Write the decrypted data back to the file columns[4], # "Watched_Value2" value from columns list
f.seek(0) columns[5], # "Watched_Value3" value from columns list
f.write(decrypted_data) columns[6], # "Watched_Value4" value from columns list
f.truncate() 'not-processed', # "Status" column (placeholder)
columns[7], # "Extra" value from columns list
# Rename the file e.g. from last_result.encoded.Node_1.1.log to last_result.decoded.Node_1.1.log 'null', # Placeholder for "UserData" column
new_filename = filename.replace('.encoded.', '.decoded.') columns[8], # "ForeignKey" value from columns list
os.rename(file_path, os.path.join(file_dir, new_filename)) tmp_SyncHubNodeName # Sync Hub Node name
elif filename == 'last_result.log' :
new_filename = filename
else:
# skipping decoded and other files
continue
# Open the decrypted file and process its contents
with open(os.path.join(file_dir, new_filename), 'r') as f:
newLines = f.read().split('\n')
# if the script produced some outpout, clean it up to ensure it's the correct format
# cleanup - select only lines containing a separator to filter out unnecessary data
newLines = list(filter(lambda x: '|' in x, newLines))
for line in newLines:
columns = line.split("|")
# There has to be always 9 columns
if len(columns) == 9:
# Create a tuple containing values to be inserted into the database.
# Each value corresponds to a column in the table in the order of the columns.
# must match the Plugins_Objects and Plugins_Events database tables and can be used as input for the plugin_object_class.
sqlParams.append(
(
0, # "Index" placeholder
plugin["unique_prefix"], # "Plugin" column value from the plugin dictionary
columns[0], # "Object_PrimaryID" value from columns list
columns[1], # "Object_SecondaryID" value from columns list
'null', # Placeholder for "DateTimeCreated" column
columns[2], # "DateTimeChanged" value from columns list
columns[3], # "Watched_Value1" value from columns list
columns[4], # "Watched_Value2" value from columns list
columns[5], # "Watched_Value3" value from columns list
columns[6], # "Watched_Value4" value from columns list
'not-processed', # "Status" column (placeholder)
columns[7], # "Extra" value from columns list
'null', # Placeholder for "UserData" column
columns[8], # "ForeignKey" value from columns list
tmp_SyncHubNodeName # Sync Hub Node name
)
) )
else: )
mylog('none', ['[Plugins] Skipped invalid line in the output: ', line]) else:
else: mylog('none', ['[Plugins] Skipped invalid line in the output: ', line])
mylog('debug', [f'[Plugins] The file {file_path} does not exist'])
# TODO: delete processed files # TODO: delete processed files
# os.rename(file_path, os.path.join(file_dir, new_filename)) # os.rename(file_path, os.path.join(file_dir, new_filename))
@@ -430,9 +395,6 @@ def execute_plugin(db, all_plugins, plugin, pluginsState = plugins_state() ):
return pluginsState return pluginsState
#------------------------------------------------------------------------------- #-------------------------------------------------------------------------------
# Check if watched values changed for the given plugin # Check if watched values changed for the given plugin
def process_plugin_events(db, plugin, pluginsState, plugEventsArr): def process_plugin_events(db, plugin, pluginsState, plugEventsArr):

View File

@@ -243,4 +243,53 @@ def getPluginObject(keyValues):
return {} return {}
# ------------------------------------------------------------------
# decode any encoded last_result files
def decode_and_rename_files(file_dir, file_prefix):
"""
Decodes and renames files in the specified directory if they are encrypted.
Returns a list of files to be processed and the Sync Hub Node name.
"""
# Initialize the list of files to be processed and Sync Hub Node name
files_to_process = []
# key to decrypt data if available
encryption_key = get_setting_value('SYNC_encryption_key')
# Check for files starting with the specified prefix
matching_files = [f for f in os.listdir(file_dir) if f.startswith(file_prefix)]
for filename in matching_files:
# Create the full file path
file_path = os.path.join(file_dir, filename)
# Check if the file exists
if os.path.exists(file_path):
# Check if the file name contains "encoded"
if '.encoded.' in filename and encryption_key:
# Decrypt the entire file
with open(file_path, 'r+') as f:
encrypted_data = f.read()
decrypted_data = decrypt_data(encrypted_data, encryption_key)
# Write the decrypted data back to the file
f.seek(0)
f.write(decrypted_data)
f.truncate()
# Rename the file e.g. from last_result.encoded.Node_1.1.log to last_result.decoded.Node_1.1.log
new_filename = filename.replace('.encoded.', '.decoded.')
os.rename(file_path, os.path.join(file_dir, new_filename))
files_to_process.append(new_filename)
elif filename == 'last_result.log':
files_to_process.append(filename)
else:
# Skipping decoded and other files
continue
else:
mylog('debug', [f'[Plugins] The file {file_path} does not exist'])
return files_to_process