GraphQl 0.125 - Threading issues

This commit is contained in:
jokob-sk
2024-11-15 23:27:26 +11:00
parent 60492157d1
commit 70e0542488
5 changed files with 80 additions and 85 deletions

0
front/php/templates/language/ca_ca.json Normal file → Executable file
View File

0
front/php/templates/language/ru_ru.json Normal file → Executable file
View File

View File

@@ -41,12 +41,13 @@ def graphql_endpoint():
def start_server(): def start_server():
"""Function to start the GraphQL server in a background thread.""" """Function to start the GraphQL server in a background thread."""
mylog('verbose', [f'[graphql_server] Starting on port: {GRAPHQL_PORT}'])
state = updateState("GraphQL: Starting", None, None, None, None) state = updateState("GraphQL: Starting", None, None, None, None)
if state.graphQLServerStarted == 0: if state.graphQLServerStarted == 0:
mylog('verbose', [f'[graphql_server] Starting on port: {GRAPHQL_PORT}'])
# Start the Flask app in a separate thread # Start the Flask app in a separate thread
thread = threading.Thread(target=lambda: app.run(host="0.0.0.0", port=GRAPHQL_PORT, debug=True, use_reloader=False)) thread = threading.Thread(target=lambda: app.run(host="0.0.0.0", port=GRAPHQL_PORT, debug=True, use_reloader=False))
thread.start() thread.start()

View File

@@ -1,13 +1,11 @@
""" Colection of functions to support all logging for NetAlertX """
import sys import sys
import io import io
import datetime import datetime
import threading import threading
import queue
import time import time
import conf import conf
from const import * from const import *
# from helper import get_setting_value
#------------------------------------------------------------------------------- #-------------------------------------------------------------------------------
# duplication from helper to avoid circle # duplication from helper to avoid circle
@@ -18,17 +16,15 @@ def timeNowTZ():
else: else:
return datetime.datetime.now().replace(microsecond=0) return datetime.datetime.now().replace(microsecond=0)
#------------------------------------------------------------------------------- #-------------------------------------------------------------------------------
# More verbose as the numbers go up # More verbose as the numbers go up
debugLevels = [ debugLevels = [
('none', 0), ('minimal', 1), ('verbose', 2), ('debug', 3), ('trace', 4) ('none', 0), ('minimal', 1), ('verbose', 2), ('debug', 3), ('trace', 4)
] ]
currentLevel = 0 currentLevel = 0
def mylog(requestedDebugLevel, n): def mylog(requestedDebugLevel, n):
setLvl = 0 setLvl = 0
reqLvl = 0 reqLvl = 0
@@ -43,100 +39,86 @@ def mylog(requestedDebugLevel, n):
file_print (*n) file_print (*n)
#------------------------------------------------------------------------------- #-------------------------------------------------------------------------------
def file_print (*args): # Queue for log messages
log_queue = queue.Queue()
result = timeNowTZ().strftime ('%H:%M:%S') + ' ' # Dedicated thread for writing logs
log_thread = None # Will hold the thread reference
def log_writer():
while True:
log_entry = log_queue.get()
if log_entry is None: # Graceful exit signal
break
with open(logPath + "/app.log", 'a') as log_file:
log_file.write(log_entry + '\n')
#-------------------------------------------------------------------------------
# Function to start the log writer thread if it doesn't exist
def start_log_writer_thread():
global log_thread
if log_thread is None or not log_thread.is_alive():
print("Starting log writer thread...")
log_thread = threading.Thread(target=log_writer, args=(), daemon=True)
log_thread.start()
#-------------------------------------------------------------------------------
def file_print(*args):
result = timeNowTZ().strftime('%H:%M:%S') + ' '
for arg in args: for arg in args:
result += str(arg) result += str(arg)
print(result) print(result)
append_to_file_with_timeout(logPath + "/app.log", result + '\n', 5)
#------------------------------------------------------------------------------- # Ensure the log writer thread is running
# Function to append to the file start_log_writer_thread()
def append_to_file(file_path, data):
try: # Queue the log entry for writing
# Open the file for appending append_to_file_with_timeout( result, 5)
file = open(file_path, "a")
# Write the data to the file
file.write(data)
# Close the file
file.close()
except Exception as e:
print(f"Error appending to file: {e}")
#------------------------------------------------------------------------------- #-------------------------------------------------------------------------------
# Function to append to the file with a timeout # Function to append to the file with a timeout
def append_to_file_with_timeout(file_path, data, timeout): def append_to_file_with_timeout(data, timeout):
# Create a thread for appending to the file try:
append_thread = threading.Thread(target=append_to_file, args=(file_path, data)) log_queue.put(data, timeout=timeout)
except queue.Full:
# Start the thread
append_thread.start()
# Wait for the thread to complete or timeout
append_thread.join(timeout)
# If the thread is still running, it has exceeded the timeout
if append_thread.is_alive():
append_thread.join() # Optionally, you can force it to terminate
# Handle the timeout here, e.g., log an error
print("Appending to file timed out") print("Appending to file timed out")
#------------------------------------------------------------------------------- #-------------------------------------------------------------------------------
def print_log (pText): def print_log(pText):
# Check if logging is active
# Check LOG actived if not conf.LOG_LEVEL == 'debug':
if not conf.LOG_LEVEL == 'debug' :
return return
# Current Time # Current Time
log_timestamp2 = datetime.datetime.now(conf.tz).replace(microsecond=0) log_timestamp2 = datetime.datetime.now(conf.tz).replace(microsecond=0)
# Print line + time + elapsed time + text # Print line + time + text
file_print ('[LOG_LEVEL=debug] ', file_print('[LOG_LEVEL=debug]', log_timestamp2.strftime('%H:%M:%S'), pText)
# log_timestamp2, ' ',
log_timestamp2.strftime ('%H:%M:%S'), ' ',
pText)
return pText return pText
#------------------------------------------------------------------------------- #-------------------------------------------------------------------------------
# textchars = bytearray({7,8,9,10,12,13,27} | set(range(0x20, 0x100)) - {0x7f})
# is_binary_string = lambda bytes: bool(bytes.translate(None, textchars))
def append_file_binary(file_path, input_data): def append_file_binary(file_path, input_data):
with open(file_path, 'ab') as file: with open(file_path, 'ab') as file:
if isinstance(input_data, str): if isinstance(input_data, str):
input_data = input_data.encode('utf-8') # Encode string as bytes input_data = input_data.encode('utf-8') # Encode string as bytes
file.write(input_data) file.write(input_data)
#-------------------------------------------------------------------------------
def logResult(stdout, stderr):
if stderr is not None:
append_file_binary(logPath + '/stderr.log', stderr)
if stdout is not None:
append_file_binary(logPath + '/stdout.log', stdout)
#------------------------------------------------------------------------------- #-------------------------------------------------------------------------------
def logResult (stdout, stderr): def append_line_to_file(pPath, pText):
if stderr != None: # append the line using the correct python version
append_file_binary (logPath + '/stderr.log', stderr)
if stdout != None:
append_file_binary (logPath + '/stdout.log', stdout)
#-------------------------------------------------------------------------------
def append_line_to_file (pPath, pText):
# append the line depending using the correct python version
if sys.version_info < (3, 0): if sys.version_info < (3, 0):
file = io.open (pPath , mode='a', encoding='utf-8') file = io.open(pPath, mode='a', encoding='utf-8')
file.write ( pText.decode('unicode_escape') ) file.write(pText.decode('unicode_escape'))
file.close() file.close()
else: else:
file = open (pPath, 'a', encoding='utf-8') file = open(pPath, 'a', encoding='utf-8')
file.write (pText) file.write(pText)
file.close() file.close()

View File

@@ -4,6 +4,7 @@ import json
import subprocess import subprocess
import datetime import datetime
import base64 import base64
from concurrent.futures import ThreadPoolExecutor, as_completed
from collections import namedtuple from collections import namedtuple
@@ -145,6 +146,18 @@ def run_plugin_scripts(db, all_plugins, runType, pluginsState = plugins_state())
# Function to run a plugin command
def run_plugin(command, set_RUN_TIMEOUT):
try:
return subprocess.check_output(command, universal_newlines=True, stderr=subprocess.STDOUT, timeout=set_RUN_TIMEOUT)
except subprocess.CalledProcessError as e:
mylog('none', [e.output])
mylog('none', ['[Plugins] ⚠ ERROR - enable LOG_LEVEL=debug and check logs'])
return None
except subprocess.TimeoutExpired as timeErr:
mylog('none', [f'[Plugins] ⚠ ERROR - TIMEOUT - the plugin {plugin["unique_prefix"]} forcefully terminated as timeout reached. Increase TIMEOUT setting and scan interval.'])
return None
#------------------------------------------------------------------------------- #-------------------------------------------------------------------------------
# Executes the plugin command specified in the setting with the function specified as CMD # Executes the plugin command specified in the setting with the function specified as CMD
@@ -209,15 +222,14 @@ def execute_plugin(db, all_plugins, plugin, pluginsState = plugins_state() ):
mylog('verbose', ['[Plugins] Executing: ', set_CMD]) mylog('verbose', ['[Plugins] Executing: ', set_CMD])
mylog('debug', ['[Plugins] Resolved : ', command]) mylog('debug', ['[Plugins] Resolved : ', command])
try: # Using ThreadPoolExecutor to handle concurrent subprocesses
# try running a subprocess with a forced timeout in case the subprocess hangs with ThreadPoolExecutor(max_workers=5) as executor:
output = subprocess.check_output(command, universal_newlines=True, stderr=subprocess.STDOUT, timeout=(set_RUN_TIMEOUT)) futures = [executor.submit(run_plugin, command, set_RUN_TIMEOUT)] # Submit the command as a future
except subprocess.CalledProcessError as e:
# An error occurred, handle it for future in as_completed(futures):
mylog('none', [e.output]) output = future.result() # Get the output or error
mylog('none', ['[Plugins] ⚠ ERROR - enable LOG_LEVEL=debug and check logs']) if output is not None:
except subprocess.TimeoutExpired as timeErr: mylog('verbose', [f'[Plugins] Output: {output}'])
mylog('none', [f'[Plugins] ⚠ ERROR - TIMEOUT - the plugin {plugin["unique_prefix"]} forcefully terminated as timeout reached. Increase TIMEOUT setting and scan interval.'])
# Initialize newLines # Initialize newLines
newLines = [] newLines = []