plugins fixes + breakup plugins.py

Jokob-sk
2023-08-21 17:28:31 +10:00
parent b732a87409
commit 57f508f15c
8 changed files with 267 additions and 226 deletions

View File

@@ -239,7 +239,7 @@ function generateTabs()
var histCount = 0
var histCountDisplayed = 0
for(i=pluginHistory.length-1;i >= 0;i--) // from latest to the oldest
for(i=0;i < pluginHistory.length ;i++)
{
if(pluginHistory[i].Plugin == prefix)
{

View File

@@ -206,6 +206,7 @@ def main ():
# process all the scanned data into new devices
mylog('debug', [f'[MAIN] processScan: {pluginsState.processScan}'])
if pluginsState.processScan == True:
mylog('debug', "[MAIN] start processig scan results")
pluginsState.processScan = False
@@ -213,23 +214,22 @@ def main ():
# Reporting
if conf.cycle in conf.check_report:
# Check if new devices found
sql.execute (sql_new_devices)
newDevices = sql.fetchall()
db.commitDB()
# new devices were found
if len(newDevices) > 0:
# run all plugins registered to be run when new devices are found
pluginsState = run_plugin_scripts(db, 'on_new_device', pluginsState)
# Check if new devices found
sql.execute (sql_new_devices)
newDevices = sql.fetchall()
db.commitDB()
# new devices were found
if len(newDevices) > 0:
# run all plugins registered to be run when new devices are found
pluginsState = run_plugin_scripts(db, 'on_new_device', pluginsState)
# Scan newly found devices with Nmap if enabled
if conf.NMAP_ACTIVE and len(newDevices) > 0:
performNmapScan( db, newDevices)
# Scan newly found devices with Nmap if enabled
if conf.NMAP_ACTIVE and len(newDevices) > 0:
performNmapScan( db, newDevices)
# send all configured notifications
send_notifications(db)
# send all configured notifications
send_notifications(db)
# clean up the DB once an hour
if conf.last_cleanup + datetime.timedelta(hours = 1) < loop_start_time:
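A note on the on_new_device hook used above: run_plugin_scripts only executes plugins whose configured run type matches the one passed in. The sketch below is illustrative only; the "unique_prefix" field and the "RUN" setting follow the usual plugin config.json shape but are not copied from this commit.

# Hedged sketch: pick the plugins whose RUN setting matches a given run type.
plugins = [
    {"unique_prefix": "NMAPDEV", "settings": [{"function": "RUN", "value": "on_new_device"}]},
    {"unique_prefix": "INTRNT",  "settings": [{"function": "RUN", "value": "schedule"}]},
]

def run_type_of(plugin):
    # return the value of the RUN setting, or None if the plugin has none
    return next((s["value"] for s in plugin["settings"] if s["function"] == "RUN"), None)

to_run = [p["unique_prefix"] for p in plugins if run_type_of(p) == "on_new_device"]
print(to_run)  # ['NMAPDEV']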

View File

@@ -26,7 +26,6 @@ last_scan_run = ''
last_cleanup = ''
last_update_vendors = ''
last_version_check = ''
check_report = []
arpscan_devices = []
# for MQTT

View File

@@ -42,7 +42,7 @@ sql_settings = "SELECT * FROM Settings"
sql_plugins_objects = "SELECT * FROM Plugins_Objects"
sql_language_strings = "SELECT * FROM Plugins_Language_Strings"
sql_plugins_events = "SELECT * FROM Plugins_Events"
sql_plugins_history = "SELECT * FROM Plugins_History ORDER BY 'Index' DESC"
sql_plugins_history = "SELECT * FROM Plugins_History ORDER BY DateTimeChanged DESC"
sql_new_devices = """SELECT * FROM (
SELECT eve_IP as dev_LastIP, eve_MAC as dev_MAC
FROM Events_Devices
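A side note on the query fix above: in SQLite, 'Index' in single quotes is a string literal, so the old ORDER BY clause sorted every row by the same constant and effectively did nothing. A small self-contained illustration (not project code):

import sqlite3

con = sqlite3.connect(":memory:")
con.execute('CREATE TABLE t ("Index" INTEGER, DateTimeChanged TEXT)')
con.executemany("INSERT INTO t VALUES (?, ?)",
                [(1, "2023-08-01"), (3, "2023-08-21"), (2, "2023-08-11")])
# sorts by the constant string 'Index', so the order is effectively unchanged
print(con.execute("SELECT * FROM t ORDER BY 'Index' DESC").fetchall())
# double quotes reference the column, so this really sorts: 3, 2, 1
print(con.execute('SELECT * FROM t ORDER BY "Index" DESC').fetchall())
# the fix in this commit: newest change first
print(con.execute("SELECT * FROM t ORDER BY DateTimeChanged DESC").fetchall())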

View File

@@ -96,16 +96,27 @@ class DB():
self.sql.execute (f"""DELETE FROM Events
WHERE eve_DateTime <= date('now', '-{str(DAYS_TO_KEEP_EVENTS)} day')""")
# Cleanup Plugin Events History
mylog('verbose', ['[DB Cleanup] Plugins_History: Delete all older than '+str(DAYS_TO_KEEP_EVENTS)+' days (DAYS_TO_KEEP_EVENTS setting)'])
self.sql.execute (f"""DELETE FROM Plugins_History
WHERE DateTimeChanged <= date('now', '{str(DAYS_TO_KEEP_EVENTS)} day')""")
# # Cleanup Plugin Events History
# mylog('verbose', ['[DB Cleanup] Plugins_History: Delete all older than '+str(DAYS_TO_KEEP_EVENTS)+' days (DAYS_TO_KEEP_EVENTS setting)'])
# self.sql.execute (f"""DELETE FROM Plugins_History
# WHERE DateTimeChanged <= date('now', '{str(DAYS_TO_KEEP_EVENTS)} day')""")
# Trim Plugins_History entries to less than PLUGINS_KEEP_HIST setting
mylog('verbose', [f'[DB Cleanup] Plugins_History: Trim Plugins_History entries to less than {str(PLUGINS_KEEP_HIST)} (PLUGINS_KEEP_HIST setting)'])
self.sql.execute (f"""DELETE from Plugins_History where "Index" not in (
SELECT "Index" from Plugins_History
order by "Index" desc limit {str(PLUGINS_KEEP_HIST)})""")
# Trim Plugins_History entries to less than PLUGINS_KEEP_HIST setting per unique "Plugin" column entry
mylog('verbose', [f'[DB Cleanup] Plugins_History: Trim Plugins_History entries to less than {str(PLUGINS_KEEP_HIST)} per Plugin (PLUGINS_KEEP_HIST setting)'])
# Build the SQL query to delete entries that exceed the limit per unique "Plugin" column entry
delete_query = f"""DELETE FROM Plugins_History
WHERE "Index" NOT IN (
SELECT "Index"
FROM (
SELECT "Index",
ROW_NUMBER() OVER(PARTITION BY "Plugin" ORDER BY DateTimeChanged DESC) AS row_num
FROM Plugins_History
) AS ranked_objects
WHERE row_num <= {str(PLUGINS_KEEP_HIST)}
);"""
self.sql.execute(delete_query)
# Cleanup Pholus_Scan
if PHOLUS_DAYS_DATA != 0:
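The windowed DELETE above keeps only the newest PLUGINS_KEEP_HIST rows per plugin. A runnable toy version of the same pattern, with made-up data and a keep limit of 2 (window functions need SQLite 3.25+):

import sqlite3

KEEP = 2
con = sqlite3.connect(":memory:")
con.execute('CREATE TABLE Plugins_History ("Index" INTEGER PRIMARY KEY, Plugin TEXT, DateTimeChanged TEXT)')
rows = [("A", f"2023-08-0{i}") for i in range(1, 6)] + [("B", "2023-08-01")]
con.executemany('INSERT INTO Plugins_History (Plugin, DateTimeChanged) VALUES (?, ?)', rows)
# rank rows per Plugin by DateTimeChanged and delete everything past the limit
con.execute(f'''DELETE FROM Plugins_History
                WHERE "Index" NOT IN (
                    SELECT "Index" FROM (
                        SELECT "Index",
                               ROW_NUMBER() OVER (PARTITION BY Plugin ORDER BY DateTimeChanged DESC) AS row_num
                        FROM Plugins_History)
                    WHERE row_num <= {KEEP})''')
print(con.execute('SELECT Plugin, DateTimeChanged FROM Plugins_History ORDER BY Plugin, DateTimeChanged').fetchall())
# [('A', '2023-08-04'), ('A', '2023-08-05'), ('B', '2023-08-01')]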
@@ -120,7 +131,8 @@ class DB():
WHERE dev_NewDevice = 1 AND dev_FirstConnection < date('now', '+{str(HRS_TO_KEEP_NEWDEV)} hour')""")
# De-dupe (de-duplicate) from the Plugins_Objects table
# De-dupe (de-duplicate) from the Plugins_Objects table
# TODO This shouldn't be necessary - probably a concurrency bug somewhere in the code :(
mylog('verbose', ['[DB Cleanup] Plugins_Objects: Delete all duplicates'])
self.sql.execute("""
DELETE FROM Plugins_Objects
@@ -245,18 +257,18 @@ class DB():
self.sql.execute("DROP TABLE Settings;")
self.sql.execute("""
CREATE TABLE "Settings" (
"Code_Name" TEXT,
"Display_Name" TEXT,
"Description" TEXT,
"Type" TEXT,
"Options" TEXT,
"RegEx" TEXT,
"Value" TEXT,
"Group" TEXT,
"Events" TEXT
);
""")
CREATE TABLE "Settings" (
"Code_Name" TEXT,
"Display_Name" TEXT,
"Description" TEXT,
"Type" TEXT,
"Options" TEXT,
"RegEx" TEXT,
"Value" TEXT,
"Group" TEXT,
"Events" TEXT
);
""")
# indicates, if Pholus_Scan table is available
pholusScanMissing = self.sql.execute("""
@@ -500,19 +512,13 @@ def insertOnlineHistory(db):
startTime = timeNowTZ()
# Add to History
# only run this if the scans have run
scanCount = db.read_one("SELECT count(*) FROM CurrentScan")
if scanCount[0] == 0 :
mylog('debug',[ '[insertOnlineHistory] - nothing to do, currentScan empty'])
return 0
History_All = db.read("SELECT * FROM Devices")
History_All_Devices = len(History_All)
History_Archived = db.read("SELECT * FROM Devices WHERE dev_Archived = 1")
History_Archived_Devices = len(History_Archived)
History_Online = db.read("SELECT * FROM CurrentScan")
History_Online = db.read("SELECT * FROM Devices WHERE dev_PresentLastScan = 1")
History_Online_Devices = len(History_Online)
History_Offline_Devices = History_All_Devices - History_Archived_Devices - History_Online_Devices
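The offline tally is now derived from the other three counts, with "online" meaning dev_PresentLastScan = 1. A tiny worked example with invented numbers:

History_All_Devices      = 25
History_Archived_Devices = 3
History_Online_Devices   = 10   # rows with dev_PresentLastScan = 1
History_Offline_Devices  = History_All_Devices - History_Archived_Devices - History_Online_Devices
print(History_Offline_Devices)  # 12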

View File

@@ -13,7 +13,8 @@ from helper import collect_lang_strings, updateSubnets, initOrSetParam, isJsonOb
from logger import mylog
from api import update_api
from scheduler import schedule_class
from plugin import get_plugins_configs, print_plugin_info, run_plugin_scripts
from plugin import print_plugin_info, run_plugin_scripts
from plugin_utils import get_plugins_configs
#===============================================================================
# Initialise user defined values
@@ -175,7 +176,6 @@ def importConfigs (db):
# init all time values now that we have the timezone - all this should be moved into plugin/plugin settings
conf.time_started = datetime.datetime.now(conf.tz)
conf.cycle = ""
conf.check_report = [1, "internet_IP", "update_vendors_silent"]
conf.plugins_once_run = False
#cron_instance = Cron()

View File

@@ -3,7 +3,7 @@ import sqlite3
import json
import subprocess
import datetime
import base64
from collections import namedtuple
# pialert modules
@@ -12,6 +12,7 @@ from const import pluginsPath, logPath
from logger import mylog
from helper import timeNowTZ, updateState, get_file_content, write_file, get_setting, get_setting_value
from api import update_api
from plugin_utils import logEventStatusCounts, get_plugin_string, get_plugin_setting, print_plugin_info, flatten_array, combine_plugin_objects, resolve_wildcards_arr
#-------------------------------------------------------------------------------
@@ -67,77 +68,6 @@ def run_plugin_scripts(db, runType, pluginsState = None):
#-------------------------------------------------------------------------------
def get_plugins_configs():
pluginsList = [] # Create an empty list to store plugin configurations
# Get a list of top-level directories in the specified pluginsPath
dirs = next(os.walk(pluginsPath))[1]
# Loop through each directory (plugin folder) in dirs
for d in dirs:
# Check if the directory name does not start with "__" and does not end with "__ignore"
if not d.startswith("__") and not d.endswith("__ignore"):
# Construct the path to the config.json file within the plugin folder
config_path = os.path.join(pluginsPath, d, "config.json")
# Load the contents of the config.json file as a JSON object and append it to pluginsList
pluginsList.append(json.loads(get_file_content(config_path)))
return pluginsList # Return the list of plugin configurations
#-------------------------------------------------------------------------------
def print_plugin_info(plugin, elements = ['display_name']):
mylog('verbose', ['[Plugins] ---------------------------------------------'])
for el in elements:
res = get_plugin_string(plugin, el)
mylog('verbose', ['[Plugins] ', el ,': ', res])
#-------------------------------------------------------------------------------
# Gets the whole setting object
def get_plugin_setting(plugin, function_key):
result = None
for set in plugin['settings']:
if set["function"] == function_key:
result = set
if result == None:
mylog('debug', ['[Plugins] Setting with "function":"', function_key, '" is missing in plugin: ', get_plugin_string(plugin, 'display_name')])
return result
#-------------------------------------------------------------------------------
# Get localized string value on the top JSON depth, not recursive
def get_plugin_string(props, el):
result = ''
if el in props['localized']:
for val in props[el]:
if val['language_code'] == 'en_us':
result = val['string']
if result == '':
result = 'en_us string missing'
else:
result = props[el]
return result
#-------------------------------------------------------------------------------
# Executes the plugin command specified in the setting with the function specified as CMD
def execute_plugin(db, plugin, pluginsState = plugins_state() ):
@@ -356,21 +286,13 @@ def execute_plugin(db, plugin, pluginsState = plugins_state() ):
mylog('verbose', ['[Plugins] SUCCESS, received ', len(sqlParams), ' entries'])
# process results if any
if len(sqlParams) > 0:
try:
sql.executemany("""INSERT INTO Plugins_History ("Index", "Plugin", "Object_PrimaryID", "Object_SecondaryID", "DateTimeCreated", "DateTimeChanged", "Watched_Value1", "Watched_Value2", "Watched_Value3", "Watched_Value4", "Status" ,"Extra", "UserData", "ForeignKey") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", sqlParams)
db.commitDB()
except sqlite3.Error as e:
db.rollbackDB() # Rollback changes in case of an error
mylog('none', ['[Plugins] ERROR inserting into Plugins_History:', e])
if len(sqlParams) > 0:
# create objects
pluginsState = process_plugin_events(db, plugin, pluginsState, sqlParams)
# update API endpoints
update_api(db, False, ["plugins_events","plugins_objects"])
update_api(db, False, ["plugins_events","plugins_objects", "plugins_history"])
return pluginsState
@@ -432,78 +354,17 @@ def get_plugin_setting_value(plugin, function_key):
#-------------------------------------------------------------------------------
def flatten_array(arr, encodeBase64=False):
tmp = ''
arrayItemStr = ''
mylog('debug', '[Plugins] Flattening the below array')
mylog('debug', f'[Plugins] Convert to Base64: {encodeBase64}')
mylog('debug', arr)
for arrayItem in arr:
# only one column flattening is supported
if isinstance(arrayItem, list):
arrayItemStr = str(arrayItem[0]).replace("'", '') # removing single quotes - not allowed
else:
# is string already
arrayItemStr = arrayItem
tmp += f'{arrayItemStr},'
tmp = tmp[:-1] # Remove last comma ','
mylog('debug', f'[Plugins] Flattened array: {tmp}')
if encodeBase64:
tmp = str(base64.b64encode(tmp.encode('ascii')))
mylog('debug', f'[Plugins] Flattened array (base64): {tmp}')
return tmp
#-------------------------------------------------------------------------------
# Replace {wildcards} with parameters
def resolve_wildcards_arr(commandArr, params):
mylog('debug', ['[Plugins] Pre-Resolved CMD: '] + commandArr)
for param in params:
# mylog('debug', ['[Plugins] key : {', param[0], '}'])
# mylog('debug', ['[Plugins] resolved: ', param[1]])
i = 0
for comPart in commandArr:
commandArr[i] = comPart.replace('{' + param[0] + '}', param[1]).replace('{s-quote}',"'")
i += 1
return commandArr
#-------------------------------------------------------------------------------
# Combine plugin objects, keep user-defined values, created time, changed time if nothing changed and the index
def combine_plugin_objects(old, new):
new.userData = old.userData
new.index = old.index
new.created = old.created
# Keep changed time if nothing changed
if new.status in ['watched-not-changed']:
new.changed = old.changed
# return the new object, with some of the old values
return new
#-------------------------------------------------------------------------------
# Check if watched values changed for the given plugin
def process_plugin_events(db, plugin, pluginsState, plugEventsArr):
sql = db.sql
# Access the connection from the DB instance
@@ -517,25 +378,24 @@ def process_plugin_events(db, plugin, pluginsState, plugEventsArr):
# Begin a transaction
with conn:
# get existing objects
plugObjectsArr = db.get_sql_array ("SELECT * FROM Plugins_Objects where Plugin = '" + str(pluginPref)+"'")
pluginObjects = []
pluginEvents = []
# Create plugin objects from existing database entries
plugObjectsArr = db.get_sql_array ("SELECT * FROM Plugins_Objects where Plugin = '" + str(pluginPref)+"'")
for obj in plugObjectsArr:
pluginObjects.append(plugin_object_class(plugin, obj))
existingPluginObjectsCount = len(pluginObjects)
mylog('debug', ['[Plugins] Existing objects : ', existingPluginObjectsCount])
mylog('debug', ['[Plugins] New and existing events : ', len(plugEventsArr)])
# create plugin objects from events - will be processed to find existing objects
for eve in plugEventsArr:
pluginEvents.append(plugin_object_class(plugin, eve))
mylog('debug', ['[Plugins] Existing objects from Plugins_Objects: ', len(pluginObjects)])
mylog('debug', ['[Plugins] Logged events from the plugin run : ', len(pluginEvents)])
# Loop thru all current events and update the status to "exists" if the event matches an existing object
index = 0
@@ -572,9 +432,11 @@ def process_plugin_events(db, plugin, pluginsState, plugEventsArr):
isMissing = False
if isMissing:
tmpObj.status = "missing-in-last-scan"
tmpObj.changed = timeNowTZ()
mylog('debug', ['[Plugins] Missing from last scan: ', tmpObj.primaryId , tmpObj.secondaryId])
# if wasn't missing before, mark as changed
if tmpObj.status != "missing-in-last-scan":
tmpObj.changed = timeNowTZ().strftime('%Y-%m-%d %H:%M:%S')
tmpObj.status = "missing-in-last-scan"
mylog('debug', [f'[Plugins] Missing from last scan (PrimaryID | SecondaryID): {tmpObj.primaryId} | {tmpObj.secondaryId}'])
# Merge existing plugin objects with newly discovered ones and update existing ones with new values
@@ -601,6 +463,7 @@ def process_plugin_events(db, plugin, pluginsState, plugEventsArr):
# Create lists to hold the data for bulk insertion
objects_to_insert = []
events_to_insert = []
history_to_insert = []
objects_to_update = []
statuses_to_report_on = get_plugin_setting_value(plugin, "REPORT_ON")
@@ -627,11 +490,20 @@ def process_plugin_events(db, plugin, pluginsState, plugEventsArr):
if plugObj.status in statuses_to_report_on:
events_to_insert.append(values)
mylog('debug', ['[Plugins] events_to_insert count: ', len(events_to_insert)])
mylog('debug', ['[Plugins] pluginEvents count : ', len(pluginEvents)])
logEventStatusCounts(pluginEvents)
mylog('debug', ['[Plugins] pluginObjects count: ', len(pluginObjects)])
logEventStatusCounts(pluginObjects)
# combine all DB insert and update events into one for history
history_to_insert.append(values)
mylog('debug', ['[Plugins] pluginEvents count: ', len(pluginEvents)])
mylog('debug', ['[Plugins] pluginObjects count: ', len(pluginObjects)])
mylog('debug', ['[Plugins] events_to_insert count: ', len(events_to_insert)])
mylog('debug', ['[Plugins] history_to_insert count: ', len(history_to_insert)])
mylog('debug', ['[Plugins] objects_to_insert count: ', len(objects_to_insert)])
mylog('debug', ['[Plugins] objects_to_update count: ', len(objects_to_update)])
logEventStatusCounts('pluginEvents', pluginEvents)
logEventStatusCounts('pluginObjects', pluginObjects)
# Bulk insert objects
if objects_to_insert:
@@ -659,6 +531,7 @@ def process_plugin_events(db, plugin, pluginsState, plugEventsArr):
# Bulk insert events
if events_to_insert:
sql.executemany(
"""
INSERT INTO Plugins_Events
@@ -669,6 +542,19 @@ def process_plugin_events(db, plugin, pluginsState, plugEventsArr):
""", events_to_insert
)
# Bulk insert history entries
if history_to_insert:
sql.executemany(
"""
INSERT INTO Plugins_History
("Plugin", "Object_PrimaryID", "Object_SecondaryID", "DateTimeCreated",
"DateTimeChanged", "Watched_Value1", "Watched_Value2", "Watched_Value3",
"Watched_Value4", "Status", "Extra", "UserData", "ForeignKey")
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", history_to_insert
)
# Commit changes to the database
db.commitDB()
@@ -789,8 +675,8 @@ class plugin_object_class:
if self.status not in ["exists", "watched-changed", "watched-not-changed", "new", "not-processed", "missing-in-last-scan"]:
raise ValueError("Invalid status value for plugin object:", self.status)
# self.idsHash = str(hash(str(self.primaryId) + str(self.secondaryId)))
self.idsHash = str(self.primaryId) + str(self.secondaryId)
self.idsHash = str(hash(str(self.primaryId) + str(self.secondaryId)))
# self.idsHash = str(self.primaryId) + str(self.secondaryId)
self.watchedClmns = []
self.watchedIndxs = []
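A side note on the restored idsHash line: Python's built-in hash() is salted per interpreter run for strings, so the value is only stable within one process. If a run-to-run stable identifier were ever needed, a digest is one option; this is a hedged sketch, not what the commit does:

import hashlib

primaryId, secondaryId = "eth0", "192.168.1.10"   # hypothetical values
stable_ids_hash = hashlib.sha256((str(primaryId) + str(secondaryId)).encode("utf-8")).hexdigest()
print(stable_ids_hash[:16])  # same output on every run, unlike hash()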
@@ -814,16 +700,3 @@ class plugin_object_class:
self.watchedHash = str(hash(tmp))
#-------------------------------------------------------------------------------
def logEventStatusCounts(pluginEvents):
status_counts = {} # Dictionary to store counts for each status
for event in pluginEvents:
status = event.status
if status in status_counts:
status_counts[status] += 1
else:
status_counts[status] = 1
for status, count in status_counts.items():
mylog('debug', [f'Status "{status}": {count} events'])

pialert/plugin_utils.py (new file, 163 lines)
View File

@@ -0,0 +1,163 @@
import os
import base64
import json
from logger import mylog
from const import pluginsPath, logPath
from helper import timeNowTZ, updateState, get_file_content, write_file, get_setting, get_setting_value
#-------------------------------------------------------------------------------
def logEventStatusCounts(objName, pluginEvents):
status_counts = {} # Dictionary to store counts for each status
for event in pluginEvents:
status = event.status
if status in status_counts:
status_counts[status] += 1
else:
status_counts[status] = 1
for status, count in status_counts.items():
mylog('debug', [f'[Plugins] In {objName} there are {count} events with the status "{status}" '])
#-------------------------------------------------------------------------------
def print_plugin_info(plugin, elements = ['display_name']):
mylog('verbose', ['[Plugins] ---------------------------------------------'])
for el in elements:
res = get_plugin_string(plugin, el)
mylog('verbose', ['[Plugins] ', el ,': ', res])
#-------------------------------------------------------------------------------
# Gets the whole setting object
def get_plugin_setting(plugin, function_key):
result = None
for set in plugin['settings']:
if set["function"] == function_key:
result = set
if result == None:
mylog('debug', ['[Plugins] Setting with "function":"', function_key, '" is missing in plugin: ', get_plugin_string(plugin, 'display_name')])
return result
#-------------------------------------------------------------------------------
# Get localized string value on the top JSON depth, not recursive
def get_plugin_string(props, el):
result = ''
if el in props['localized']:
for val in props[el]:
if val['language_code'] == 'en_us':
result = val['string']
if result == '':
result = 'en_us string missing'
else:
result = props[el]
return result
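Both helpers above rely on the config.json shape used by the plugins. A minimal, made-up config fragment showing how they resolve values (not a real plugin config):

plugin = {
    "localized": ["display_name"],
    "display_name": [{"language_code": "en_us", "string": "Website Monitor"}],
    "settings": [{"function": "RUN", "type": "text.select", "value": "schedule"}],
}
print(get_plugin_string(plugin, "display_name"))    # 'Website Monitor'
print(get_plugin_setting(plugin, "RUN")["value"])   # 'schedule'
print(get_plugin_setting(plugin, "MISSING"))        # None (plus a debug log entry)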
#-------------------------------------------------------------------------------
def flatten_array(arr, encodeBase64=False):
tmp = ''
arrayItemStr = ''
mylog('debug', '[Plugins] Flattening the below array')
mylog('debug', f'[Plugins] Convert to Base64: {encodeBase64}')
mylog('debug', arr)
for arrayItem in arr:
# only one column flattening is supported
if isinstance(arrayItem, list):
arrayItemStr = str(arrayItem[0]).replace("'", '') # removing single quotes - not allowed
else:
# is string already
arrayItemStr = arrayItem
tmp += f'{arrayItemStr},'
tmp = tmp[:-1] # Remove last comma ','
mylog('debug', f'[Plugins] Flattened array: {tmp}')
if encodeBase64:
tmp = str(base64.b64encode(tmp.encode('ascii')))
mylog('debug', f'[Plugins] Flattened array (base64): {tmp}')
return tmp
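For reference, what flatten_array returns for a small input; note that the base64 branch applies str() to a bytes object, so the result carries a b'...' prefix:

print(flatten_array([["a"], "b,c"]))      # a,b,c
print(flatten_array(["a", "b"], True))    # b'YSxi'  (str() of the bytes object)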
#-------------------------------------------------------------------------------
# Combine plugin objects, keep user-defined values, created time, changed time if nothing changed and the index
def combine_plugin_objects(old, new):
new.userData = old.userData
new.index = old.index
new.created = old.created
# Keep changed time if nothing changed
if new.status in ['watched-not-changed']:
new.changed = old.changed
# return the new object, with some of the old values
return new
#-------------------------------------------------------------------------------
# Replace {wildcards} with parameters
def resolve_wildcards_arr(commandArr, params):
mylog('debug', ['[Plugins] Pre-Resolved CMD: '] + commandArr)
for param in params:
# mylog('debug', ['[Plugins] key : {', param[0], '}'])
# mylog('debug', ['[Plugins] resolved: ', param[1]])
i = 0
for comPart in commandArr:
commandArr[i] = comPart.replace('{' + param[0] + '}', param[1]).replace('{s-quote}',"'")
i += 1
return commandArr
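A quick usage example for the wildcard resolver; the command and parameter names are invented for illustration:

cmd = ["python3", "script.py", "--subnets={subnets}", "--name={s-quote}{name}{s-quote}"]
params = [["subnets", "192.168.1.0/24"], ["name", "my scan"]]
print(resolve_wildcards_arr(cmd, params))
# ['python3', 'script.py', '--subnets=192.168.1.0/24', "--name='my scan'"]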
#-------------------------------------------------------------------------------
def get_plugins_configs():
pluginsList = [] # Create an empty list to store plugin configurations
# Get a list of top-level directories in the specified pluginsPath
dirs = next(os.walk(pluginsPath))[1]
# Loop through each directory (plugin folder) in dirs
for d in dirs:
# Check if the directory name does not start with "__" and does not end with "__ignore"
if not d.startswith("__") and not d.endswith("__ignore"):
# Construct the path to the config.json file within the plugin folder
config_path = os.path.join(pluginsPath, d, "config.json")
# Load the contents of the config.json file as a JSON object and append it to pluginsList
pluginsList.append(json.loads(get_file_content(config_path)))
return pluginsList # Return the list of plugin configurations
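Putting the loader together with the printer above; the folder names are hypothetical and pluginsPath comes from const.py:

# Hedged usage sketch: given a plugins folder such as
#   <pluginsPath>/website_monitor/config.json
#   <pluginsPath>/__template/config.json           (skipped, starts with "__")
#   <pluginsPath>/old_plugin__ignore/config.json   (skipped, ends with "__ignore")
# the loader returns one parsed dict per remaining config.json.
plugins = get_plugins_configs()
for plugin in plugins:
    print_plugin_info(plugin, ["display_name", "description"])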