diff --git a/front/php/templates/language/pt_br.json b/front/php/templates/language/pt_br.json old mode 100644 new mode 100755 diff --git a/front/php/templates/language/ru_ru.json b/front/php/templates/language/ru_ru.json old mode 100644 new mode 100755 diff --git a/server/api.py b/server/api.py index 7ef6c7aa..1d9543bd 100755 --- a/server/api.py +++ b/server/api.py @@ -1,32 +1,43 @@ import json - +import time +import threading +import datetime # Register NetAlertX modules import conf from const import (apiPath, sql_appevents, sql_devices_all, sql_events_pending_alert, sql_settings, sql_plugins_events, sql_plugins_history, sql_plugins_objects,sql_language_strings, sql_notifications_all, sql_online_history, sql_devices_tiles) from logger import mylog -from helper import write_file, get_setting_value, updateState +from helper import write_file, get_setting_value, updateState, timeNowTZ +from execution_log import ExecutionLog # Import the start_server function from graphql_server.graphql_server_start import start_server apiEndpoints = [] +# Lock for thread safety +api_lock = threading.Lock() +periodic_write_lock = threading.Lock() +stop_event = threading.Event() # Event to signal thread termination + #=============================================================================== # API #=============================================================================== -def update_api(db, all_plugins, isNotification = False, updateOnlyDataSources = []): +def update_api(db, all_plugins, isNotification=False, updateOnlyDataSources=[], is_ad_hoc_user_event=False): mylog('debug', ['[API] Update API starting']) - # update app_state.json and retrieve app_state to chjeck if GraphQL server is running + # Start periodic write if not running + start_periodic_write(interval=1) + + # Update app_state.json and retrieve app_state to check if GraphQL server is running app_state = updateState("Update: API", None, None, None, None) folder = apiPath # Save plugins - write_file(folder + 'plugins.json' , json.dumps({"data" : all_plugins})) + write_file(folder + 'plugins.json', json.dumps({"data": all_plugins})) - # prepare database tables we want to expose + # Prepare database tables we want to expose dataSourcesSQLs = [ ["appevents", sql_appevents], ["devices", sql_devices_all], @@ -44,18 +55,15 @@ def update_api(db, all_plugins, isNotification = False, updateOnlyDataSources = # Save selected database tables for dsSQL in dataSourcesSQLs: - - if updateOnlyDataSources == [] or dsSQL[0] in updateOnlyDataSources: - - api_endpoint_class(db, dsSQL[1], folder + 'table_' + dsSQL[0] + '.json') + if not updateOnlyDataSources or dsSQL[0] in updateOnlyDataSources: + api_endpoint_class(db, dsSQL[1], folder + 'table_' + dsSQL[0] + '.json', is_ad_hoc_user_event) # Start the GraphQL server graphql_port_value = get_setting_value("GRAPHQL_PORT") api_token_value = get_setting_value("API_TOKEN") - # start GraphQL server if not yet running + # Start GraphQL server if not yet running if app_state.graphQLServerStarted == 0: - # Validate if settings are available if graphql_port_value is not None and len(api_token_value) > 1: try: graphql_port_value = int(graphql_port_value) # Ensure port is an integer @@ -65,30 +73,32 @@ def update_api(db, all_plugins, isNotification = False, updateOnlyDataSources = else: mylog('none', [f"[API] GRAPHQL_PORT or API_TOKEN is not set, will try later."]) - #------------------------------------------------------------------------------- - - class api_endpoint_class: - def __init__(self, db, query, path): - + def __init__(self, db, query, path, is_ad_hoc_user_event=False): global apiEndpoints + + current_time = timeNowTZ() + self.db = db self.query = query self.jsonData = db.get_table_as_json(self.query).json self.path = path self.fileName = path.split('/')[-1] self.hash = hash(json.dumps(self.jsonData)) + self.debounce_interval = 5 # Time to wait before writing + self.last_update_time = current_time - datetime.timedelta(minutes=1) # Last time data was updated + self.is_ad_hoc_user_event = is_ad_hoc_user_event - # check if the endpoint needs to be updated + # Check if the endpoint needs to be updated found = False changed = False changedIndex = -1 index = 0 - # search previous endpoint states to check if API needs updating + # Search previous endpoint states to check if API needs updating for endpoint in apiEndpoints: - # match sql and API endpoint path + # Match SQL and API endpoint path if endpoint.query == self.query and endpoint.path == self.path: found = True if endpoint.hash != self.hash: @@ -97,19 +107,88 @@ class api_endpoint_class: index = index + 1 - # check if API endpoints have changed or if it's a new one + # Check if API endpoints have changed or if it's a new one if not found or changed: - mylog('verbose', [f'[API] Updating {self.fileName} file in /api']) + mylog('trace', [f'[API] api_endpoint_class: Updating {self.fileName}']) - write_file(self.path, json.dumps(self.jsonData)) - - if not found: + if not found: apiEndpoints.append(self) elif changed and changedIndex != -1 and changedIndex < len(apiEndpoints): - # update hash + # Update hash and data apiEndpoints[changedIndex].hash = self.hash + apiEndpoints[changedIndex].jsonData = self.jsonData + + mylog('trace', [f'[API] api_endpoint_class: Updating hash {self.hash}']) else: - mylog('minimal', [f'[API] ⚠ ERROR Updating {self.fileName}']) - + mylog('none', [f'[API] ⚠ ERROR Updating {self.fileName}']) + + # Needs to be called for initial updates + self.try_write() + + def try_write(self): + current_time = timeNowTZ() + + # Debugging info to understand the issue + # mylog('verbose', [f'[API] api_endpoint_class: {self.fileName} is_ad_hoc_user_event {self.is_ad_hoc_user_event} last_update_time={self.last_update_time}, debounce time={self.last_update_time + datetime.timedelta(seconds=self.debounce_interval)}.']) + + # Only attempt to write if the debounce time has passed + if current_time > (self.last_update_time + datetime.timedelta(seconds=self.debounce_interval)): + write_file(self.path, json.dumps(self.jsonData)) + # mylog('verbose', [f'[API] api_endpoint_class: Writing {self.fileName} after debounce.']) + self.last_update_time = timeNowTZ() # Reset last_update_time after writing + + # Update user event execution log + # mylog('verbose', [f'[API] api_endpoint_class: is_ad_hoc_user_event {self.is_ad_hoc_user_event}']) + if self.is_ad_hoc_user_event: + execution_log = ExecutionLog() + execution_log.finalize_event("update_api") + write_notification(f"[Ad-hoc events] Events executed: update_api", "interrupt", timeNowTZ()) + + else: + # Debugging if write is skipped + mylog('trace', [f'[API] api_endpoint_class: Skipping write for {self.fileName}, debounce time not passed.']) + + + +#=============================================================================== +# Periodic Write Functions +#=============================================================================== +periodic_write_running = False +periodic_write_thread = None + +def periodic_write(interval=1): + """Periodically checks all endpoints for pending writes.""" + global apiEndpoints + while not stop_event.is_set(): + with api_lock: + for endpoint in apiEndpoints: + endpoint.try_write() # Attempt to write each endpoint if necessary + time.sleep(interval) + + +def start_periodic_write(interval=1): + """Start periodic_write if it's not already running.""" + global periodic_write_running, periodic_write_thread + + with periodic_write_lock: + if not periodic_write_running: + mylog('trace', ["[API] Starting periodic_write thread."]) + periodic_write_running = True + periodic_write_thread = threading.Thread(target=periodic_write, args=(interval,), daemon=True) + periodic_write_thread.start() + else: + mylog('trace', ["[API] periodic_write is already running."]) + +def stop_periodic_write(): + """Stop the periodic_write thread.""" + global periodic_write_running + + with periodic_write_lock: + if periodic_write_running: + stop_event.set() + periodic_write_thread.join() + periodic_write_running = False + mylog('trace', ["[API] periodic_write thread stopped."]) + diff --git a/server/execution_log.py b/server/execution_log.py new file mode 100755 index 00000000..da36ecf6 --- /dev/null +++ b/server/execution_log.py @@ -0,0 +1,69 @@ +import os + +# Register NetAlertX modules +from const import pluginsPath, logPath, applicationPath, reportTemplatesPath +from logger import mylog + +class ExecutionLog: + """ + Handles the execution queue log file, allowing reading, writing, + and removing processed events. + """ + + def __init__(self): + self.log_path = logPath + self.log_file = os.path.join(self.log_path, "execution_queue.log") + + def read_log(self): + """ + Reads the log file and returns all lines. + Returns an empty list if the file doesn't exist. + """ + if not os.path.exists(self.log_file): + return [] # No log file, return empty list + with open(self.log_file, "r") as file: + return file.readlines() + + def write_log(self, lines): + """ + Overwrites the log file with the provided lines. + """ + with open(self.log_file, "w") as file: + file.writelines(lines) + + def finalize_event(self, event): + """ + Removes the first occurrence of the specified event from the log file. + Retains all other lines untouched. + + Returns: + bool: True if the event was found and removed, False otherwise. + """ + if not os.path.exists(self.log_file): + return False # No log file to process + + updated_lines = [] + removed = False + + # Process the log file line by line + with open(self.log_file, "r") as file: + for line in file: + columns = line.strip().split('|')[2:4] # Extract event and param columns + if len(columns) == 2: + event_name, _ = columns + if event_name == event and not removed: + # Skip this line (remove the processed event) + removed = True + continue + updated_lines.append(line) + + # Write back the remaining lines + self.write_log(updated_lines) + + + mylog('minimal', ['[ExecutionLog] Processed event: ', event]) + + return removed + + + diff --git a/server/plugin.py b/server/plugin.py index d050c7d1..bd6d765b 100755 --- a/server/plugin.py +++ b/server/plugin.py @@ -16,6 +16,7 @@ from helper import timeNowTZ, updateState, get_file_content, write_file, get_se from api import update_api from plugin_utils import logEventStatusCounts, get_plugin_string, get_plugin_setting_obj, print_plugin_info, list_to_csv, combine_plugin_objects, resolve_wildcards_arr, handle_empty, custom_plugin_decoder, decode_and_rename_files from notification import Notification_obj, write_notification +from execution_log import ExecutionLog #------------------------------------------------------------------------------- @@ -840,56 +841,57 @@ class plugin_object_class: # Handling of user initialized front-end events #=============================================================================== def check_and_run_user_event(db, all_plugins, pluginsState): - # Check if the log file exists - logFile = os.path.join(logPath, "execution_queue.log") + """ + Process user events from the execution queue log file and notify the user about executed events. + """ + execution_log = ExecutionLog() - # Track if not an API event and list of executed events - show_events_completed = False + # Track whether to show notification for executed events executed_events = [] - if not os.path.exists(logFile): - return pluginsState - - with open(logFile, "r") as file: - lines = file.readlines() - - remaining_lines = [] + # Read the log file to get the lines + lines = execution_log.read_log() + if not lines: + return pluginsState # Exit early if the log file is empty for line in lines: - # Split the line by '|', and take the third and fourth columns (indices 2 and 3) + # Extract event name and parameters from the log line columns = line.strip().split('|')[2:4] event, param = "", "" if len(columns) == 2: event, param = columns - if event == 'test': - show_events_completed = True - pluginsState = handle_test(param, db, all_plugins, pluginsState) - executed_events.append(f"test with param {param}") - elif event == 'run': - show_events_completed = True - pluginsState = handle_run(param, db, all_plugins, pluginsState) - executed_events.append(f"run with param {param}") - elif event == 'update_api': - # Update API endpoints - update_api(db, all_plugins, False, param.split(',')) - executed_events.append(f"update_api with param {param}") - else: - remaining_lines.append(line) + try: + # Process each event type + if event == 'test': + pluginsState = handle_test(param, db, all_plugins, pluginsState) + executed_events.append(f"test with param {param}") + execution_log.finalize_event("test") + elif event == 'run': + pluginsState = handle_run(param, db, all_plugins, pluginsState) + executed_events.append(f"run with param {param}") + execution_log.finalize_event("run") + elif event == 'update_api': + # async handling + update_api(db, all_plugins, False, param.split(','), True) + + else: + mylog('minimal', ['[check_and_run_user_event] WARNING: Unhandled event in execution queue: ', event, ' | ', param]) + execution_log.finalize_event(event) # Finalize unknown events to remove them + except Exception as e: + mylog('none', ['[check_and_run_user_event] ERROR: Error processing event "', event, '" with param "', param, '": ', str(e)]) - # Rewrite the log file with remaining lines - with open(logFile, "w") as file: - file.writelines(remaining_lines) - - # Only show pop-up if not an API event - if show_events_completed: + # Notify user about executed events (if applicable) + if len(executed_events) > 0 and executed_events: executed_events_message = ', '.join(executed_events) - write_notification(f'[Ad-hoc events] Events executed: {executed_events_message}', 'interrupt', timeNowTZ()) + mylog('minimal', ['[check_and_run_user_event] INFO: Executed events: ', executed_events_message]) + write_notification(f"[Ad-hoc events] Events executed: {executed_events_message}", "interrupt", timeNowTZ()) return pluginsState + #------------------------------------------------------------------------------- def handle_run(runType, db, all_plugins, pluginsState):