LOG_LEVEL fix, WEBMON timeout multiplier, docs, fix for watched-changed #1053

This commit is contained in:
jokob-sk
2025-05-04 08:52:13 +10:00
parent 9d3a537b10
commit 46cbf85584
24 changed files with 213 additions and 190 deletions

View File

@@ -20,8 +20,150 @@ from notification import Notification_obj, write_notification
from user_events_queue import UserEventsQueue
from crypto_utils import generate_deterministic_guid
# Make sure log level is initialized correctly
Logger(get_setting_value('LOG_LEVEL'))
#-------------------------------------------------------------------------------
class plugin_manager:
def __init__(self, db, all_plugins):
self.db = db
self.all_plugins = all_plugins
# Make sure log level is initialized correctly
Logger(get_setting_value('LOG_LEVEL'))
#-------------------------------------------------------------------------------
def run_plugin_scripts(self, runType):
# Header
updateState("Run: Plugins")
mylog('debug', ['[Plugins] Check if any plugins need to be executed on run type: ', runType])
for plugin in self.all_plugins:
shouldRun = False
prefix = plugin["unique_prefix"]
set = get_plugin_setting_obj(plugin, "RUN")
# mylog('debug', [f'[run_plugin_scripts] plugin: {plugin}'])
# mylog('debug', [f'[run_plugin_scripts] set: {set}'])
if set != None and set['value'] == runType:
if runType != "schedule":
shouldRun = True
elif runType == "schedule":
# run if overdue scheduled time
# check schedules if any contains a unique plugin prefix matching the current plugin
for schd in conf.mySchedules:
if schd.service == prefix:
# Check if schedule overdue
shouldRun = schd.runScheduleCheck()
if shouldRun:
# Header
updateState(f"Plugin: {prefix}")
print_plugin_info(plugin, ['display_name'])
mylog('debug', ['[Plugins] CMD: ', get_plugin_setting_obj(plugin, "CMD")["value"]])
execute_plugin(self.db, self.all_plugins, plugin)
# update last run time
if runType == "schedule":
for schd in conf.mySchedules:
if schd.service == prefix:
# note the last time the scheduled plugin run was executed
schd.last_run = timeNowTZ()
#===============================================================================
# Handling of user initialized front-end events
#===============================================================================
def check_and_run_user_event(self):
"""
Process user events from the execution queue log file and notify the user about executed events.
"""
execution_log = UserEventsQueue()
# Track whether to show notification for executed events
executed_events = []
# Read the log file to get the lines
lines = execution_log.read_log()
if not lines:
mylog('debug', ['[check_and_run_user_event] User Execution Queue is empty'])
return # Exit early if the log file is empty
else:
mylog('debug', ['[check_and_run_user_event] Process User Execution Queue:' + ', '.join(map(str, lines))])
for line in lines:
# Extract event name and parameters from the log line
columns = line.strip().split('|')[2:4]
event, param = "", ""
if len(columns) == 2:
event, param = columns
# Process each event type
if event == 'test':
handle_test(param)
executed_events.append(f"test with param {param}")
execution_log.finalize_event("test")
elif event == 'run':
handle_run(param)
executed_events.append(f"run with param {param}")
execution_log.finalize_event("run")
elif event == 'update_api':
# async handling
update_api(self.db, self.all_plugins, False, param.split(','), True)
else:
mylog('minimal', ['[check_and_run_user_event] WARNING: Unhandled event in execution queue: ', event, ' | ', param])
execution_log.finalize_event(event) # Finalize unknown events to remove them
# Notify user about executed events (if applicable)
if len(executed_events) > 0 and executed_events:
executed_events_message = ', '.join(executed_events)
mylog('minimal', ['[check_and_run_user_event] INFO: Executed events: ', executed_events_message])
write_notification(f"[Ad-hoc events] Events executed: {executed_events_message}", "interrupt", timeNowTZ())
return
#-------------------------------------------------------------------------------
def handle_run(self, runType):
mylog('minimal', ['[', timeNowTZ(), '] START Run: ', runType])
# run the plugin to run
for plugin in self.all_plugins:
if plugin["unique_prefix"] == runType:
execute_plugin(self.db, self.all_plugins, plugin)
mylog('minimal', ['[', timeNowTZ(), '] END Run: ', runType])
return
#-------------------------------------------------------------------------------
def handle_test(self, runType):
mylog('minimal', ['[', timeNowTZ(), '] [Test] START Test: ', runType])
# Prepare test samples
sample_json = json.loads(get_file_content(reportTemplatesPath + 'webhook_json_sample.json'))[0]["body"]["attachments"][0]["text"]
# Create fake notification
notification = Notification_obj(db)
notificationObj = notification.create(sample_json, "")
# Run test
handle_run(runType)
# Remove sample notification
notificationObj.remove(notificationObj.GUID)
mylog('minimal', ['[Test] END Test: ', runType])
return
#-------------------------------------------------------------------------------
class plugin_param:
@@ -103,47 +245,7 @@ class plugin_param:
self.paramValuesCount = paramValuesCount
self.multiplyTimeout = multiplyTimeout
#-------------------------------------------------------------------------------
def run_plugin_scripts(db, all_plugins, runType):
# Header
updateState("Run: Plugins")
mylog('debug', ['[Plugins] Check if any plugins need to be executed on run type: ', runType])
for plugin in all_plugins:
shouldRun = False
prefix = plugin["unique_prefix"]
set = get_plugin_setting_obj(plugin, "RUN")
# mylog('debug', [f'[run_plugin_scripts] plugin: {plugin}'])
# mylog('debug', [f'[run_plugin_scripts] set: {set}'])
if set != None and set['value'] == runType:
if runType != "schedule":
shouldRun = True
elif runType == "schedule":
# run if overdue scheduled time
# check schedules if any contains a unique plugin prefix matching the current plugin
for schd in conf.mySchedules:
if schd.service == prefix:
# Check if schedule overdue
shouldRun = schd.runScheduleCheck()
if shouldRun:
# Header
updateState(f"Plugin: {prefix}")
print_plugin_info(plugin, ['display_name'])
mylog('debug', ['[Plugins] CMD: ', get_plugin_setting_obj(plugin, "CMD")["value"]])
execute_plugin(db, all_plugins, plugin)
# update last run time
if runType == "schedule":
for schd in conf.mySchedules:
if schd.service == prefix:
# note the last time the scheduled plugin run was executed
schd.last_run = timeNowTZ()
# Function to run a plugin command
@@ -448,13 +550,10 @@ def execute_plugin(db, all_plugins, plugin ):
# check if the subprocess / SQL query failed / there was no valid output
if len(sqlParams) == 0:
mylog('none', [f'[Plugins] No output received from the plugin "{plugin["unique_prefix"]}"'])
return
else:
mylog('verbose', ['[Plugins] SUCCESS, received ', len(sqlParams), ' entries'])
mylog('debug', ['[Plugins] sqlParam entries: ', sqlParams])
# process results if any
if len(sqlParams) > 0:
mylog('verbose', [f'[Plugins] SUCCESS for {plugin["unique_prefix"]} received {len(sqlParams)} entries'])
# mylog('debug', ['[Plugins] sqlParam entries: ', sqlParams])
# create objects
process_plugin_events(db, plugin, sqlParams)
@@ -483,7 +582,8 @@ def process_plugin_events(db, plugin, plugEventsArr):
pluginPref = plugin["unique_prefix"]
mylog('debug', ['[Plugins] Processing : ', pluginPref])
mylog('verbose', ['[Plugins] Processing : ', pluginPref])
try:
# Begin a transaction
@@ -497,8 +597,7 @@ def process_plugin_events(db, plugin, plugEventsArr):
for obj in plugObjectsArr:
pluginObjects.append(plugin_object_class(plugin, obj))
# create plugin objects from events - will be processed to find existing objects
for eve in plugEventsArr:
pluginEvents.append(plugin_object_class(plugin, eve))
@@ -506,15 +605,13 @@ def process_plugin_events(db, plugin, plugEventsArr):
mylog('debug', ['[Plugins] Existing objects from Plugins_Objects: ', len(pluginObjects)])
mylog('debug', ['[Plugins] Logged events from the plugin run : ', len(pluginEvents)])
# Loop thru all current events and update the status to "exists" if the event matches an existing object
index = 0
for tmpObjFromEvent in pluginEvents:
# compare hash of the IDs for uniqueness
if any(x.idsHash == tmpObjFromEvent.idsHash for x in pluginObjects):
if any(x.idsHash == tmpObjFromEvent.idsHash for x in pluginObjects):
pluginEvents[index].status = "exists"
index += 1
@@ -526,9 +623,13 @@ def process_plugin_events(db, plugin, plugEventsArr):
if tmpObjFromEvent.status == "exists":
# compare hash of the changed watched columns for uniqueness
if any(x.watchedHash != tmpObjFromEvent.watchedHash for x in pluginObjects):
pluginEvents[index].status = "watched-changed"
# compare hash of the changed watched columns for uniqueness - make sure you compare the values with the same idsHash before checking watchedHash
if any(
x.idsHash == tmpObjFromEvent.idsHash and x.watchedHash != tmpObjFromEvent.watchedHash
for x in pluginObjects
):
pluginEvents[index].status = "watched-changed"
else:
pluginEvents[index].status = "watched-not-changed"
index += 1
@@ -612,9 +713,9 @@ def process_plugin_events(db, plugin, plugEventsArr):
mylog('debug', ['[Plugins] objects_to_insert count: ', len(objects_to_insert)])
mylog('debug', ['[Plugins] objects_to_update count: ', len(objects_to_update)])
mylog('trace', ['[Plugins] objects_to_update: ', objects_to_update])
mylog('trace', ['[Plugins] events_to_insert: ', events_to_insert])
mylog('trace', ['[Plugins] history_to_insert: ', history_to_insert])
# mylog('debug', ['[Plugins] objects_to_update: ', objects_to_update])
# mylog('debug', ['[Plugins] events_to_insert: ', events_to_insert])
# mylog('debug', ['[Plugins] history_to_insert: ', history_to_insert])
logEventStatusCounts('pluginEvents', pluginEvents)
logEventStatusCounts('pluginObjects', pluginObjects)
@@ -838,106 +939,20 @@ class plugin_object_class:
for clmName in self.watchedClmns:
for mapping in indexNameColumnMapping:
if clmName == indexNameColumnMapping[1]:
self.watchedIndxs.append(indexNameColumnMapping[0])
if clmName == mapping[1]:
self.watchedIndxs.append(mapping[0])
tmp = ''
for indx in self.watchedIndxs:
tmp += str(objDbRow[indx])
self.watchedHash = str(hash(tmp))
#===============================================================================
# Handling of user initialized front-end events
#===============================================================================
def check_and_run_user_event(db, all_plugins):
"""
Process user events from the execution queue log file and notify the user about executed events.
"""
execution_log = UserEventsQueue()
# Track whether to show notification for executed events
executed_events = []
# Read the log file to get the lines
lines = execution_log.read_log()
if not lines:
mylog('debug', ['[check_and_run_user_event] User Execution Queue is empty'])
return # Exit early if the log file is empty
else:
mylog('debug', ['[check_and_run_user_event] Process User Execution Queue:' + ', '.join(map(str, lines))])
for line in lines:
# Extract event name and parameters from the log line
columns = line.strip().split('|')[2:4]
event, param = "", ""
if len(columns) == 2:
event, param = columns
# Process each event type
if event == 'test':
handle_test(param, db, all_plugins)
executed_events.append(f"test with param {param}")
execution_log.finalize_event("test")
elif event == 'run':
handle_run(param, db, all_plugins)
executed_events.append(f"run with param {param}")
execution_log.finalize_event("run")
elif event == 'update_api':
# async handling
update_api(db, all_plugins, False, param.split(','), True)
else:
mylog('minimal', ['[check_and_run_user_event] WARNING: Unhandled event in execution queue: ', event, ' | ', param])
execution_log.finalize_event(event) # Finalize unknown events to remove them
# Notify user about executed events (if applicable)
if len(executed_events) > 0 and executed_events:
executed_events_message = ', '.join(executed_events)
mylog('minimal', ['[check_and_run_user_event] INFO: Executed events: ', executed_events_message])
write_notification(f"[Ad-hoc events] Events executed: {executed_events_message}", "interrupt", timeNowTZ())
return
def __repr__(self):
attrs = vars(self)
return f"<PluginObject " + ", ".join(f"{k}={v!r}" for k, v in attrs.items()) + ">"
#-------------------------------------------------------------------------------
def handle_run(runType, db, all_plugins):
mylog('minimal', ['[', timeNowTZ(), '] START Run: ', runType])
# run the plugin to run
for plugin in all_plugins:
if plugin["unique_prefix"] == runType:
execute_plugin(db, all_plugins, plugin)
mylog('minimal', ['[', timeNowTZ(), '] END Run: ', runType])
return
#-------------------------------------------------------------------------------
def handle_test(runType, db, all_plugins):
mylog('minimal', ['[', timeNowTZ(), '] [Test] START Test: ', runType])
# Prepare test samples
sample_json = json.loads(get_file_content(reportTemplatesPath + 'webhook_json_sample.json'))[0]["body"]["attachments"][0]["text"]
# Create fake notification
notification = Notification_obj(db)
notificationObj = notification.create(sample_json, "")
# Run test
handle_run(runType, db, all_plugins)
# Remove sample notification
notificationObj.remove(notificationObj.GUID)
mylog('minimal', ['[Test] END Test: ', runType])
return