chore: Try to reduce disk IO

This commit is contained in:
jokob-sk
2024-12-16 20:21:52 +11:00
parent 2e47af7b63
commit 953534724c
5 changed files with 211 additions and 61 deletions

0
front/php/templates/language/pt_br.json Normal file → Executable file
View File

0
front/php/templates/language/ru_ru.json Normal file → Executable file
View File

View File

@@ -1,32 +1,43 @@
import json import json
import time
import threading
import datetime
# Register NetAlertX modules # Register NetAlertX modules
import conf import conf
from const import (apiPath, sql_appevents, sql_devices_all, sql_events_pending_alert, sql_settings, sql_plugins_events, sql_plugins_history, sql_plugins_objects,sql_language_strings, sql_notifications_all, sql_online_history, sql_devices_tiles) from const import (apiPath, sql_appevents, sql_devices_all, sql_events_pending_alert, sql_settings, sql_plugins_events, sql_plugins_history, sql_plugins_objects,sql_language_strings, sql_notifications_all, sql_online_history, sql_devices_tiles)
from logger import mylog from logger import mylog
from helper import write_file, get_setting_value, updateState from helper import write_file, get_setting_value, updateState, timeNowTZ
from execution_log import ExecutionLog
# Import the start_server function # Import the start_server function
from graphql_server.graphql_server_start import start_server from graphql_server.graphql_server_start import start_server
apiEndpoints = [] apiEndpoints = []
# Lock for thread safety
api_lock = threading.Lock()
periodic_write_lock = threading.Lock()
stop_event = threading.Event() # Event to signal thread termination
#=============================================================================== #===============================================================================
# API # API
#=============================================================================== #===============================================================================
def update_api(db, all_plugins, isNotification = False, updateOnlyDataSources = []): def update_api(db, all_plugins, isNotification=False, updateOnlyDataSources=[], is_ad_hoc_user_event=False):
mylog('debug', ['[API] Update API starting']) mylog('debug', ['[API] Update API starting'])
# update app_state.json and retrieve app_state to chjeck if GraphQL server is running # Start periodic write if not running
start_periodic_write(interval=1)
# Update app_state.json and retrieve app_state to check if GraphQL server is running
app_state = updateState("Update: API", None, None, None, None) app_state = updateState("Update: API", None, None, None, None)
folder = apiPath folder = apiPath
# Save plugins # Save plugins
write_file(folder + 'plugins.json' , json.dumps({"data" : all_plugins})) write_file(folder + 'plugins.json', json.dumps({"data": all_plugins}))
# prepare database tables we want to expose # Prepare database tables we want to expose
dataSourcesSQLs = [ dataSourcesSQLs = [
["appevents", sql_appevents], ["appevents", sql_appevents],
["devices", sql_devices_all], ["devices", sql_devices_all],
@@ -44,18 +55,15 @@ def update_api(db, all_plugins, isNotification = False, updateOnlyDataSources =
# Save selected database tables # Save selected database tables
for dsSQL in dataSourcesSQLs: for dsSQL in dataSourcesSQLs:
if not updateOnlyDataSources or dsSQL[0] in updateOnlyDataSources:
if updateOnlyDataSources == [] or dsSQL[0] in updateOnlyDataSources: api_endpoint_class(db, dsSQL[1], folder + 'table_' + dsSQL[0] + '.json', is_ad_hoc_user_event)
api_endpoint_class(db, dsSQL[1], folder + 'table_' + dsSQL[0] + '.json')
# Start the GraphQL server # Start the GraphQL server
graphql_port_value = get_setting_value("GRAPHQL_PORT") graphql_port_value = get_setting_value("GRAPHQL_PORT")
api_token_value = get_setting_value("API_TOKEN") api_token_value = get_setting_value("API_TOKEN")
# start GraphQL server if not yet running # Start GraphQL server if not yet running
if app_state.graphQLServerStarted == 0: if app_state.graphQLServerStarted == 0:
# Validate if settings are available
if graphql_port_value is not None and len(api_token_value) > 1: if graphql_port_value is not None and len(api_token_value) > 1:
try: try:
graphql_port_value = int(graphql_port_value) # Ensure port is an integer graphql_port_value = int(graphql_port_value) # Ensure port is an integer
@@ -65,30 +73,32 @@ def update_api(db, all_plugins, isNotification = False, updateOnlyDataSources =
else: else:
mylog('none', [f"[API] GRAPHQL_PORT or API_TOKEN is not set, will try later."]) mylog('none', [f"[API] GRAPHQL_PORT or API_TOKEN is not set, will try later."])
#------------------------------------------------------------------------------- #-------------------------------------------------------------------------------
class api_endpoint_class: class api_endpoint_class:
def __init__(self, db, query, path): def __init__(self, db, query, path, is_ad_hoc_user_event=False):
global apiEndpoints global apiEndpoints
current_time = timeNowTZ()
self.db = db self.db = db
self.query = query self.query = query
self.jsonData = db.get_table_as_json(self.query).json self.jsonData = db.get_table_as_json(self.query).json
self.path = path self.path = path
self.fileName = path.split('/')[-1] self.fileName = path.split('/')[-1]
self.hash = hash(json.dumps(self.jsonData)) self.hash = hash(json.dumps(self.jsonData))
self.debounce_interval = 5 # Time to wait before writing
self.last_update_time = current_time - datetime.timedelta(minutes=1) # Last time data was updated
self.is_ad_hoc_user_event = is_ad_hoc_user_event
# check if the endpoint needs to be updated # Check if the endpoint needs to be updated
found = False found = False
changed = False changed = False
changedIndex = -1 changedIndex = -1
index = 0 index = 0
# search previous endpoint states to check if API needs updating # Search previous endpoint states to check if API needs updating
for endpoint in apiEndpoints: for endpoint in apiEndpoints:
# match sql and API endpoint path # Match SQL and API endpoint path
if endpoint.query == self.query and endpoint.path == self.path: if endpoint.query == self.query and endpoint.path == self.path:
found = True found = True
if endpoint.hash != self.hash: if endpoint.hash != self.hash:
@@ -97,19 +107,88 @@ class api_endpoint_class:
index = index + 1 index = index + 1
# check if API endpoints have changed or if it's a new one # Check if API endpoints have changed or if it's a new one
if not found or changed: if not found or changed:
mylog('verbose', [f'[API] Updating {self.fileName} file in /api']) mylog('trace', [f'[API] api_endpoint_class: Updating {self.fileName}'])
write_file(self.path, json.dumps(self.jsonData)) if not found:
if not found:
apiEndpoints.append(self) apiEndpoints.append(self)
elif changed and changedIndex != -1 and changedIndex < len(apiEndpoints): elif changed and changedIndex != -1 and changedIndex < len(apiEndpoints):
# update hash # Update hash and data
apiEndpoints[changedIndex].hash = self.hash apiEndpoints[changedIndex].hash = self.hash
apiEndpoints[changedIndex].jsonData = self.jsonData
mylog('trace', [f'[API] api_endpoint_class: Updating hash {self.hash}'])
else: else:
mylog('minimal', [f'[API] ⚠ ERROR Updating {self.fileName}']) mylog('none', [f'[API] ⚠ ERROR Updating {self.fileName}'])
# Needs to be called for initial updates
self.try_write()
def try_write(self):
current_time = timeNowTZ()
# Debugging info to understand the issue
# mylog('verbose', [f'[API] api_endpoint_class: {self.fileName} is_ad_hoc_user_event {self.is_ad_hoc_user_event} last_update_time={self.last_update_time}, debounce time={self.last_update_time + datetime.timedelta(seconds=self.debounce_interval)}.'])
# Only attempt to write if the debounce time has passed
if current_time > (self.last_update_time + datetime.timedelta(seconds=self.debounce_interval)):
write_file(self.path, json.dumps(self.jsonData))
# mylog('verbose', [f'[API] api_endpoint_class: Writing {self.fileName} after debounce.'])
self.last_update_time = timeNowTZ() # Reset last_update_time after writing
# Update user event execution log
# mylog('verbose', [f'[API] api_endpoint_class: is_ad_hoc_user_event {self.is_ad_hoc_user_event}'])
if self.is_ad_hoc_user_event:
execution_log = ExecutionLog()
execution_log.finalize_event("update_api")
write_notification(f"[Ad-hoc events] Events executed: update_api", "interrupt", timeNowTZ())
else:
# Debugging if write is skipped
mylog('trace', [f'[API] api_endpoint_class: Skipping write for {self.fileName}, debounce time not passed.'])
#===============================================================================
# Periodic Write Functions
#===============================================================================
periodic_write_running = False
periodic_write_thread = None
def periodic_write(interval=1):
"""Periodically checks all endpoints for pending writes."""
global apiEndpoints
while not stop_event.is_set():
with api_lock:
for endpoint in apiEndpoints:
endpoint.try_write() # Attempt to write each endpoint if necessary
time.sleep(interval)
def start_periodic_write(interval=1):
"""Start periodic_write if it's not already running."""
global periodic_write_running, periodic_write_thread
with periodic_write_lock:
if not periodic_write_running:
mylog('trace', ["[API] Starting periodic_write thread."])
periodic_write_running = True
periodic_write_thread = threading.Thread(target=periodic_write, args=(interval,), daemon=True)
periodic_write_thread.start()
else:
mylog('trace', ["[API] periodic_write is already running."])
def stop_periodic_write():
"""Stop the periodic_write thread."""
global periodic_write_running
with periodic_write_lock:
if periodic_write_running:
stop_event.set()
periodic_write_thread.join()
periodic_write_running = False
mylog('trace', ["[API] periodic_write thread stopped."])

69
server/execution_log.py Executable file
View File

@@ -0,0 +1,69 @@
import os
# Register NetAlertX modules
from const import pluginsPath, logPath, applicationPath, reportTemplatesPath
from logger import mylog
class ExecutionLog:
"""
Handles the execution queue log file, allowing reading, writing,
and removing processed events.
"""
def __init__(self):
self.log_path = logPath
self.log_file = os.path.join(self.log_path, "execution_queue.log")
def read_log(self):
"""
Reads the log file and returns all lines.
Returns an empty list if the file doesn't exist.
"""
if not os.path.exists(self.log_file):
return [] # No log file, return empty list
with open(self.log_file, "r") as file:
return file.readlines()
def write_log(self, lines):
"""
Overwrites the log file with the provided lines.
"""
with open(self.log_file, "w") as file:
file.writelines(lines)
def finalize_event(self, event):
"""
Removes the first occurrence of the specified event from the log file.
Retains all other lines untouched.
Returns:
bool: True if the event was found and removed, False otherwise.
"""
if not os.path.exists(self.log_file):
return False # No log file to process
updated_lines = []
removed = False
# Process the log file line by line
with open(self.log_file, "r") as file:
for line in file:
columns = line.strip().split('|')[2:4] # Extract event and param columns
if len(columns) == 2:
event_name, _ = columns
if event_name == event and not removed:
# Skip this line (remove the processed event)
removed = True
continue
updated_lines.append(line)
# Write back the remaining lines
self.write_log(updated_lines)
mylog('minimal', ['[ExecutionLog] Processed event: ', event])
return removed

View File

@@ -16,6 +16,7 @@ from helper import timeNowTZ, updateState, get_file_content, write_file, get_se
from api import update_api from api import update_api
from plugin_utils import logEventStatusCounts, get_plugin_string, get_plugin_setting_obj, print_plugin_info, list_to_csv, combine_plugin_objects, resolve_wildcards_arr, handle_empty, custom_plugin_decoder, decode_and_rename_files from plugin_utils import logEventStatusCounts, get_plugin_string, get_plugin_setting_obj, print_plugin_info, list_to_csv, combine_plugin_objects, resolve_wildcards_arr, handle_empty, custom_plugin_decoder, decode_and_rename_files
from notification import Notification_obj, write_notification from notification import Notification_obj, write_notification
from execution_log import ExecutionLog
#------------------------------------------------------------------------------- #-------------------------------------------------------------------------------
@@ -840,56 +841,57 @@ class plugin_object_class:
# Handling of user initialized front-end events # Handling of user initialized front-end events
#=============================================================================== #===============================================================================
def check_and_run_user_event(db, all_plugins, pluginsState): def check_and_run_user_event(db, all_plugins, pluginsState):
# Check if the log file exists """
logFile = os.path.join(logPath, "execution_queue.log") Process user events from the execution queue log file and notify the user about executed events.
"""
execution_log = ExecutionLog()
# Track if not an API event and list of executed events # Track whether to show notification for executed events
show_events_completed = False
executed_events = [] executed_events = []
if not os.path.exists(logFile): # Read the log file to get the lines
return pluginsState lines = execution_log.read_log()
if not lines:
with open(logFile, "r") as file: return pluginsState # Exit early if the log file is empty
lines = file.readlines()
remaining_lines = []
for line in lines: for line in lines:
# Split the line by '|', and take the third and fourth columns (indices 2 and 3) # Extract event name and parameters from the log line
columns = line.strip().split('|')[2:4] columns = line.strip().split('|')[2:4]
event, param = "", "" event, param = "", ""
if len(columns) == 2: if len(columns) == 2:
event, param = columns event, param = columns
if event == 'test': try:
show_events_completed = True # Process each event type
pluginsState = handle_test(param, db, all_plugins, pluginsState) if event == 'test':
executed_events.append(f"test with param {param}") pluginsState = handle_test(param, db, all_plugins, pluginsState)
elif event == 'run': executed_events.append(f"test with param {param}")
show_events_completed = True execution_log.finalize_event("test")
pluginsState = handle_run(param, db, all_plugins, pluginsState) elif event == 'run':
executed_events.append(f"run with param {param}") pluginsState = handle_run(param, db, all_plugins, pluginsState)
elif event == 'update_api': executed_events.append(f"run with param {param}")
# Update API endpoints execution_log.finalize_event("run")
update_api(db, all_plugins, False, param.split(',')) elif event == 'update_api':
executed_events.append(f"update_api with param {param}") # async handling
else: update_api(db, all_plugins, False, param.split(','), True)
remaining_lines.append(line)
else:
mylog('minimal', ['[check_and_run_user_event] WARNING: Unhandled event in execution queue: ', event, ' | ', param])
execution_log.finalize_event(event) # Finalize unknown events to remove them
except Exception as e:
mylog('none', ['[check_and_run_user_event] ERROR: Error processing event "', event, '" with param "', param, '": ', str(e)])
# Rewrite the log file with remaining lines # Notify user about executed events (if applicable)
with open(logFile, "w") as file: if len(executed_events) > 0 and executed_events:
file.writelines(remaining_lines)
# Only show pop-up if not an API event
if show_events_completed:
executed_events_message = ', '.join(executed_events) executed_events_message = ', '.join(executed_events)
write_notification(f'[Ad-hoc events] Events executed: {executed_events_message}', 'interrupt', timeNowTZ()) mylog('minimal', ['[check_and_run_user_event] INFO: Executed events: ', executed_events_message])
write_notification(f"[Ad-hoc events] Events executed: {executed_events_message}", "interrupt", timeNowTZ())
return pluginsState return pluginsState
#------------------------------------------------------------------------------- #-------------------------------------------------------------------------------
def handle_run(runType, db, all_plugins, pluginsState): def handle_run(runType, db, all_plugins, pluginsState):