import os
import sqlite3
import json
import subprocess
import datetime
import base64
from collections import namedtuple

# pialert modules
import conf
from const import pluginsPath, logPath
from logger import mylog
from helper import timeNowTZ, updateState, get_file_content, write_file, get_setting, get_setting_value
from api import update_api


# Column list shared by the Plugins_Events / Plugins_History / Plugins_Objects
# inserts. The order matches the 13-value tuples built in this module.
_EVENT_INSERT_COLUMNS = (
    '"Plugin", "Object_PrimaryID", "Object_SecondaryID", "DateTimeCreated", '
    '"DateTimeChanged", "Watched_Value1", "Watched_Value2", "Watched_Value3", '
    '"Watched_Value4", "Status", "Extra", "UserData", "ForeignKey"'
)
_EVENT_INSERT_PLACEHOLDERS = ', '.join(['?'] * 13)

# Maps a plugin column name (as used in 'database_column_definitions') to the
# plugin_object_class attribute holding its value.
_EVENT_COLUMN_ATTRS = {
    'Index': 'index',
    'Plugin': 'pluginPref',
    'Object_PrimaryID': 'primaryId',
    'Object_SecondaryID': 'secondaryId',
    'DateTimeCreated': 'created',
    'DateTimeChanged': 'changed',
    'Watched_Value1': 'watched1',
    'Watched_Value2': 'watched2',
    'Watched_Value3': 'watched3',
    'Watched_Value4': 'watched4',
    'UserData': 'userData',
    'Extra': 'extra',
    'Status': 'status',
    'ForeignKey': 'foreignKey',
}


#-------------------------------------------------------------------------------
class plugins_state:
    """State carried across plugin executions.

    Attributes:
        processScan: set to True by process_plugin_events when a plugin is
            mapped to the 'CurrentScan' table.
    """

    def __init__(self, processScan = False):
        self.processScan = processScan


#-------------------------------------------------------------------------------
def run_plugin_scripts(db, runType, pluginsState = None):
    """Execute every plugin in conf.plugins whose RUN setting equals runType.

    For runType "schedule" a plugin only runs when a schedule in
    conf.mySchedules with a matching service prefix reports it is due; the
    schedule's last_run is then updated.

    Args:
        db: database wrapper passed through to updateState / execute_plugin.
        runType: value compared against each plugin's RUN setting.
        pluginsState: state to continue from; a new plugins_state is created
            when omitted.

    Returns:
        The plugins_state after all selected plugins have run.
    """
    if pluginsState is None:
        mylog('debug', ['[Plugins] pluginsState initialized '])
        pluginsState = plugins_state()

    # Header
    updateState(db, "Run: Plugins")

    mylog('debug', ['[Plugins] Check if any plugins need to be executed on run type: ', runType])

    for plugin in conf.plugins:
        shouldRun = False
        prefix = plugin["unique_prefix"]

        runSetting = get_plugin_setting(plugin, "RUN")
        if runSetting is not None and runSetting['value'] == runType:
            if runType != "schedule":
                shouldRun = True
            else:
                # run if overdue scheduled time
                # check schedules if any contains a unique plugin prefix matching the current plugin
                for schd in conf.mySchedules:
                    if schd.service == prefix:
                        # Check if schedule overdue
                        shouldRun = schd.runScheduleCheck()

        if not shouldRun:
            continue

        # Header
        updateState(db, f"Plugins: {prefix}")

        print_plugin_info(plugin, ['display_name'])
        # Use the None-safe accessor: a plugin without a CMD setting must not
        # crash here, execute_plugin handles that case itself.
        mylog('debug', ['[Plugins] CMD: ', get_plugin_setting_value(plugin, "CMD")])
        pluginsState = execute_plugin(db, plugin, pluginsState)

        # update last run time
        if runType == "schedule":
            for schd in conf.mySchedules:
                if schd.service == prefix:
                    # note the last time the scheduled plugin run was executed
                    schd.last_run = timeNowTZ()

    return pluginsState


#-------------------------------------------------------------------------------
def get_plugins_configs():
    """Load config.json from every plugin folder directly under pluginsPath.

    Folders whose name starts with "__" or ends with "__ignore" are skipped.

    Returns:
        List of the parsed config.json contents, one entry per plugin folder.
    """
    # top-level directories only
    dirs = next(os.walk(pluginsPath))[1]

    pluginsList = []
    for d in dirs:
        if d.startswith("__") or d.endswith("__ignore"):
            continue
        config_path = os.path.join(pluginsPath, d, "config.json")
        pluginsList.append(json.loads(get_file_content(config_path)))

    return pluginsList


#-------------------------------------------------------------------------------
def print_plugin_info(plugin, elements = ('display_name',)):
    """Log (level 'verbose') the given top-level string elements of a plugin."""
    mylog('verbose', ['[Plugins] ---------------------------------------------'])

    for el in elements:
        res = get_plugin_string(plugin, el)
        mylog('verbose', ['[Plugins] ', el, ': ', res])


#-------------------------------------------------------------------------------
# Gets the whole setting object
def get_plugin_setting(plugin, function_key):
    """Return the setting dict of plugin whose "function" equals function_key.

    When several settings match, the last one wins. Returns None (and logs a
    debug message) when no setting matches.
    """
    result = None

    for setting in plugin['settings']:
        if setting["function"] == function_key:
            result = setting

    if result is None:
        mylog('debug', ['[Plugins] Setting with "function":"', function_key, '" is missing in plugin: ', get_plugin_string(plugin, 'display_name')])

    return result


#-------------------------------------------------------------------------------
# Get localized string value on the top JSON depth, not recursive
def get_plugin_string(props, el):
    """Return the 'en_us' string for element el of props.

    If el is listed in props['localized'], props[el] is treated as a list of
    {'language_code', 'string'} entries and the 'en_us' string is returned
    ('en_us string missing' when absent or empty). Otherwise props[el] is
    returned unchanged.
    """
    if el not in props['localized']:
        return props[el]

    result = ''
    for val in props[el]:
        if val['language_code'] == 'en_us':
            result = val['string']

    if result == '':
        result = 'en_us string missing'

    return result


#-------------------------------------------------------------------------------
def _build_event_row(prefix, cols, normalizeEmpty):
    """Build the 13-value tuple inserted into Plugins_Events / Plugins_History.

    cols holds the 9 plugin output columns. 'DateTimeCreated' and 'UserData'
    are filled with the literal string 'null' and 'Status' with 0. When
    normalizeEmpty is set, the secondary ID and watched values 3 and 4 are
    passed through handle_empty.
    """
    secondaryId, watched3, watched4 = cols[1], cols[5], cols[6]
    if normalizeEmpty:
        secondaryId = handle_empty(secondaryId)
        watched3 = handle_empty(watched3)
        watched4 = handle_empty(watched4)

    return (
        prefix,         # "Plugin"
        cols[0],        # "Object_PrimaryID"
        secondaryId,    # "Object_SecondaryID"
        'null',         # "DateTimeCreated" placeholder
        cols[2],        # "DateTimeChanged"
        cols[3],        # "Watched_Value1"
        cols[4],        # "Watched_Value2"
        watched3,       # "Watched_Value3"
        watched4,       # "Watched_Value4"
        0,              # "Status" placeholder
        cols[7],        # "Extra"
        'null',         # "UserData" placeholder
        cols[8],        # "ForeignKey"
    )


def _event_rows_from_sql(prefix, arr):
    """Convert SQL result rows into event tuples, skipping invalid rows.

    A row is valid when it has exactly 9 columns and its first column is
    neither '' nor 'null'.
    """
    rows = []
    for row in arr:
        # There has to be always 9 columns
        if len(row) == 9 and row[0] not in ('', 'null'):
            rows.append(_build_event_row(prefix, row, True))
        else:
            mylog('none', ['[Plugins] Skipped invalid sql result'])
    return rows


#-------------------------------------------------------------------------------
# Executes the plugin command specified in the setting with the function specified as CMD
def execute_plugin(db, plugin, pluginsState = None):
    """Run one plugin, store its results and process the resulting events.

    Depending on plugin['data_source'] the CMD setting is executed as a
    subprocess ('script', results read from the plugin's last_result.log),
    or as a SQL query ('pialert-db-query', 'sqlite-db-query'). Valid result
    rows are inserted into Plugins_Events and Plugins_History, then
    process_plugin_events and update_api are called.

    Args:
        db: database wrapper exposing sql, get_sql_array, commitDB, rollbackDB.
        plugin: plugin configuration dict.
        pluginsState: state to continue from; a new plugins_state is created
            when omitted (never a shared default instance).

    Returns:
        The plugins_state. It is returned unchanged when the plugin cannot be
        run or produced no valid rows, so callers never lose state.
    """
    if pluginsState is None:
        pluginsState = plugins_state()

    sql = db.sql

    # ------- necessary settings check --------
    setting = get_plugin_setting(plugin, "CMD")

    # handle missing "function":"CMD" setting
    if setting is None:
        return pluginsState

    set_CMD = setting["value"]

    setting = get_plugin_setting(plugin, "RUN_TIMEOUT")

    # handle missing "function":"_TIMEOUT" setting
    set_RUN_TIMEOUT = 10 if setting is None else setting["value"]

    mylog('debug', ['[Plugins] Timeout: ', set_RUN_TIMEOUT])

    # Prepare custom params
    params = []

    if "params" in plugin:
        for param in plugin["params"]:
            resolved = ""

            # Get setting value
            if param["type"] == "setting":
                resolved = get_setting(param["value"])

                if resolved is not None:
                    resolved = passable_string_from_setting(resolved)

            # Get Sql result
            if param["type"] == "sql":
                resolved = flatten_array(db.get_sql_array(param["value"]))

            if resolved is None:
                mylog('none', [f'[Plugins] The parameter "name":"{param["name"]}" for "value": {param["value"]} was resolved as None'])
            else:
                params.append([param["name"], resolved])

    # build SQL query parameters to insert into the DB
    sqlParams = []
    prefix = plugin["unique_prefix"]

    # script
    if plugin['data_source'] == 'script':
        # ------- prepare params --------
        # prepare command from plugin settings, custom parameters
        command = resolve_wildcards_arr(set_CMD.split(), params)

        # Execute command
        mylog('verbose', ['[Plugins] Executing: ', set_CMD])
        mylog('debug', ['[Plugins] Resolved : ', command])

        try:
            # run the subprocess with a forced timeout in case it hangs; its
            # stdout is not used, results are read from last_result.log below
            subprocess.check_output(command, universal_newlines=True, stderr=subprocess.STDOUT, timeout=(set_RUN_TIMEOUT))
        except subprocess.CalledProcessError as e:
            # An error occured, handle it
            mylog('none', [e.output])
            mylog('none', ['[Plugins] Error - enable LOG_LEVEL=debug and check logs'])
        except subprocess.TimeoutExpired:
            mylog('none', ['[Plugins] TIMEOUT - the process forcefully terminated as timeout reached'])

        # check the last run output
        file_path = os.path.join(pluginsPath, plugin["code_name"], 'last_result.log')

        if os.path.exists(file_path):
            # the file is only read, so read-only mode is sufficient
            with open(file_path, 'r') as f:
                newLines = f.read().split('\n')

            # cleanup - select only lines containing a separator to filter out unnecessary data
            for line in newLines:
                if '|' not in line:
                    continue

                columns = line.split("|")
                # There has to be always 9 columns
                if len(columns) == 9:
                    sqlParams.append(_build_event_row(prefix, columns, False))
                else:
                    mylog('none', ['[Plugins] Skipped invalid line in the output: ', line])
        else:
            mylog('debug', [f'[Plugins] The file {file_path} does not exist'])

    # pialert-db-query
    if plugin['data_source'] == 'pialert-db-query':
        # replace single quotes wildcards
        q = set_CMD.replace("{s-quote}", '\'')

        # Execute command
        mylog('verbose', ['[Plugins] Executing: ', q])

        # set_CMD should contain a SQL query
        sqlParams.extend(_event_rows_from_sql(prefix, db.get_sql_array(q)))

    # sqlite-db-query
    if plugin['data_source'] == 'sqlite-db-query':
        # replace single quotes wildcards
        # set_CMD should contain a SQL query
        q = set_CMD.replace("{s-quote}", '\'')

        # Execute command
        mylog('verbose', ['[Plugins] Executing: ', q])

        # ------- necessary settings check --------
        setting = get_plugin_setting(plugin, "DB_PATH")

        # handle missing "function":"DB_PATH" setting
        if setting is None:
            mylog('none', ['[Plugins] Error: DB_PATH setting for plugin type sqlite-db-query missing.'])
            return pluginsState

        fullSqlitePath = setting["value"]

        # try attaching the sqlite DB; the path is bound as a parameter so a
        # quote inside it cannot break the statement
        try:
            sql.execute("ATTACH DATABASE ? AS EXTERNAL", (fullSqlitePath,))
        except sqlite3.Error as e:
            mylog('none', ['[Plugin] - ATTACH DATABASE failed with SQL ERROR: ', e])

        # the query is attempted even if ATTACH failed (original behaviour)
        sqlParams.extend(_event_rows_from_sql(prefix, db.get_sql_array(q)))

    # check if the subprocess / SQL query failed / there was no valid output
    if len(sqlParams) == 0:
        mylog('none', ['[Plugins] No output received from the plugin ', plugin["unique_prefix"], ' - enable LOG_LEVEL=debug and check logs'])
        return pluginsState

    mylog('verbose', ['[Plugins] SUCCESS, received ', len(sqlParams), ' entries'])

    # process results
    sql.executemany(
        f'INSERT INTO Plugins_Events ({_EVENT_INSERT_COLUMNS}) VALUES ({_EVENT_INSERT_PLACEHOLDERS})',
        sqlParams
    )
    db.commitDB()

    try:
        sql.executemany(
            f'INSERT INTO Plugins_History ({_EVENT_INSERT_COLUMNS}) VALUES ({_EVENT_INSERT_PLACEHOLDERS})',
            sqlParams
        )
        db.commitDB()
    except sqlite3.Error as e:
        db.rollbackDB()  # Rollback changes in case of an error
        mylog('none', ['[Plugins] ERROR inserting into Plugins_History:', e])

    # create objects
    pluginsState = process_plugin_events(db, plugin, pluginsState)

    # update API endpoints
    update_api(db, False, ["plugins_events", "plugins_objects"])

    return pluginsState


#-------------------------------------------------------------------------------
def custom_plugin_decoder(pluginDict):
    """Convert a dict into a namedtuple 'X' with the dict keys as fields."""
    return namedtuple('X', pluginDict.keys())(*pluginDict.values())


#-------------------------------------------------------------------------------
# Handle empty value
def handle_empty(value):
    """Return the string 'null' for '' or None, otherwise value unchanged."""
    if value == '' or value is None:
        return 'null'
    return value


#-------------------------------------------------------------------------------
# Flattens a setting to make it passable to a script
def passable_string_from_setting(globalSetting):
    """Convert a setting row into a value that can be passed to a script.

    The setting value is read from globalSetting[6] and its type from
    globalSetting[3]. Plain types are returned as-is, list types are flattened
    to a comma separated string ('subnets' additionally base64 encoded) and
    types ending in '.template' are dumped as JSON.

    Returns:
        The converted value, or None (after logging an error) when the type
        is not recognised.
    """
    setVal = globalSetting[6]  # setting value
    setTyp = globalSetting[3]  # setting type

    noConversion = ['text', 'string', 'integer', 'boolean', 'password', 'readonly', 'integer.select', 'text.select', 'integer.checkbox']
    arrayConversion = ['text.multiselect', 'list']
    arrayConversionBase64 = ['subnets']
    jsonConversion = ['.template']

    mylog('debug', f'[Plugins] setTyp: {setTyp}')

    if setTyp in noConversion:
        return setVal

    if setTyp in arrayConversion:
        return flatten_array(setVal)

    if setTyp in arrayConversionBase64:
        return flatten_array(setVal, encodeBase64 = True)

    for item in jsonConversion:
        if setTyp.endswith(item):
            return json.dumps(setVal)

    mylog('none', ['[Plugins] ERROR: Parameter not converted.'])
    return None


#-------------------------------------------------------------------------------
# Gets the setting value
def get_plugin_setting_value(plugin, function_key):
    """Return the "value" of the matching plugin setting, or None if missing."""
    resultObj = get_plugin_setting(plugin, function_key)

    if resultObj is not None:
        return resultObj["value"]

    return None


#-------------------------------------------------------------------------------
def flatten_array(arr, encodeBase64=False):
    """Join the items of arr into one comma separated string.

    List items contribute only their first element (with single quotes
    removed); any other item is formatted as-is.

    Args:
        arr: iterable of strings or single-column lists.
        encodeBase64: when True the joined string is base64 encoded.

    Returns:
        The flattened string. With encodeBase64 the result is str() of the
        encoded bytes, i.e. it has the form "b'...'".
    """
    mylog('debug', '[Plugins] Flattening the below array')
    mylog('debug', f'[Plugins] Convert to Base64: {encodeBase64}')
    mylog('debug', arr)

    parts = []
    for arrayItem in arr:
        # only one column flattening is supported
        if isinstance(arrayItem, list):
            # removing single quotes - not allowed
            parts.append(str(arrayItem[0]).replace("'", ''))
        else:
            # is string already
            parts.append(f'{arrayItem}')

    tmp = ','.join(parts)

    mylog('debug', f'[Plugins] Flattened array: {tmp}')

    if encodeBase64:
        # NOTE(review): str() of bytes keeps the b'...' wrapper; consumers
        # presumably parse that exact form - confirm before changing it.
        tmp = str(base64.b64encode(tmp.encode('ascii')))
        mylog('debug', f'[Plugins] Flattened array (base64): {tmp}')

    return tmp


#-------------------------------------------------------------------------------
# Replace {wildcars} with parameters
def resolve_wildcards_arr(commandArr, params):
    """Substitute {name} wildcards and {s-quote} in every command part.

    Args:
        commandArr: list of command parts; modified in place.
        params: sequence of [name, value] pairs; values are converted to str.

    Returns:
        commandArr, with each '{name}' replaced by its value and every
        '{s-quote}' replaced by a single quote (also when params is empty).
    """
    mylog('debug', ['[Plugins] Pre-Resolved CMD: '] + commandArr)

    for param in params:
        placeholder = '{' + param[0] + '}'
        value = str(param[1])  # settings may resolve to non-string values
        for i, comPart in enumerate(commandArr):
            commandArr[i] = comPart.replace(placeholder, value)

    # done outside the params loop so it also applies when there are no params
    for i, comPart in enumerate(commandArr):
        commandArr[i] = comPart.replace('{s-quote}', "'")

    return commandArr


#-------------------------------------------------------------------------------
# Combine plugin objects, keep user-defined values, created time, changed time if nothing changed and the index
def combine_plugin_objects(old, new):
    """Copy userData, index and created from old onto new and return new.

    The changed time of old is kept as well when new.status is
    'watched-not-changed'.
    """
    new.userData = old.userData
    new.index = old.index
    new.created = old.created

    # Keep changed time if nothing changed
    if new.status in ['watched-not-changed']:
        new.changed = old.changed

    # return the new object, with some of the old values
    return new


#-------------------------------------------------------------------------------
# Check if watched values changed for the given plugin
def process_plugin_events(db, plugin, pluginsState):
    """Reconcile a plugin's events with its stored objects and persist them.

    Each event gets the status 'new' (no stored object with the same IDs),
    'watched-changed' or 'watched-not-changed' (compared against the stored
    object(s) with the same IDs). Objects are inserted/updated in
    Plugins_Objects, reportable events are inserted into Plugins_Events, and
    when the plugin defines 'mapped_to_table' the events are also inserted
    into that table.

    Args:
        db: database wrapper exposing sql, sql_connection, get_sql_array,
            commitDB.
        plugin: plugin configuration dict.
        pluginsState: plugins_state; processScan is set when the plugin is
            mapped to the 'CurrentScan' table.

    Returns:
        pluginsState.

    Raises:
        Exception: any error during the object/event update is logged, the
            connection is rolled back, and the error is re-raised.
    """
    sql = db.sql

    # Access the connection from the DB instance
    conn = db.sql_connection

    pluginPref = plugin["unique_prefix"]

    mylog('debug', ['[Plugins] Processing : ', pluginPref])

    pluginObjects = []
    pluginEvents = []

    try:
        # Begin a transaction
        with conn:
            # NOTE(review): get_sql_array's signature is not visible here, so
            # the query strings are left concatenated rather than parameterised.
            plugObjectsArr = db.get_sql_array("SELECT * FROM Plugins_Objects where Plugin = '" + str(pluginPref) + "'")
            plugEventsArr = db.get_sql_array("SELECT * FROM Plugins_Events where Plugin = '" + str(pluginPref) + "'")

            for obj in plugObjectsArr:
                pluginObjects.append(plugin_object_class(plugin, obj))

            existingPluginObjectsCount = len(pluginObjects)

            mylog('debug', ['[Plugins] Existing objects : ', existingPluginObjectsCount])
            mylog('debug', ['[Plugins] New and existing events : ', len(plugEventsArr)])

            # Index the stored objects by their IDs so every event is compared
            # only with the object(s) it actually corresponds to.
            existingByIds = {}
            for obj in pluginObjects:
                existingByIds.setdefault(obj.idsHash, []).append(obj)

            for eve in plugEventsArr:
                tmpObject = plugin_object_class(plugin, eve)
                matches = existingByIds.get(tmpObject.idsHash)

                if not matches:
                    tmpObject.status = "new"
                elif any(x.watchedHash != tmpObject.watchedHash for x in matches):
                    tmpObject.status = "watched-changed"
                else:
                    tmpObject.status = "watched-not-changed"

                pluginEvents.append(tmpObject)

            # Merge existing plugin objects with newly discovered ones and update existing ones with new values
            for eveObj in pluginEvents:
                if eveObj.status == 'new':
                    pluginObjects.append(eveObj)
                else:
                    for index, plugObj in enumerate(pluginObjects):
                        # find corresponding object for the event and merge
                        if plugObj.idsHash == eveObj.idsHash:
                            pluginObjects[index] = combine_plugin_objects(plugObj, eveObj)

            # Update the DB
            # ----------------------------
            objects_to_insert = []
            objects_to_update = []
            events_to_insert = []

            # hoisted: the setting does not change per object; a missing
            # REPORT_ON setting means nothing besides 'new' is reported
            reportOn = get_plugin_setting_value(plugin, "REPORT_ON") or []

            for plugObj in pluginObjects:
                # new objects have no created time yet, use the changed time
                createdTime = plugObj.changed if plugObj.status == 'new' else plugObj.created

                # 13 values, without the Index
                values = (
                    plugObj.pluginPref, plugObj.primaryId, plugObj.secondaryId, createdTime,
                    plugObj.changed, plugObj.watched1, plugObj.watched2, plugObj.watched3,
                    plugObj.watched4, plugObj.status, plugObj.extra, plugObj.userData,
                    plugObj.foreignKey
                )

                if plugObj.status == 'new':
                    objects_to_insert.append(values)
                    events_to_insert.append(values)
                else:
                    # same columns the previous upsert overwrote on conflict,
                    # followed by the Index used in the WHERE clause
                    objects_to_update.append((
                        plugObj.pluginPref, plugObj.changed, plugObj.watched1, plugObj.watched2,
                        plugObj.watched3, plugObj.watched4, plugObj.status, plugObj.extra,
                        plugObj.foreignKey, plugObj.index
                    ))
                    if plugObj.status in reportOn:
                        events_to_insert.append(values)

            # Bulk insert new objects (Index is left to the database)
            if objects_to_insert:
                sql.executemany(
                    f'INSERT INTO Plugins_Objects ({_EVENT_INSERT_COLUMNS}) VALUES ({_EVENT_INSERT_PLACEHOLDERS})',
                    objects_to_insert
                )

            # Bulk update existing objects
            if objects_to_update:
                sql.executemany(
                    """
                    UPDATE Plugins_Objects
                    SET "Plugin" = ?,
                        "DateTimeChanged" = ?,
                        "Watched_Value1" = ?,
                        "Watched_Value2" = ?,
                        "Watched_Value3" = ?,
                        "Watched_Value4" = ?,
                        "Status" = ?,
                        "Extra" = ?,
                        "ForeignKey" = ?
                    WHERE "Index" = ?
                    """, objects_to_update
                )

            # Bulk insert events (insert only events if they are to be reported on)
            if events_to_insert:
                sql.executemany(
                    f'INSERT INTO Plugins_Events ({_EVENT_INSERT_COLUMNS}) VALUES ({_EVENT_INSERT_PLACEHOLDERS})',
                    events_to_insert
                )

            # Commit changes to the database
            db.commitDB()

    except Exception as e:
        # Rollback the transaction in case of an error
        conn.rollback()
        mylog('none', ['[Plugins] SQL transaction error: ', e])
        raise e

    # Perform database table mapping if enabled for the plugin
    if len(pluginEvents) > 0 and "mapped_to_table" in plugin:
        sqlParams = []

        dbTable = plugin['mapped_to_table']

        mylog('debug', ['[Plugins] Mapping objects to database table: ', dbTable])

        # Collect the mapped columns and build the column / placeholder lists
        mappedCols = []
        columnsStr = ''
        valuesStr = ''

        for clmn in plugin['database_column_definitions']:
            if 'mapped_to_column' in clmn:
                mappedCols.append(clmn)
                columnsStr = f'{columnsStr}, "{clmn["mapped_to_column"]}"'
                valuesStr = f'{valuesStr}, ?'

        # Remove the first ',' from columnsStr and valuesStr.
        if len(columnsStr) > 0:
            columnsStr = columnsStr[1:]
            valuesStr = valuesStr[1:]

        # Map the column names to plugin object event values
        for plgEv in pluginEvents:
            tmpList = []

            for col in mappedCols:
                attr = _EVENT_COLUMN_ATTRS.get(col['column'])
                if attr is not None:
                    tmpList.append(getattr(plgEv, attr))

            sqlParams.append(tuple(tmpList))

        q = f'INSERT into {dbTable} ({columnsStr}) VALUES ({valuesStr})'

        mylog('debug', ['[Plugins] SQL query for mapping: ', q])

        # insert all mapped rows in one go
        sql.executemany(q, sqlParams)

        db.commitDB()

        # perform scan if mapped to CurrentScan table
        if dbTable == 'CurrentScan':
            pluginsState.processScan = True

    db.commitDB()

    return pluginsState


#-------------------------------------------------------------------------------
class plugin_object_class:
    """One row of Plugins_Objects / Plugins_Events as an object.

    Besides the row columns the object carries:
        idsHash: primary and secondary ID concatenated, identifies the object.
        watchedClmns: value of the plugin's WATCH setting (empty if missing).
        watchedIndxs: row indexes of the watched columns.
        watchedHash: hash over the watched column values; only compared with
            hashes computed in the same interpreter run.
    """

    def __init__(self, plugin, objDbRow):
        self.index = objDbRow[0]
        self.pluginPref = objDbRow[1]
        self.primaryId = objDbRow[2]
        self.secondaryId = objDbRow[3]
        self.created = objDbRow[4]
        self.changed = objDbRow[5]
        self.watched1 = objDbRow[6]
        self.watched2 = objDbRow[7]
        self.watched3 = objDbRow[8]
        self.watched4 = objDbRow[9]
        self.status = objDbRow[10]
        self.extra = objDbRow[11]
        self.userData = objDbRow[12]
        self.foreignKey = objDbRow[13]

        # self.idsHash = str(hash(str(self.primaryId) + str(self.secondaryId)))
        self.idsHash = str(self.primaryId) + str(self.secondaryId)

        self.watchedClmns = []
        self.watchedIndxs = []

        setObj = get_plugin_setting(plugin, 'WATCH')

        indexNameColumnMapping = [(6, 'Watched_Value1'), (7, 'Watched_Value2'), (8, 'Watched_Value3'), (9, 'Watched_Value4')]

        if setObj is not None:
            self.watchedClmns = setObj["value"]

            for clmName in self.watchedClmns:
                for rowIndex, columnName in indexNameColumnMapping:
                    # compare with the current mapping entry, not the list itself
                    if clmName == columnName:
                        self.watchedIndxs.append(rowIndex)

        tmp = ''.join(str(objDbRow[indx]) for indx in self.watchedIndxs)

        self.watchedHash = str(hash(tmp))