mirror of
https://github.com/jokob-sk/NetAlertX.git
synced 2025-12-07 09:36:05 -08:00
Fix debounce of api points to address Disk IO #914
This commit is contained in:
@@ -84,8 +84,6 @@ def main ():
|
|||||||
|
|
||||||
mylog('debug', '[MAIN] Starting loop')
|
mylog('debug', '[MAIN] Starting loop')
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
all_plugins = None
|
all_plugins = None
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
|
|||||||
@@ -24,7 +24,7 @@ stop_event = threading.Event() # Event to signal thread termination
|
|||||||
#===============================================================================
|
#===============================================================================
|
||||||
# API
|
# API
|
||||||
#===============================================================================
|
#===============================================================================
|
||||||
def update_api(db, all_plugins, isNotification=False, updateOnlyDataSources=[], is_ad_hoc_user_event=False):
|
def update_api(db, all_plugins, updateOnlyDataSources=[], is_ad_hoc_user_event=False):
|
||||||
mylog('debug', ['[API] Update API starting'])
|
mylog('debug', ['[API] Update API starting'])
|
||||||
|
|
||||||
# Start periodic write if not running
|
# Start periodic write if not running
|
||||||
@@ -88,13 +88,13 @@ class api_endpoint_class:
|
|||||||
self.fileName = path.split('/')[-1]
|
self.fileName = path.split('/')[-1]
|
||||||
self.hash = hash(json.dumps(self.jsonData))
|
self.hash = hash(json.dumps(self.jsonData))
|
||||||
self.debounce_interval = 5 # Time to wait before writing
|
self.debounce_interval = 5 # Time to wait before writing
|
||||||
self.last_update_time = current_time - datetime.timedelta(minutes=1) # Last time data was updated
|
self.changeDetectedWhen = None
|
||||||
|
# self.last_update_time = current_time - datetime.timedelta(minutes=1) # Last time data was updated
|
||||||
self.is_ad_hoc_user_event = is_ad_hoc_user_event
|
self.is_ad_hoc_user_event = is_ad_hoc_user_event
|
||||||
|
self.needsUpdate = False
|
||||||
|
|
||||||
# Check if the endpoint needs to be updated
|
# Check if the endpoint needs to be updated
|
||||||
found = False
|
found = False
|
||||||
changed = False
|
|
||||||
changedIndex = -1
|
|
||||||
index = 0
|
index = 0
|
||||||
|
|
||||||
# Search previous endpoint states to check if API needs updating
|
# Search previous endpoint states to check if API needs updating
|
||||||
@@ -102,28 +102,27 @@ class api_endpoint_class:
|
|||||||
# Match SQL and API endpoint path
|
# Match SQL and API endpoint path
|
||||||
if endpoint.query == self.query and endpoint.path == self.path:
|
if endpoint.query == self.query and endpoint.path == self.path:
|
||||||
found = True
|
found = True
|
||||||
|
mylog('trace', [f'[API] api_endpoint_class: Hashes (file|old|new): ({self.fileName}|{endpoint.hash}|{self.hash})'])
|
||||||
if endpoint.hash != self.hash:
|
if endpoint.hash != self.hash:
|
||||||
changed = True
|
self.needsUpdate = True
|
||||||
changedIndex = index
|
# Only update changeDetectedWhen if it hasn't been set recently
|
||||||
|
if not self.changeDetectedWhen or current_time > (self.changeDetectedWhen + datetime.timedelta(seconds=self.debounce_interval)):
|
||||||
|
self.changeDetectedWhen = current_time # Set timestamp for change detection
|
||||||
|
if index < len(apiEndpoints):
|
||||||
|
apiEndpoints[index] = self
|
||||||
|
# check end of bounds and replace
|
||||||
|
if index < len(apiEndpoints):
|
||||||
|
apiEndpoints[index] = self
|
||||||
|
|
||||||
index = index + 1
|
index = index + 1
|
||||||
|
|
||||||
# Check if API endpoints have changed or if it's a new one
|
|
||||||
if not found or changed:
|
|
||||||
|
|
||||||
mylog('trace', [f'[API] api_endpoint_class: Updating {self.fileName}'])
|
# needs also an update if new endpoint
|
||||||
|
if not found:
|
||||||
if not found:
|
self.needsUpdate = True
|
||||||
apiEndpoints.append(self)
|
# Only update changeDetectedWhen if it hasn't been set recently
|
||||||
|
if not self.changeDetectedWhen or current_time > (self.changeDetectedWhen + datetime.timedelta(seconds=self.debounce_interval)):
|
||||||
elif changed and changedIndex != -1 and changedIndex < len(apiEndpoints):
|
self.changeDetectedWhen = current_time # Initialize timestamp for new endpoint
|
||||||
# Update hash and data
|
apiEndpoints.append(self)
|
||||||
apiEndpoints[changedIndex].hash = self.hash
|
|
||||||
apiEndpoints[changedIndex].jsonData = self.jsonData
|
|
||||||
|
|
||||||
mylog('trace', [f'[API] api_endpoint_class: Updating hash {self.hash}'])
|
|
||||||
else:
|
|
||||||
mylog('none', [f'[API] ⚠ ERROR Updating {self.fileName}'])
|
|
||||||
|
|
||||||
# Needs to be called for initial updates
|
# Needs to be called for initial updates
|
||||||
self.try_write()
|
self.try_write()
|
||||||
@@ -133,12 +132,16 @@ class api_endpoint_class:
|
|||||||
current_time = timeNowTZ()
|
current_time = timeNowTZ()
|
||||||
|
|
||||||
# Debugging info to understand the issue
|
# Debugging info to understand the issue
|
||||||
# mylog('verbose', [f'[API] api_endpoint_class: {self.fileName} is_ad_hoc_user_event {self.is_ad_hoc_user_event} last_update_time={self.last_update_time}, debounce time={self.last_update_time + datetime.timedelta(seconds=self.debounce_interval)}.'])
|
# mylog('debug', [f'[API] api_endpoint_class: {self.fileName} is_ad_hoc_user_event {self.is_ad_hoc_user_event} last_update_time={self.last_update_time}, debounce time={self.last_update_time + datetime.timedelta(seconds=self.debounce_interval)}.'])
|
||||||
|
|
||||||
# Only attempt to write if the debounce time has passed
|
# Only attempt to write if the debounce time has passed
|
||||||
if current_time > (self.last_update_time + datetime.timedelta(seconds=self.debounce_interval)):
|
if self.needsUpdate and (self.changeDetectedWhen is None or current_time > (self.changeDetectedWhen + datetime.timedelta(seconds=self.debounce_interval))):
|
||||||
|
|
||||||
|
mylog('debug', [f'[API] api_endpoint_class: Writing {self.fileName} after debounce.'])
|
||||||
|
|
||||||
write_file(self.path, json.dumps(self.jsonData))
|
write_file(self.path, json.dumps(self.jsonData))
|
||||||
# mylog('verbose', [f'[API] api_endpoint_class: Writing {self.fileName} after debounce.'])
|
|
||||||
|
self.needsUpdate = False
|
||||||
self.last_update_time = timeNowTZ() # Reset last_update_time after writing
|
self.last_update_time = timeNowTZ() # Reset last_update_time after writing
|
||||||
|
|
||||||
# Update user event execution log
|
# Update user event execution log
|
||||||
|
|||||||
@@ -514,10 +514,10 @@ def update_devices_names (db):
|
|||||||
foundNsLookup = 0
|
foundNsLookup = 0
|
||||||
foundNbtLookup = 0
|
foundNbtLookup = 0
|
||||||
|
|
||||||
# Gen unknown devices
|
# Gen unknown devices
|
||||||
sql.execute ("SELECT * FROM Devices WHERE devName IN ('(unknown)','', '(name not found)') AND devLastIP <> '-'")
|
device_handler = Device_obj(db)
|
||||||
unknownDevices = sql.fetchall()
|
# Retrieve devices
|
||||||
db.commitDB()
|
unknownDevices = device_handler.getUnknown()
|
||||||
|
|
||||||
# skip checks if no unknown devices
|
# skip checks if no unknown devices
|
||||||
if len(unknownDevices) == 0:
|
if len(unknownDevices) == 0:
|
||||||
|
|||||||
@@ -166,7 +166,7 @@ class Query(ObjectType):
|
|||||||
if options.search:
|
if options.search:
|
||||||
# Define static list of searchable fields
|
# Define static list of searchable fields
|
||||||
searchable_fields = [
|
searchable_fields = [
|
||||||
"devName", "devMac", "devOwner", "devType", "devVendor", "devLastIP"
|
"devName", "devMac", "devOwner", "devType", "devVendor", "devLastIP",
|
||||||
"devGroup", "devComments", "devLocation", "devStatus",
|
"devGroup", "devComments", "devLocation", "devStatus",
|
||||||
"devSSID", "devSite", "devSourcePlugin", "devSyncHubNode"
|
"devSSID", "devSite", "devSourcePlugin", "devSyncHubNode"
|
||||||
]
|
]
|
||||||
|
|||||||
@@ -383,7 +383,7 @@ def importConfigs (db, all_plugins):
|
|||||||
db.commitDB()
|
db.commitDB()
|
||||||
|
|
||||||
# update only the settings datasource
|
# update only the settings datasource
|
||||||
update_api(db, all_plugins, False, ["settings"])
|
update_api(db, all_plugins, ["settings"])
|
||||||
|
|
||||||
# run plugins that are modifying the config
|
# run plugins that are modifying the config
|
||||||
run_plugin_scripts(db, all_plugins, 'before_config_save' )
|
run_plugin_scripts(db, all_plugins, 'before_config_save' )
|
||||||
|
|||||||
@@ -467,7 +467,7 @@ def execute_plugin(db, all_plugins, plugin, pluginsState = plugins_state() ):
|
|||||||
pluginsState = process_plugin_events(db, plugin, pluginsState, sqlParams)
|
pluginsState = process_plugin_events(db, plugin, pluginsState, sqlParams)
|
||||||
|
|
||||||
# update API endpoints
|
# update API endpoints
|
||||||
update_api(db, all_plugins, False, ["plugins_events","plugins_objects", "plugins_history", "appevents"])
|
update_api(db, all_plugins, ["plugins_events","plugins_objects", "plugins_history", "appevents"])
|
||||||
|
|
||||||
return pluginsState
|
return pluginsState
|
||||||
|
|
||||||
@@ -873,7 +873,7 @@ def check_and_run_user_event(db, all_plugins, pluginsState):
|
|||||||
execution_log.finalize_event("run")
|
execution_log.finalize_event("run")
|
||||||
elif event == 'update_api':
|
elif event == 'update_api':
|
||||||
# async handling
|
# async handling
|
||||||
update_api(db, all_plugins, False, param.split(','), True)
|
update_api(db, all_plugins, param.split(','), True)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
mylog('minimal', ['[check_and_run_user_event] WARNING: Unhandled event in execution queue: ', event, ' | ', param])
|
mylog('minimal', ['[check_and_run_user_event] WARNING: Unhandled event in execution queue: ', event, ' | ', param])
|
||||||
|
|||||||
Reference in New Issue
Block a user