From c8a40920b4dc81371c818e8cbbe4213590a8347e Mon Sep 17 00:00:00 2001 From: jokob-sk Date: Mon, 20 Jan 2025 23:42:24 +1100 Subject: [PATCH] cleanup, faster devices screen update #967 #923 --- server/__main__.py | 37 +++--- server/api.py | 17 ++- server/app_state.py | 109 ++++++++++++++++++ server/database.py | 2 +- server/graphql_server/graphql_server_start.py | 3 +- server/helper.py | 96 --------------- server/initialise.py | 7 +- server/plugin.py | 82 ++++++------- server/plugin_utils.py | 3 +- ...{execution_log.py => user_events_queue.py} | 15 ++- 10 files changed, 198 insertions(+), 173 deletions(-) create mode 100755 server/app_state.py rename server/{execution_log.py => user_events_queue.py} (84%) diff --git a/server/__main__.py b/server/__main__.py index bc57bd45..7ecb18fa 100755 --- a/server/__main__.py +++ b/server/__main__.py @@ -25,7 +25,8 @@ import subprocess import conf from const import * from logger import mylog -from helper import filePermissions, timeNowTZ, updateState, get_setting_value +from helper import filePermissions, timeNowTZ, get_setting_value +from app_state import updateState from api import update_api from networkscan import process_scan from initialise import importConfigs @@ -89,7 +90,7 @@ def main (): while True: # re-load user configuration and plugins - all_plugins = importConfigs(db, all_plugins) + all_plugins, imported = importConfigs(db, all_plugins) # update time started conf.loop_start_time = timeNowTZ() @@ -98,11 +99,11 @@ def main (): # Handle plugins executed ONCE if conf.plugins_once_run == False: - pluginsState = run_plugin_scripts(db, all_plugins, 'once') + run_plugin_scripts(db, all_plugins, 'once') conf.plugins_once_run = True - - # check if there is a front end initiated event which needs to be executed - pluginsState = check_and_run_user_event(db, all_plugins, pluginsState) + + # check if user is waiting for api_update + check_and_run_user_event(db, all_plugins) # Update API endpoints update_api(db, all_plugins, False) @@ -121,27 +122,27 @@ def main (): startTime = startTime.replace (microsecond=0) # Check if any plugins need to run on schedule - pluginsState = run_plugin_scripts(db, all_plugins, 'schedule', pluginsState) + run_plugin_scripts(db, all_plugins, 'schedule') # determine run/scan type based on passed time # -------------------------------------------- - # Runs plugin scripts which are set to run every timne after a scans finished - pluginsState = run_plugin_scripts(db, all_plugins, 'always_after_scan', pluginsState) - + # Runs plugin scripts which are set to run every time after a scans finished + run_plugin_scripts(db, all_plugins, 'always_after_scan') # process all the scanned data into new devices - mylog('debug', [f'[MAIN] processScan: {pluginsState.processScan}']) + processScan = updateState("Check scan").processScan + mylog('debug', [f'[MAIN] processScan: {processScan}']) - if pluginsState.processScan == True: - mylog('debug', "[MAIN] start processig scan results") - pluginsState.processScan = False + if processScan == True: + mylog('debug', "[MAIN] start processig scan results") process_scan(db) + updateState("Scan processed", None, None, None, None, False) # -------- # Reporting # run plugins before notification processing (e.g. Plugins to discover device names) - pluginsState = run_plugin_scripts(db, all_plugins, 'before_name_updates', pluginsState) + run_plugin_scripts(db, all_plugins, 'before_name_updates') # Resolve devices names mylog('debug','[Main] Resolve devices names') @@ -155,7 +156,7 @@ def main (): # new devices were found if len(newDevices) > 0: # run all plugins registered to be run when new devices are found - pluginsState = run_plugin_scripts(db, all_plugins, 'on_new_device', pluginsState) + run_plugin_scripts(db, all_plugins, 'on_new_device') # Notification handling # ---------------------------------------- @@ -170,12 +171,10 @@ def main (): # run all enabled publisher gateways if notificationObj.HasNotifications: - pluginsState = run_plugin_scripts(db, all_plugins, 'on_notification', pluginsState) + run_plugin_scripts(db, all_plugins, 'on_notification') notification.setAllProcessed() notification.clearPendingEmailFlag() - - else: mylog('verbose', ['[Notification] No changes to report']) diff --git a/server/api.py b/server/api.py index 86ebc107..35bb31fc 100755 --- a/server/api.py +++ b/server/api.py @@ -7,8 +7,9 @@ import datetime import conf from const import (apiPath, sql_appevents, sql_devices_all, sql_events_pending_alert, sql_settings, sql_plugins_events, sql_plugins_history, sql_plugins_objects,sql_language_strings, sql_notifications_all, sql_online_history, sql_devices_tiles) from logger import mylog -from helper import write_file, get_setting_value, updateState, timeNowTZ -from execution_log import ExecutionLog +from helper import write_file, get_setting_value, timeNowTZ +from app_state import updateState +from user_events_queue import UserEventsQueue from notification import write_notification # Import the start_server function @@ -31,12 +32,10 @@ def update_api(db, all_plugins, forceUpdate, updateOnlyDataSources=[], is_ad_hoc start_periodic_write(interval=1) # Update app_state.json and retrieve app_state to check if GraphQL server is running - app_state = updateState("Update: API", None, None, None, None) + app_state = updateState() - folder = apiPath - # Save plugins - write_file(folder + 'plugins.json', json.dumps({"data": all_plugins})) + write_file(apiPath + 'plugins.json', json.dumps({"data": all_plugins})) # Prepare database tables we want to expose dataSourcesSQLs = [ @@ -57,7 +56,7 @@ def update_api(db, all_plugins, forceUpdate, updateOnlyDataSources=[], is_ad_hoc # Save selected database tables for dsSQL in dataSourcesSQLs: if not updateOnlyDataSources or dsSQL[0] in updateOnlyDataSources: - api_endpoint_class(db, forceUpdate, dsSQL[1], folder + 'table_' + dsSQL[0] + '.json', is_ad_hoc_user_event) + api_endpoint_class(db, forceUpdate, dsSQL[1], apiPath + 'table_' + dsSQL[0] + '.json', is_ad_hoc_user_event) # Start the GraphQL server graphql_port_value = get_setting_value("GRAPHQL_PORT") @@ -87,7 +86,7 @@ class api_endpoint_class: self.path = path self.fileName = path.split('/')[-1] self.hash = hash(json.dumps(self.jsonData)) - self.debounce_interval = 5 # Time to wait before writing + self.debounce_interval = 3 # Time in seconds to wait before writing self.changeDetectedWhen = None # self.last_update_time = current_time - datetime.timedelta(minutes=1) # Last time data was updated self.is_ad_hoc_user_event = is_ad_hoc_user_event @@ -147,7 +146,7 @@ class api_endpoint_class: # Update user event execution log # mylog('verbose', [f'[API] api_endpoint_class: is_ad_hoc_user_event {self.is_ad_hoc_user_event}']) if self.is_ad_hoc_user_event: - execution_log = ExecutionLog() + execution_log = UserEventsQueue() execution_log.finalize_event("update_api") self.is_ad_hoc_user_event = False diff --git a/server/app_state.py b/server/app_state.py new file mode 100755 index 00000000..f02939c0 --- /dev/null +++ b/server/app_state.py @@ -0,0 +1,109 @@ +import os +import json + +import conf +from const import * +from logger import mylog, logResult +from helper import timeNowTZ, timeNow, checkNewVersion + +# Register NetAlertX directories +INSTALL_PATH="/app" + + +#------------------------------------------------------------------------------- +# App state +#------------------------------------------------------------------------------- +# A class to manage the application state and to provide a frontend accessible API point +# To keep an existing value pass None +class app_state_class: + def __init__(self, currentState = None, settingsSaved=None, settingsImported=None, showSpinner=False, graphQLServerStarted=0, processScan=False): + # json file containing the state to communicate with the frontend + stateFile = apiPath + 'app_state.json' + previousState = "" + + # Update self + self.lastUpdated = str(timeNowTZ()) + + if os.path.exists(stateFile): + try: + with open(stateFile, 'r') as json_file: + previousState = json.load(json_file) + except json.decoder.JSONDecodeError as e: + mylog('none', [f'[app_state_class] Failed to handle app_state.json: {e}']) + + # Check if the file exists and recover previous values + if previousState != "": + self.settingsSaved = previousState.get("settingsSaved", 0) + self.settingsImported = previousState.get("settingsImported", 0) + self.processScan = previousState.get("processScan", False) + self.showSpinner = previousState.get("showSpinner", False) + self.isNewVersion = previousState.get("isNewVersion", False) + self.isNewVersionChecked = previousState.get("isNewVersionChecked", 0) + self.graphQLServerStarted = previousState.get("graphQLServerStarted", 0) + self.currentState = previousState.get("currentState", "Init") + else: # init first time values + self.settingsSaved = 0 + self.settingsImported = 0 + self.showSpinner = False + self.processScan = False + self.isNewVersion = checkNewVersion() + self.isNewVersionChecked = int(timeNow().timestamp()) + self.graphQLServerStarted = 0 + self.currentState = "Init" + + # Overwrite with provided parameters if supplied + if settingsSaved is not None: + self.settingsSaved = settingsSaved + if settingsImported is not None: + self.settingsImported = settingsImported + if showSpinner is not None: + self.showSpinner = showSpinner + if graphQLServerStarted is not None: + self.graphQLServerStarted = graphQLServerStarted + if processScan is not None: + self.processScan = processScan + if currentState is not None: + self.currentState = currentState + + # check for new version every hour and if currently not running new version + if self.isNewVersion is False and self.isNewVersionChecked + 3600 < int(timeNow().timestamp()): + self.isNewVersion = checkNewVersion() + self.isNewVersionChecked = int(timeNow().timestamp()) + + # Update .json file + # with open(stateFile, 'w') as json_file: + # json.dump(self, json_file, cls=AppStateEncoder, indent=4) + + # Remove lastUpdated from the dictionary for comparison + currentStateDict = self.__dict__.copy() + currentStateDict.pop('lastUpdated', None) + + # Compare current state with previous state before updating + if previousState != currentStateDict: + # Sanity check before saving the .json file + try: + json_data = json.dumps(self, cls=AppStateEncoder, indent=4) + with open(stateFile, 'w') as json_file: + json_file.write(json_data) + except (TypeError, ValueError) as e: + mylog('none', [f'[app_state_class] Failed to serialize object to JSON: {e}']) + + return # Allows chaining by returning self + + + +#------------------------------------------------------------------------------- +# method to update the state +def updateState(newState = None, settingsSaved = None, settingsImported = None, showSpinner = False, graphQLServerStarted = None, processScan = None): + + return app_state_class(newState, settingsSaved, settingsImported, showSpinner, graphQLServerStarted, processScan) + + +#------------------------------------------------------------------------------- +# Checks if the object has a __dict__ attribute. If it does, it assumes that it's an instance of a class and serializes its attributes dynamically. +class AppStateEncoder(json.JSONEncoder): + def default(self, obj): + if hasattr(obj, '__dict__'): + # If the object has a '__dict__', assume it's an instance of a class + return obj.__dict__ + return super().default(obj) \ No newline at end of file diff --git a/server/database.py b/server/database.py index 52620fa6..5aad5b67 100755 --- a/server/database.py +++ b/server/database.py @@ -8,7 +8,7 @@ import json from const import fullDbPath, sql_devices_stats, sql_devices_all, sql_generateGuid from logger import mylog -from helper import json_obj, initOrSetParam, row_to_json, timeNowTZ#, split_string #, updateState +from helper import json_obj, initOrSetParam, row_to_json, timeNowTZ from appevent import AppEvent_obj class DB(): diff --git a/server/graphql_server/graphql_server_start.py b/server/graphql_server/graphql_server_start.py index 7b92a2a4..89d125f8 100755 --- a/server/graphql_server/graphql_server_start.py +++ b/server/graphql_server/graphql_server_start.py @@ -9,7 +9,8 @@ INSTALL_PATH = "/app" sys.path.extend([f"{INSTALL_PATH}/server"]) from logger import mylog -from helper import get_setting_value, timeNowTZ, updateState +from helper import get_setting_value, timeNowTZ +from app_state import updateState from notification import write_notification # Flask application diff --git a/server/helper.py b/server/helper.py index b4352c44..8f71cd41 100755 --- a/server/helper.py +++ b/server/helper.py @@ -52,94 +52,6 @@ def get_timezone_offset(): return offset_formatted -#------------------------------------------------------------------------------- -# App state -#------------------------------------------------------------------------------- -# A class to manage the application state and to provide a frontend accessible API point -# To keep an existing value pass None -class app_state_class: - def __init__(self, currentState, settingsSaved=None, settingsImported=None, showSpinner=False, graphQLServerStarted=0): - # json file containing the state to communicate with the frontend - stateFile = apiPath + '/app_state.json' - previousState = "" - - # if currentState == 'Initializing': - # checkNewVersion(False) - - # Update self - self.currentState = currentState - self.lastUpdated = str(timeNowTZ()) - - if os.path.exists(stateFile): - try: - with open(stateFile, 'r') as json_file: - previousState = json.load(json_file) - except json.decoder.JSONDecodeError as e: - mylog('none', [f'[app_state_class] Failed to handle app_state.json: {e}']) - - - # Check if the file exists and recover previous values - if previousState != "": - self.settingsSaved = previousState.get("settingsSaved", 0) - self.settingsImported = previousState.get("settingsImported", 0) - self.showSpinner = previousState.get("showSpinner", False) - self.isNewVersion = previousState.get("isNewVersion", False) - self.isNewVersionChecked = previousState.get("isNewVersionChecked", 0) - self.graphQLServerStarted = previousState.get("graphQLServerStarted", 0) - else: # init first time values - self.settingsSaved = 0 - self.settingsImported = 0 - self.showSpinner = False - self.isNewVersion = checkNewVersion() - self.isNewVersionChecked = int(timeNow().timestamp()) - self.graphQLServerStarted = 0 - - # Overwrite with provided parameters if supplied - if settingsSaved is not None: - self.settingsSaved = settingsSaved - if settingsImported is not None: - self.settingsImported = settingsImported - if showSpinner is not None: - self.showSpinner = showSpinner - if graphQLServerStarted is not None: - self.graphQLServerStarted = graphQLServerStarted - - # check for new version every hour and if currently not running new version - if self.isNewVersion is False and self.isNewVersionChecked + 3600 < int(timeNow().timestamp()): - self.isNewVersion = checkNewVersion() - self.isNewVersionChecked = int(timeNow().timestamp()) - - # Update .json file - # with open(stateFile, 'w') as json_file: - # json.dump(self, json_file, cls=AppStateEncoder, indent=4) - - # Sanity check before saving the .json file - try: - json_data = json.dumps(self, cls=AppStateEncoder, indent=4) - with open(stateFile, 'w') as json_file: - json_file.write(json_data) - except (TypeError, ValueError) as e: - mylog('none', [f'[app_state_class] Failed to serialize object to JSON: {e}']) - - - - def isSet(self): - - result = False - - if self.currentState != "": - result = True - - return result - - -#------------------------------------------------------------------------------- -# method to update the state -def updateState(newState, settingsSaved = None, settingsImported = None, showSpinner = False, graphQLServerStarted = None): - - return app_state_class(newState, settingsSaved, settingsImported, showSpinner, graphQLServerStarted) - - #------------------------------------------------------------------------------- def updateSubnets(scan_subnets): subnets = [] @@ -887,14 +799,6 @@ def add_json_list (row, list): return list -#------------------------------------------------------------------------------- -# Checks if the object has a __dict__ attribute. If it does, it assumes that it's an instance of a class and serializes its attributes dynamically. -class AppStateEncoder(json.JSONEncoder): - def default(self, obj): - if hasattr(obj, '__dict__'): - # If the object has a '__dict__', assume it's an instance of a class - return obj.__dict__ - return super().default(obj) #------------------------------------------------------------------------------- # Checks if the object has a __dict__ attribute. If it does, it assumes that it's an instance of a class and serializes its attributes dynamically. diff --git a/server/initialise.py b/server/initialise.py index 6f3abdea..10749091 100755 --- a/server/initialise.py +++ b/server/initialise.py @@ -12,7 +12,8 @@ import re # Register NetAlertX libraries import conf from const import fullConfPath, applicationPath, fullConfFolder -from helper import fixPermissions, collect_lang_strings, updateSubnets, initOrSetParam, isJsonObject, updateState, setting_value_to_python_type, timeNowTZ, get_setting_value, generate_random_string +from helper import fixPermissions, collect_lang_strings, updateSubnets, initOrSetParam, isJsonObject, setting_value_to_python_type, timeNowTZ, get_setting_value, generate_random_string +from app_state import updateState from logger import mylog from api import update_api from scheduler import schedule_class @@ -133,7 +134,7 @@ def importConfigs (db, all_plugins): if (fileModifiedTime == conf.lastImportedConfFile) and all_plugins is not None: mylog('debug', ['[Import Config] skipping config file import']) - return all_plugins + return all_plugins, False # Header updateState("Import config", showSpinner = True) @@ -413,7 +414,7 @@ def importConfigs (db, all_plugins): # front end app log loggging write_notification(msg, 'info', timeNowTZ()) - return all_plugins + return all_plugins, True diff --git a/server/plugin.py b/server/plugin.py index 33d978a2..5cfd6251 100755 --- a/server/plugin.py +++ b/server/plugin.py @@ -12,11 +12,12 @@ from collections import namedtuple import conf from const import pluginsPath, logPath, applicationPath, reportTemplatesPath from logger import mylog, Logger -from helper import timeNowTZ, updateState, get_file_content, write_file, get_setting, get_setting_value +from helper import timeNowTZ, get_file_content, write_file, get_setting, get_setting_value +from app_state import updateState from api import update_api from plugin_utils import logEventStatusCounts, get_plugin_string, get_plugin_setting_obj, print_plugin_info, list_to_csv, combine_plugin_objects, resolve_wildcards_arr, handle_empty, custom_plugin_decoder, decode_and_rename_files from notification import Notification_obj, write_notification -from execution_log import ExecutionLog +from user_events_queue import UserEventsQueue # Make sure log level is initialized correctly Logger(get_setting_value('LOG_LEVEL')) @@ -102,12 +103,7 @@ class plugin_param: self.multiplyTimeout = multiplyTimeout #------------------------------------------------------------------------------- -class plugins_state: - def __init__(self, processScan = False): - self.processScan = processScan - -#------------------------------------------------------------------------------- -def run_plugin_scripts(db, all_plugins, runType, pluginsState = plugins_state()): +def run_plugin_scripts(db, all_plugins, runType): # Header updateState("Run: Plugins") @@ -140,7 +136,7 @@ def run_plugin_scripts(db, all_plugins, runType, pluginsState = plugins_state()) print_plugin_info(plugin, ['display_name']) mylog('debug', ['[Plugins] CMD: ', get_plugin_setting_obj(plugin, "CMD")["value"]]) - pluginsState = execute_plugin(db, all_plugins, plugin, pluginsState) + execute_plugin(db, all_plugins, plugin) # update last run time if runType == "schedule": for schd in conf.mySchedules: @@ -148,9 +144,6 @@ def run_plugin_scripts(db, all_plugins, runType, pluginsState = plugins_state()) # note the last time the scheduled plugin run was executed schd.last_run = timeNowTZ() - return pluginsState - - # Function to run a plugin command def run_plugin(command, set_RUN_TIMEOUT, plugin): @@ -167,20 +160,15 @@ def run_plugin(command, set_RUN_TIMEOUT, plugin): #------------------------------------------------------------------------------- # Executes the plugin command specified in the setting with the function specified as CMD -def execute_plugin(db, all_plugins, plugin, pluginsState = plugins_state() ): +def execute_plugin(db, all_plugins, plugin ): sql = db.sql - - if pluginsState is None: - mylog('debug', ['[Plugins] pluginsState is None']) - pluginsState = plugins_state() - # ------- necessary settings check -------- set = get_plugin_setting_obj(plugin, "CMD") # handle missing "function":"CMD" setting if set == None: - return pluginsState + return set_CMD = set["value"] @@ -394,7 +382,7 @@ def execute_plugin(db, all_plugins, plugin, pluginsState = plugins_state() ): # handle missing "function":"DB_PATH" setting if set == None: mylog('none', ['[Plugins] ⚠ ERROR: DB_PATH setting for plugin type sqlite-db-query missing.']) - return pluginsState + return fullSqlitePath = set["value"] @@ -408,7 +396,7 @@ def execute_plugin(db, all_plugins, plugin, pluginsState = plugins_state() ): except sqlite3.Error as e: mylog('none',[f'[Plugins] ⚠ ERROR: DB_PATH setting ({fullSqlitePath}) for plugin {plugin["unique_prefix"]}. Did you mount it correctly?']) mylog('none',[f'[Plugins] ⚠ ERROR: ATTACH DATABASE failed with SQL ERROR: ', e]) - return pluginsState + return for row in arr: # There has to be always 9 or 13 columns @@ -459,7 +447,7 @@ def execute_plugin(db, all_plugins, plugin, pluginsState = plugins_state() ): # check if the subprocess / SQL query failed / there was no valid output if len(sqlParams) == 0: mylog('none', [f'[Plugins] No output received from the plugin "{plugin["unique_prefix"]}"']) - return pluginsState + return else: mylog('verbose', ['[Plugins] SUCCESS, received ', len(sqlParams), ' entries']) mylog('debug', ['[Plugins] sqlParam entries: ', sqlParams]) @@ -468,17 +456,24 @@ def execute_plugin(db, all_plugins, plugin, pluginsState = plugins_state() ): if len(sqlParams) > 0: # create objects - pluginsState = process_plugin_events(db, plugin, pluginsState, sqlParams) + process_plugin_events(db, plugin, sqlParams) # update API endpoints - update_api(db, all_plugins, False, ["plugins_events","plugins_objects", "plugins_history", "appevents"]) + endpoints = ["plugins_events","plugins_objects", "plugins_history", "appevents"] + + # check if we need to update devices api endpoint as well to prevent long user waits on Loading... + userUpdatedDevices = UserEventsQueue().has_update_devices + if userUpdatedDevices: + endpoints += ["devices"] + + update_api(db, all_plugins, True, endpoints, userUpdatedDevices) - return pluginsState + return #------------------------------------------------------------------------------- # Check if watched values changed for the given plugin -def process_plugin_events(db, plugin, pluginsState, plugEventsArr): +def process_plugin_events(db, plugin, plugEventsArr): sql = db.sql @@ -780,13 +775,14 @@ def process_plugin_events(db, plugin, pluginsState, plugEventsArr): db.commitDB() # perform scan if mapped to CurrentScan table - if dbTable == 'CurrentScan': - pluginsState.processScan = True + if dbTable == 'CurrentScan': + updateState("Process scan: True", None, None, None, None, True) # set processScan = True in the appState + db.commitDB() - return pluginsState + return #------------------------------------------------------------------------------- @@ -844,14 +840,15 @@ class plugin_object_class: self.watchedHash = str(hash(tmp)) + #=============================================================================== # Handling of user initialized front-end events #=============================================================================== -def check_and_run_user_event(db, all_plugins, pluginsState): +def check_and_run_user_event(db, all_plugins): """ Process user events from the execution queue log file and notify the user about executed events. """ - execution_log = ExecutionLog() + execution_log = UserEventsQueue() # Track whether to show notification for executed events executed_events = [] @@ -859,7 +856,10 @@ def check_and_run_user_event(db, all_plugins, pluginsState): # Read the log file to get the lines lines = execution_log.read_log() if not lines: - return pluginsState # Exit early if the log file is empty + mylog('debug', ['[check_and_run_user_event] User Execution Queue is empty']) + return # Exit early if the log file is empty + else: + mylog('debug', ['[check_and_run_user_event] Process User Execution Queue:' + ', '.join(map(str, lines))]) for line in lines: # Extract event name and parameters from the log line @@ -871,11 +871,11 @@ def check_and_run_user_event(db, all_plugins, pluginsState): # Process each event type if event == 'test': - pluginsState = handle_test(param, db, all_plugins, pluginsState) + handle_test(param, db, all_plugins) executed_events.append(f"test with param {param}") execution_log.finalize_event("test") elif event == 'run': - pluginsState = handle_run(param, db, all_plugins, pluginsState) + handle_run(param, db, all_plugins) executed_events.append(f"run with param {param}") execution_log.finalize_event("run") elif event == 'update_api': @@ -892,27 +892,27 @@ def check_and_run_user_event(db, all_plugins, pluginsState): mylog('minimal', ['[check_and_run_user_event] INFO: Executed events: ', executed_events_message]) write_notification(f"[Ad-hoc events] Events executed: {executed_events_message}", "interrupt", timeNowTZ()) - return pluginsState + return #------------------------------------------------------------------------------- -def handle_run(runType, db, all_plugins, pluginsState): +def handle_run(runType, db, all_plugins): mylog('minimal', ['[', timeNowTZ(), '] START Run: ', runType]) # run the plugin to run for plugin in all_plugins: if plugin["unique_prefix"] == runType: - pluginsState = execute_plugin(db, all_plugins, plugin, pluginsState) + execute_plugin(db, all_plugins, plugin) mylog('minimal', ['[', timeNowTZ(), '] END Run: ', runType]) - return pluginsState + return #------------------------------------------------------------------------------- -def handle_test(runType, db, all_plugins, pluginsState): +def handle_test(runType, db, all_plugins): mylog('minimal', ['[', timeNowTZ(), '] [Test] START Test: ', runType]) @@ -924,12 +924,12 @@ def handle_test(runType, db, all_plugins, pluginsState): notificationObj = notification.create(sample_json, "") # Run test - pluginsState = handle_run(runType, db, all_plugins, pluginsState) + handle_run(runType, db, all_plugins) # Remove sample notification notificationObj.remove(notificationObj.GUID) mylog('minimal', ['[Test] END Test: ', runType]) - return pluginsState + return diff --git a/server/plugin_utils.py b/server/plugin_utils.py index c24d6d1f..5cd55ad3 100755 --- a/server/plugin_utils.py +++ b/server/plugin_utils.py @@ -4,7 +4,8 @@ import json import conf from logger import mylog from const import pluginsPath, logPath, apiPath -from helper import timeNowTZ, updateState, get_file_content, write_file, get_setting, get_setting_value, setting_value_to_python_type +from helper import timeNowTZ, get_file_content, write_file, get_setting, get_setting_value, setting_value_to_python_type +from app_state import updateState from crypto_utils import decrypt_data module_name = 'Plugin utils' diff --git a/server/execution_log.py b/server/user_events_queue.py similarity index 84% rename from server/execution_log.py rename to server/user_events_queue.py index da36ecf6..11cb4dc1 100755 --- a/server/execution_log.py +++ b/server/user_events_queue.py @@ -4,7 +4,7 @@ import os from const import pluginsPath, logPath, applicationPath, reportTemplatesPath from logger import mylog -class ExecutionLog: +class UserEventsQueue: """ Handles the execution queue log file, allowing reading, writing, and removing processed events. @@ -14,12 +14,23 @@ class ExecutionLog: self.log_path = logPath self.log_file = os.path.join(self.log_path, "execution_queue.log") + + def has_update_devices(self): + lines = self.read_log() + + for line in lines: + if 'update_api|devices' in line: + return True + + return False + def read_log(self): """ Reads the log file and returns all lines. Returns an empty list if the file doesn't exist. """ if not os.path.exists(self.log_file): + mylog('none', ['[UserEventsQueue] Log file not found: ', self.log_file]) return [] # No log file, return empty list with open(self.log_file, "r") as file: return file.readlines() @@ -61,7 +72,7 @@ class ExecutionLog: self.write_log(updated_lines) - mylog('minimal', ['[ExecutionLog] Processed event: ', event]) + mylog('minimal', ['[UserEventsQueue] Processed event: ', event]) return removed