From 78f71abd315f05dca8dcaf03e7850b2ac7848ee2 Mon Sep 17 00:00:00 2001 From: Data-Monkey Date: Sun, 21 May 2023 15:03:16 +1000 Subject: [PATCH] moved database out --- pialert/database.py | 352 +++++++++++++++++++++++++++++++ pialert/helper.py | 2 +- pialert/pialert.py | 502 ++++++++------------------------------------ 3 files changed, 440 insertions(+), 416 deletions(-) create mode 100644 pialert/database.py diff --git a/pialert/database.py b/pialert/database.py new file mode 100644 index 00000000..ccf3db9a --- /dev/null +++ b/pialert/database.py @@ -0,0 +1,352 @@ +""" all things database to support Pi.Alert """ + +import sqlite3 + +from const import fullDbPath +from logger import print_log, mylog + +#=============================================================================== +# SQL queries +#=============================================================================== +sql_devices_all = "select dev_MAC, dev_Name, dev_DeviceType, dev_Vendor, dev_Group, dev_FirstConnection, dev_LastConnection, dev_LastIP, dev_StaticIP, dev_PresentLastScan, dev_LastNotification, dev_NewDevice, dev_Network_Node_MAC_ADDR, dev_Network_Node_port, dev_Icon from Devices" +sql_devices_stats = "SELECT Online_Devices as online, Down_Devices as down, All_Devices as 'all', Archived_Devices as archived, (select count(*) from Devices a where dev_NewDevice = 1 ) as new, (select count(*) from Devices a where dev_Name = '(unknown)' or dev_Name = '(name not found)' ) as unknown from Online_History order by Scan_Date desc limit 1" +sql_nmap_scan_all = "SELECT * FROM Nmap_Scan" +sql_pholus_scan_all = "SELECT * FROM Pholus_Scan" +sql_events_pending_alert = "SELECT * FROM Events where eve_PendingAlertEmail is not 0" +sql_settings = "SELECT * FROM Settings" +sql_plugins_objects = "SELECT * FROM Plugins_Objects" +sql_language_strings = "SELECT * FROM Plugins_Language_Strings" +sql_plugins_events = "SELECT * FROM Plugins_Events" +sql_plugins_history = "SELECT * FROM Plugins_History ORDER BY 'Index' DESC" +sql_new_devices = """SELECT * FROM ( SELECT eve_IP as dev_LastIP, eve_MAC as dev_MAC FROM Events_Devices + WHERE eve_PendingAlertEmail = 1 + AND eve_EventType = 'New Device' + ORDER BY eve_DateTime ) t1 + LEFT JOIN + ( + SELECT dev_Name, dev_MAC as dev_MAC_t2 FROM Devices + ) t2 + ON t1.dev_MAC = t2.dev_MAC_t2""" + + +class DB(): + + def __init__(self): + self.sql = None + self.sql_connection = None + + def openDB (self): + # Check if DB is open + if self.sql_connection != None : + mylog('debug','openDB: databse already open') + return + + mylog('none', 'Opening DB' ) + # Open DB and Cursor + self.sql_connection = sqlite3.connect (fullDbPath, isolation_level=None) + self.sql_connection.execute('pragma journal_mode=wal') # + self.sql_connection.text_factory = str + self.sql_connection.row_factory = sqlite3.Row + self.sql = self.sql_connection.cursor() + + + #------------------------------------------------------------------------------- + def commitDB (self): + if self.sql_connection == None : + mylog('debug','commitDB: databse is not open') + return + mylog('debug','commitDB: comiting DB changes') + + # Commit changes to DB + self.sql_connection.commit() + + #------------------------------------------------------------------------------- + def get_sql_array(self, query): + if self.sql_connection == None : + mylog('debug','getQueryArray: databse is not open') + return + + self.sql.execute(query) + rows = self.sql.fetchall() + self.commitDB() + + # convert result into list of lists + arr = [] + for row in rows: + r_temp = [] + for column in row: + r_temp.append(column) + arr.append(r_temp) + + return arr + + + + + + + +#------------------------------------------------------------------------------- +def initOrSetParam(db, parID, parValue): + sql_connection = db.sql_connection + sql = db.sql + + sql.execute ("INSERT INTO Parameters(par_ID, par_Value) VALUES('"+str(parID)+"', '"+str(parValue)+"') ON CONFLICT(par_ID) DO UPDATE SET par_Value='"+str(parValue)+"' where par_ID='"+str(parID)+"'") + + db.commitDB() + +#------------------------------------------------------------------------------- +def updateState(db, newState): + + db.sql.execute ("UPDATE Parameters SET par_Value='"+ newState +"' WHERE par_ID='Back_App_State'") + + db.commitDB() + + + +#------------------------------------------------------------------------------- +def upgradeDB(db: DB()): + sql = db.sql #TO-DO + + # indicates, if Online_History table is available + onlineHistoryAvailable = db.sql.execute(""" + SELECT name FROM sqlite_master WHERE type='table' + AND name='Online_History'; + """).fetchall() != [] + + # Check if it is incompatible (Check if table has all required columns) + isIncompatible = False + + if onlineHistoryAvailable : + isIncompatible = sql.execute (""" + SELECT COUNT(*) AS CNTREC FROM pragma_table_info('Online_History') WHERE name='Archived_Devices' + """).fetchone()[0] == 0 + + # Drop table if available, but incompatible + if onlineHistoryAvailable and isIncompatible: + mylog('none','[upgradeDB] Table is incompatible, Dropping the Online_History table') + sql.execute("DROP TABLE Online_History;") + onlineHistoryAvailable = False + + if onlineHistoryAvailable == False : + sql.execute(""" + CREATE TABLE "Online_History" ( + "Index" INTEGER, + "Scan_Date" TEXT, + "Online_Devices" INTEGER, + "Down_Devices" INTEGER, + "All_Devices" INTEGER, + "Archived_Devices" INTEGER, + PRIMARY KEY("Index" AUTOINCREMENT) + ); + """) + + # Alter Devices table + # dev_Network_Node_MAC_ADDR column + dev_Network_Node_MAC_ADDR_missing = sql.execute (""" + SELECT COUNT(*) AS CNTREC FROM pragma_table_info('Devices') WHERE name='dev_Network_Node_MAC_ADDR' + """).fetchone()[0] == 0 + + if dev_Network_Node_MAC_ADDR_missing : + mylog('verbose', ["[upgradeDB] Adding dev_Network_Node_MAC_ADDR to the Devices table"]) + sql.execute(""" + ALTER TABLE "Devices" ADD "dev_Network_Node_MAC_ADDR" TEXT + """) + + # dev_Network_Node_port column + dev_Network_Node_port_missing = sql.execute (""" + SELECT COUNT(*) AS CNTREC FROM pragma_table_info('Devices') WHERE name='dev_Network_Node_port' + """).fetchone()[0] == 0 + + if dev_Network_Node_port_missing : + mylog('verbose', ["[upgradeDB] Adding dev_Network_Node_port to the Devices table"]) + sql.execute(""" + ALTER TABLE "Devices" ADD "dev_Network_Node_port" INTEGER + """) + + # dev_Icon column + dev_Icon_missing = sql.execute (""" + SELECT COUNT(*) AS CNTREC FROM pragma_table_info('Devices') WHERE name='dev_Icon' + """).fetchone()[0] == 0 + + if dev_Icon_missing : + mylog('verbose', ["[upgradeDB] Adding dev_Icon to the Devices table"]) + sql.execute(""" + ALTER TABLE "Devices" ADD "dev_Icon" TEXT + """) + + # indicates, if Settings table is available + settingsMissing = sql.execute(""" + SELECT name FROM sqlite_master WHERE type='table' + AND name='Settings'; + """).fetchone() == None + + # Re-creating Settings table + mylog('verbose', ["[upgradeDB] Re-creating Settings table"]) + + if settingsMissing == False: + sql.execute("DROP TABLE Settings;") + + sql.execute(""" + CREATE TABLE "Settings" ( + "Code_Name" TEXT, + "Display_Name" TEXT, + "Description" TEXT, + "Type" TEXT, + "Options" TEXT, + "RegEx" TEXT, + "Value" TEXT, + "Group" TEXT, + "Events" TEXT + ); + """) + + # indicates, if Pholus_Scan table is available + pholusScanMissing = sql.execute(""" + SELECT name FROM sqlite_master WHERE type='table' + AND name='Pholus_Scan'; + """).fetchone() == None + + # if pholusScanMissing == False: + # # Re-creating Pholus_Scan table + # sql.execute("DROP TABLE Pholus_Scan;") + # pholusScanMissing = True + + if pholusScanMissing: + mylog('verbose', ["[upgradeDB] Re-creating Pholus_Scan table"]) + sql.execute(""" + CREATE TABLE "Pholus_Scan" ( + "Index" INTEGER, + "Info" TEXT, + "Time" TEXT, + "MAC" TEXT, + "IP_v4_or_v6" TEXT, + "Record_Type" TEXT, + "Value" TEXT, + "Extra" TEXT, + PRIMARY KEY("Index" AUTOINCREMENT) + ); + """) + + # indicates, if Nmap_Scan table is available + nmapScanMissing = sql.execute(""" + SELECT name FROM sqlite_master WHERE type='table' + AND name='Nmap_Scan'; + """).fetchone() == None + + # Re-creating Parameters table + mylog('verbose', ["[upgradeDB] Re-creating Parameters table"]) + sql.execute("DROP TABLE Parameters;") + + sql.execute(""" + CREATE TABLE "Parameters" ( + "par_ID" TEXT PRIMARY KEY, + "par_Value" TEXT + ); + """) + + # Initialize Parameters if unavailable + initOrSetParam(db, 'Back_App_State','Initializing') + + # if nmapScanMissing == False: + # # Re-creating Nmap_Scan table + # sql.execute("DROP TABLE Nmap_Scan;") + # nmapScanMissing = True + + if nmapScanMissing: + mylog('verbose', ["[upgradeDB] Re-creating Nmap_Scan table"]) + sql.execute(""" + CREATE TABLE "Nmap_Scan" ( + "Index" INTEGER, + "MAC" TEXT, + "Port" TEXT, + "Time" TEXT, + "State" TEXT, + "Service" TEXT, + "Extra" TEXT, + PRIMARY KEY("Index" AUTOINCREMENT) + ); + """) + + # Plugin state + sql_Plugins_Objects = """ CREATE TABLE IF NOT EXISTS Plugins_Objects( + "Index" INTEGER, + Plugin TEXT NOT NULL, + Object_PrimaryID TEXT NOT NULL, + Object_SecondaryID TEXT NOT NULL, + DateTimeCreated TEXT NOT NULL, + DateTimeChanged TEXT NOT NULL, + Watched_Value1 TEXT NOT NULL, + Watched_Value2 TEXT NOT NULL, + Watched_Value3 TEXT NOT NULL, + Watched_Value4 TEXT NOT NULL, + Status TEXT NOT NULL, + Extra TEXT NOT NULL, + UserData TEXT NOT NULL, + ForeignKey TEXT NOT NULL, + PRIMARY KEY("Index" AUTOINCREMENT) + ); """ + sql.execute(sql_Plugins_Objects) + + # Plugin execution results + sql_Plugins_Events = """ CREATE TABLE IF NOT EXISTS Plugins_Events( + "Index" INTEGER, + Plugin TEXT NOT NULL, + Object_PrimaryID TEXT NOT NULL, + Object_SecondaryID TEXT NOT NULL, + DateTimeCreated TEXT NOT NULL, + DateTimeChanged TEXT NOT NULL, + Watched_Value1 TEXT NOT NULL, + Watched_Value2 TEXT NOT NULL, + Watched_Value3 TEXT NOT NULL, + Watched_Value4 TEXT NOT NULL, + Status TEXT NOT NULL, + Extra TEXT NOT NULL, + UserData TEXT NOT NULL, + ForeignKey TEXT NOT NULL, + PRIMARY KEY("Index" AUTOINCREMENT) + ); """ + sql.execute(sql_Plugins_Events) + + # Plugin execution history + sql_Plugins_History = """ CREATE TABLE IF NOT EXISTS Plugins_History( + "Index" INTEGER, + Plugin TEXT NOT NULL, + Object_PrimaryID TEXT NOT NULL, + Object_SecondaryID TEXT NOT NULL, + DateTimeCreated TEXT NOT NULL, + DateTimeChanged TEXT NOT NULL, + Watched_Value1 TEXT NOT NULL, + Watched_Value2 TEXT NOT NULL, + Watched_Value3 TEXT NOT NULL, + Watched_Value4 TEXT NOT NULL, + Status TEXT NOT NULL, + Extra TEXT NOT NULL, + UserData TEXT NOT NULL, + ForeignKey TEXT NOT NULL, + PRIMARY KEY("Index" AUTOINCREMENT) + ); """ + sql.execute(sql_Plugins_History) + + # Dynamically generated language strings + # indicates, if Language_Strings table is available + languageStringsMissing = sql.execute(""" + SELECT name FROM sqlite_master WHERE type='table' + AND name='Plugins_Language_Strings'; + """).fetchone() == None + + if languageStringsMissing == False: + sql.execute("DROP TABLE Plugins_Language_Strings;") + + sql.execute(""" CREATE TABLE IF NOT EXISTS Plugins_Language_Strings( + "Index" INTEGER, + Language_Code TEXT NOT NULL, + String_Key TEXT NOT NULL, + String_Value TEXT NOT NULL, + Extra TEXT NOT NULL, + PRIMARY KEY("Index" AUTOINCREMENT) + ); """) + + db.commitDB() + + diff --git a/pialert/helper.py b/pialert/helper.py index 3423be04..b92fdc57 100644 --- a/pialert/helper.py +++ b/pialert/helper.py @@ -29,7 +29,7 @@ def updateSubnets(SCAN_SUBNETS): #------------------------------------------------------------------------------- # check RW access of DB and config file def checkPermissionsOK(): - global confR_access, confW_access, dbR_access, dbW_access + #global confR_access, confW_access, dbR_access, dbW_access confR_access = (os.access(fullConfPath, os.R_OK)) confW_access = (os.access(fullConfPath, os.W_OK)) diff --git a/pialert/pialert.py b/pialert/pialert.py index d763833d..f6a80896 100755 --- a/pialert/pialert.py +++ b/pialert/pialert.py @@ -46,33 +46,7 @@ import multiprocessing from const import * from logger import mylog, print_log, logResult from helper import checkPermissionsOK, fixPermissions, timeNow, updateSubnets - - - -#=============================================================================== -# SQL queries -#=============================================================================== - -sql_devices_all = "select dev_MAC, dev_Name, dev_DeviceType, dev_Vendor, dev_Group, dev_FirstConnection, dev_LastConnection, dev_LastIP, dev_StaticIP, dev_PresentLastScan, dev_LastNotification, dev_NewDevice, dev_Network_Node_MAC_ADDR, dev_Network_Node_port, dev_Icon from Devices" -sql_devices_stats = "SELECT Online_Devices as online, Down_Devices as down, All_Devices as 'all', Archived_Devices as archived, (select count(*) from Devices a where dev_NewDevice = 1 ) as new, (select count(*) from Devices a where dev_Name = '(unknown)' or dev_Name = '(name not found)' ) as unknown from Online_History order by Scan_Date desc limit 1" -sql_nmap_scan_all = "SELECT * FROM Nmap_Scan" -sql_pholus_scan_all = "SELECT * FROM Pholus_Scan" -sql_events_pending_alert = "SELECT * FROM Events where eve_PendingAlertEmail is not 0" -sql_settings = "SELECT * FROM Settings" -sql_plugins_objects = "SELECT * FROM Plugins_Objects" -sql_language_strings = "SELECT * FROM Plugins_Language_Strings" -sql_plugins_events = "SELECT * FROM Plugins_Events" -sql_plugins_history = "SELECT * FROM Plugins_History ORDER BY 'Index' DESC" -sql_new_devices = """SELECT * FROM ( SELECT eve_IP as dev_LastIP, eve_MAC as dev_MAC FROM Events_Devices - WHERE eve_PendingAlertEmail = 1 - AND eve_EventType = 'New Device' - ORDER BY eve_DateTime ) t1 - LEFT JOIN - ( - SELECT dev_Name, dev_MAC as dev_MAC_t2 FROM Devices - ) t2 - ON t1.dev_MAC = t2.dev_MAC_t2""" - +from database import * # Global variables @@ -118,6 +92,7 @@ def initialiseFile(pathToCheck, defaultFile): # check and initialize pialert.conf (confR_access, dbR_access) = checkPermissionsOK() # Initial check + if confR_access == False: initialiseFile(fullConfPath, "/home/pi/pialert/back/pialert.conf_bak" ) @@ -134,38 +109,7 @@ fixPermissions() # We need access to the DB to save new values so need to define DB access methods first #------------------------------------------------------------------------------- -def openDB (): - global sql_connection - global sql - # Check if DB is open - if sql_connection != None : - return - - # Log - print_log ('Opening DB') # TO-DO should be changed to mylog - - # Open DB and Cursor - sql_connection = sqlite3.connect (fullDbPath, isolation_level=None) - sql_connection.execute('pragma journal_mode=wal') # - sql_connection.text_factory = str - sql_connection.row_factory = sqlite3.Row - sql = sql_connection.cursor() - -#------------------------------------------------------------------------------- -def commitDB (): - global sql_connection - global sql - - # Check if DB is open - if sql_connection == None : - return - - # Log - # print_log ('Commiting DB changes') - - # Commit changes to DB - sql_connection.commit() #------------------------------------------------------------------------------- # Import user values @@ -189,7 +133,7 @@ def ccd(key, default, config, name, inputtype, options, group, events=[], desc = #------------------------------------------------------------------------------- -def importConfigs (): +def importConfigs (db): # Specify globals so they can be overwritten with the new config global lastTimeImported, mySettings, mySettingsSQLsafe, plugins, plugins_once_run @@ -359,7 +303,7 @@ def importConfigs (): # if plugin["enabled"] == 'true': # collect plugin level language strings - collect_lang_strings(plugin, pref) + collect_lang_strings(db, plugin, pref) for set in plugin["settings"]: setFunction = set["function"] @@ -377,7 +321,7 @@ def importConfigs (): mySchedules.append(schedule_class(pref, newSchedule, newSchedule.next(), False)) # Collect settings related language strings - collect_lang_strings(set, pref + "_" + set["function"]) + collect_lang_strings(db, set, pref + "_" + set["function"]) plugins_once_run = False # ----------------- @@ -398,7 +342,8 @@ def importConfigs (): # Is used to display a message in the UI when old (outdated) settings are loaded initOrSetParam("Back_Settings_Imported",(round(time.time() * 1000),) ) - commitDB() + #commitDB(sql_connection) + db.commitDB() # update only the settings datasource update_api(False, ["settings"]) @@ -431,16 +376,19 @@ def main (): # second set of global variables global startTime, log_timestamp, sql_connection, sql, plugins_once_run - # DB - sql_connection = None - sql = None - # Open DB once and keep open # Opening / closing DB frequently actually casues more issues - openDB() # main + db = DB() + print(db, db.sql, db.sql_connection ) + db.openDB() + print(db, db.sql, db.sql_connection ) + + # To-Do replace the following to lines with the db dict or class + sql_connection = db.sql_connection + sql = db.sql # Upgrade DB if needed - upgradeDB() + upgradeDB(db) while True: @@ -448,7 +396,7 @@ def main (): time_started = datetime.datetime.now() # re-load user configuration and plugins - importConfigs() + importConfigs(db) # Handle plugins executed ONCE if ENABLE_PLUGINS and plugins_once_run == False: @@ -468,7 +416,7 @@ def main (): last_run = time_started # Header - updateState("Process: Start") + updateState(db,"Process: Start") mylog('verbose', ['[', timeNow(), '] Process: Start']) # Timestamp @@ -565,7 +513,7 @@ def main (): # Check if new devices found sql.execute (sql_new_devices) newDevices = sql.fetchall() - commitDB() + db.commitDB() # new devices were found if len(newDevices) > 0: @@ -587,7 +535,7 @@ def main (): cleanup_database() # Commit SQL - commitDB() + db.commitDB() # Final message if cycle != "": @@ -598,7 +546,7 @@ def main (): cycle = "" # Footer - updateState("Process: Wait") + updateState(db,"Process: Wait") mylog('verbose', ['[', timeNow(), '] Process: Wait']) else: # do something @@ -614,7 +562,7 @@ def main (): def check_internet_IP (): # Header - updateState("Scan: Internet IP") + updateState(sql_connection,"Scan: Internet IP") mylog('verbose', ['[', startTime, '] Check Internet IP:']) # Get Internet IP @@ -720,7 +668,7 @@ def set_dynamic_DNS_IP (): return curl_output #------------------------------------------------------------------------------- -def get_previous_internet_IP (): +def get_previous_internet_IP (db): previous_IP = '0.0.0.0' @@ -728,7 +676,7 @@ def get_previous_internet_IP (): sql.execute ("SELECT dev_LastIP FROM Devices WHERE dev_MAC = 'Internet' ") result = sql.fetchone() - commitDB() + db.commitDB() if result is not None and len(result) > 0 : previous_IP = result[0] @@ -737,7 +685,7 @@ def get_previous_internet_IP (): return previous_IP #------------------------------------------------------------------------------- -def save_new_internet_IP (pNewIP): +def save_new_internet_IP (db, pNewIP): # Log new IP into logfile append_line_to_file (logPath + '/IP_changes.log', '['+str(startTime) +']\t'+ pNewIP +'\n') @@ -757,7 +705,7 @@ def save_new_internet_IP (pNewIP): (pNewIP,) ) # commit changes - commitDB() + db.commitDB() #------------------------------------------------------------------------------- def check_IP_format (pIP): @@ -777,9 +725,9 @@ def check_IP_format (pIP): #=============================================================================== # Cleanup / upkeep database #=============================================================================== -def cleanup_database (): +def cleanup_database (db): # Header - updateState("Upkeep: Clean DB") + updateState(sql_connection,"Upkeep: Clean DB") mylog('verbose', ['[', startTime, '] Upkeep Database:' ]) # Cleanup Online History @@ -825,14 +773,14 @@ def cleanup_database (): mylog('verbose', [' Shrink Database']) sql.execute ("VACUUM;") - commitDB() + db.commitDB() #=============================================================================== # UPDATE DEVICE MAC VENDORS #=============================================================================== -def update_devices_MAC_vendors (pArg = ''): +def update_devices_MAC_vendors (db, pArg = ''): # Header - updateState("Upkeep: Vendors") + updateState(sql_connection,"Upkeep: Vendors") mylog('verbose', ['[', startTime, '] Upkeep - Update HW Vendors:' ]) # Update vendors DB (iab oui) @@ -878,7 +826,7 @@ def update_devices_MAC_vendors (pArg = ''): recordsToUpdate ) # Commit DB - commitDB() + db.commitDB() if len(recordsToUpdate) > 0: return True @@ -920,11 +868,11 @@ def query_MAC_vendor (pMAC): #=============================================================================== # SCAN NETWORK #=============================================================================== -def scan_network (): +def scan_network (db): reporting = False # Header - updateState("Scan: Network") + updateState(sql_connection,"Scan: Network") mylog('verbose', ['[', startTime, '] Scan Devices:' ]) # Query ScanCycle properties @@ -935,7 +883,7 @@ def scan_network (): mylog('none', [' Exiting...\n']) return False - commitDB() + db.commitDB() # ScanCycle data cycle_interval = scanCycle_data['cic_EveryXmin'] @@ -951,13 +899,13 @@ def scan_network (): if PIHOLE_ACTIVE : mylog('verbose', [' Pi-hole start']) copy_pihole_network() - commitDB() + db.commitDB() # DHCP Leases method if DHCP_ACTIVE : mylog('verbose', [' DHCP Leases start']) read_DHCP_leases () - commitDB() + db.commitDB() # Load current scan data mylog('verbose', [' Processing scan results']) @@ -1007,7 +955,7 @@ def scan_network (): skip_repeated_notifications () # Commit changes - commitDB() + db.commitDB() # Run splugin scripts which are set to run every timne after a scan finished if ENABLE_PLUGINS: @@ -1536,7 +1484,7 @@ def update_devices_data_from_scan (): print_log ('Update devices end') #------------------------------------------------------------------------------- -def update_devices_names (): +def update_devices_names (db): # Initialize variables recordsToUpdate = [] recordsNotFound = [] @@ -1550,7 +1498,7 @@ def update_devices_names (): # BUGFIX #97 - Updating name of Devices w/o IP sql.execute ("SELECT * FROM Devices WHERE dev_Name IN ('(unknown)','', '(name not found)') AND dev_LastIP <> '-'") unknownDevices = sql.fetchall() - commitDB() + db.commitDB() # perform Pholus scan if (unknown) devices found if PHOLUS_ACTIVE and (len(unknownDevices) > 0 or PHOLUS_FORCE): @@ -1566,7 +1514,7 @@ def update_devices_names (): # get names from Pholus scan sql.execute ('SELECT * FROM Pholus_Scan where "Record_Type"="Answer"') pholusResults = list(sql.fetchall()) - commitDB() + db.commitDB() # Number of entries from previous Pholus scans mylog('verbose', [" Pholus entries from prev scans: ", len(pholusResults)]) @@ -1603,11 +1551,11 @@ def update_devices_names (): sql.executemany ("UPDATE Devices SET dev_Name = ? WHERE dev_MAC = ? ", recordsNotFound ) # update names of devices which we were bale to resolve sql.executemany ("UPDATE Devices SET dev_Name = ? WHERE dev_MAC = ? ", recordsToUpdate ) - commitDB() + db.commitDB() #------------------------------------------------------------------------------- -def performNmapScan(devicesToScan): +def performNmapScan(db, devicesToScan): global changedPorts_json_struc @@ -1619,7 +1567,7 @@ def performNmapScan(devicesToScan): devTotal = len(devicesToScan) - updateState("Scan: Nmap") + updateState(sql_connection,"Scan: Nmap") mylog('verbose', ['[', timeNow(), '] Scan: Nmap for max ', str(timeoutSec), 's ('+ str(round(int(timeoutSec) / 60, 1)) +'min) per device']) @@ -1777,11 +1725,11 @@ def performNmapScan(devicesToScan): # Delete old entries if available if len(indexesToDelete) > 0: sql.execute ("DELETE FROM Nmap_Scan where \"Index\" in (" + indexesToDelete[:-1] +")") - commitDB () + db.commitDB() # Insert new values into the DB sql.executemany ("""INSERT INTO Nmap_Scan ("MAC", "Time", "Port", "State", "Service", "Extra") VALUES (?, ?, ?, ?, ?, ?)""", params) - commitDB () + db.commitDB() #------------------------------------------------------------------------------- class nmap_entry: @@ -1797,7 +1745,7 @@ class nmap_entry: self.hash = str(mac) + str(port)+ str(state)+ str(service) #------------------------------------------------------------------------------- -def performPholusScan (timeoutSec): +def performPholusScan (db, timeoutSec): # scan every interface for subnet in userSubnets: @@ -1812,7 +1760,7 @@ def performPholusScan (timeoutSec): interface = temp[1].strip() # logging & updating app state - updateState("Scan: Pholus") + updateState(sql_connection,"Scan: Pholus") mylog('info', ['[', timeNow(), '] Scan: Pholus for ', str(timeoutSec), 's ('+ str(round(int(timeoutSec) / 60, 1)) +'min)']) mylog('verbose', [" Pholus scan on [interface] ", interface, " [mask] " , mask]) @@ -1858,7 +1806,7 @@ def performPholusScan (timeoutSec): if len(params) > 0: sql.executemany ("""INSERT INTO Pholus_Scan ("Info", "Time", "MAC", "IP_v4_or_v6", "Record_Type", "Value", "Extra") VALUES (?, ?, ?, ?, ?, ?, ?)""", params) - commitDB () + db.commitDB() #------------------------------------------------------------------------------- def cleanResult(str): @@ -1990,7 +1938,7 @@ def resolve_device_name_dig (pMAC, pIP): return -1 #------------------------------------------------------------------------------- -def void_ghost_disconnections (): +def void_ghost_disconnections (db): # Void connect ghost events (disconnect event exists in last X min.) print_log ('Void - 1 Connect ghost events') sql.execute ("""UPDATE Events SET eve_PairEventRowid = Null, @@ -2049,10 +1997,10 @@ def void_ghost_disconnections (): ) """, (cycle, startTime) ) print_log ('Void end') - commitDB() + db.commitDB() #------------------------------------------------------------------------------- -def pair_sessions_events (): +def pair_sessions_events (db): # NOT NECESSARY FOR INCREMENTAL UPDATE # print_log ('Pair session - 1 Clean') # sql.execute ("""UPDATE Events @@ -2088,10 +2036,10 @@ def pair_sessions_events (): """ ) print_log ('Pair session end') - commitDB() + db.commitDB() #------------------------------------------------------------------------------- -def create_sessions_snapshot (): +def create_sessions_snapshot (db): # Clean sessions snapshot print_log ('Sessions Snapshot - 1 Clean') @@ -2103,12 +2051,12 @@ def create_sessions_snapshot (): SELECT * FROM Convert_Events_to_Sessions""" ) print_log ('Sessions end') - commitDB() + db.commitDB() #------------------------------------------------------------------------------- -def skip_repeated_notifications (): +def skip_repeated_notifications (db): # Skip repeated notifications # due strfime : Overflow --> use "strftime / 60" @@ -2126,7 +2074,7 @@ def skip_repeated_notifications (): """ ) print_log ('Skip Repeated end') - commitDB() + db.commitDB() #=============================================================================== @@ -2135,7 +2083,7 @@ def skip_repeated_notifications (): # create a json for webhook and mqtt notifications to provide further integration options json_final = [] -def send_notifications (): +def send_notifications (db): global mail_text, mail_html, json_final, changedPorts_json_struc, partial_html, partial_txt, partial_json deviceUrl = REPORT_DASHBOARD_URL + '/deviceDetails.php?mac=' @@ -2303,38 +2251,38 @@ def send_notifications (): mylog('none', [' Changes detected, sending reports']) if REPORT_MAIL and check_config('email'): - updateState("Send: Email") + updateState(sql_connection,"Send: Email") mylog('info', [' Sending report by Email']) send_email (mail_text, mail_html) else : mylog('verbose', [' Skip email']) if REPORT_APPRISE and check_config('apprise'): - updateState("Send: Apprise") + updateState(sql_connection,"Send: Apprise") mylog('info', [' Sending report by Apprise']) send_apprise (mail_html, mail_text) else : mylog('verbose', [' Skip Apprise']) if REPORT_WEBHOOK and check_config('webhook'): - updateState("Send: Webhook") + updateState(sql_connection,"Send: Webhook") mylog('info', [' Sending report by Webhook']) send_webhook (json_final, mail_text) else : mylog('verbose', [' Skip webhook']) if REPORT_NTFY and check_config('ntfy'): - updateState("Send: NTFY") + updateState(sql_connection,"Send: NTFY") mylog('info', [' Sending report by NTFY']) send_ntfy (mail_text) else : mylog('verbose', [' Skip NTFY']) if REPORT_PUSHSAFER and check_config('pushsafer'): - updateState("Send: PUSHSAFER") + updateState(sql_connection,"Send: PUSHSAFER") mylog('info', [' Sending report by PUSHSAFER']) send_pushsafer (mail_text) else : mylog('verbose', [' Skip PUSHSAFER']) # Update MQTT entities if REPORT_MQTT and check_config('mqtt'): - updateState("Send: MQTT") + updateState(sql_connection,"Send: MQTT") mylog('info', [' Establishing MQTT thread']) mqtt_start() else : @@ -2359,7 +2307,7 @@ def send_notifications (): mylog('info', ['[', timeNow(), '] Notifications: ', sql.rowcount]) # Commit changes - commitDB() + db.commitDB() #------------------------------------------------------------------------------- def construct_notifications(sqlQuery, tableTitle, skipText = False, suppliedJsonStruct = None): @@ -2904,266 +2852,6 @@ def mqtt_start(): # time.sleep(10) -#=============================================================================== -# DB -#=============================================================================== -#------------------------------------------------------------------------------- -def upgradeDB (): - - # indicates, if Online_History table is available - onlineHistoryAvailable = sql.execute(""" - SELECT name FROM sqlite_master WHERE type='table' - AND name='Online_History'; - """).fetchall() != [] - - # Check if it is incompatible (Check if table has all required columns) - isIncompatible = False - - if onlineHistoryAvailable : - isIncompatible = sql.execute (""" - SELECT COUNT(*) AS CNTREC FROM pragma_table_info('Online_History') WHERE name='Archived_Devices' - """).fetchone()[0] == 0 - - # Drop table if available, but incompatible - if onlineHistoryAvailable and isIncompatible: - mylog('none','[upgradeDB] Table is incompatible, Dropping the Online_History table') - sql.execute("DROP TABLE Online_History;") - onlineHistoryAvailable = False - - if onlineHistoryAvailable == False : - sql.execute(""" - CREATE TABLE "Online_History" ( - "Index" INTEGER, - "Scan_Date" TEXT, - "Online_Devices" INTEGER, - "Down_Devices" INTEGER, - "All_Devices" INTEGER, - "Archived_Devices" INTEGER, - PRIMARY KEY("Index" AUTOINCREMENT) - ); - """) - - # Alter Devices table - # dev_Network_Node_MAC_ADDR column - dev_Network_Node_MAC_ADDR_missing = sql.execute (""" - SELECT COUNT(*) AS CNTREC FROM pragma_table_info('Devices') WHERE name='dev_Network_Node_MAC_ADDR' - """).fetchone()[0] == 0 - - if dev_Network_Node_MAC_ADDR_missing : - mylog('verbose', ["[upgradeDB] Adding dev_Network_Node_MAC_ADDR to the Devices table"]) - sql.execute(""" - ALTER TABLE "Devices" ADD "dev_Network_Node_MAC_ADDR" TEXT - """) - - # dev_Network_Node_port column - dev_Network_Node_port_missing = sql.execute (""" - SELECT COUNT(*) AS CNTREC FROM pragma_table_info('Devices') WHERE name='dev_Network_Node_port' - """).fetchone()[0] == 0 - - if dev_Network_Node_port_missing : - mylog('verbose', ["[upgradeDB] Adding dev_Network_Node_port to the Devices table"]) - sql.execute(""" - ALTER TABLE "Devices" ADD "dev_Network_Node_port" INTEGER - """) - - # dev_Icon column - dev_Icon_missing = sql.execute (""" - SELECT COUNT(*) AS CNTREC FROM pragma_table_info('Devices') WHERE name='dev_Icon' - """).fetchone()[0] == 0 - - if dev_Icon_missing : - mylog('verbose', ["[upgradeDB] Adding dev_Icon to the Devices table"]) - sql.execute(""" - ALTER TABLE "Devices" ADD "dev_Icon" TEXT - """) - - # indicates, if Settings table is available - settingsMissing = sql.execute(""" - SELECT name FROM sqlite_master WHERE type='table' - AND name='Settings'; - """).fetchone() == None - - # Re-creating Settings table - mylog('verbose', ["[upgradeDB] Re-creating Settings table"]) - - if settingsMissing == False: - sql.execute("DROP TABLE Settings;") - - sql.execute(""" - CREATE TABLE "Settings" ( - "Code_Name" TEXT, - "Display_Name" TEXT, - "Description" TEXT, - "Type" TEXT, - "Options" TEXT, - "RegEx" TEXT, - "Value" TEXT, - "Group" TEXT, - "Events" TEXT - ); - """) - - # indicates, if Pholus_Scan table is available - pholusScanMissing = sql.execute(""" - SELECT name FROM sqlite_master WHERE type='table' - AND name='Pholus_Scan'; - """).fetchone() == None - - # if pholusScanMissing == False: - # # Re-creating Pholus_Scan table - # sql.execute("DROP TABLE Pholus_Scan;") - # pholusScanMissing = True - - if pholusScanMissing: - mylog('verbose', ["[upgradeDB] Re-creating Pholus_Scan table"]) - sql.execute(""" - CREATE TABLE "Pholus_Scan" ( - "Index" INTEGER, - "Info" TEXT, - "Time" TEXT, - "MAC" TEXT, - "IP_v4_or_v6" TEXT, - "Record_Type" TEXT, - "Value" TEXT, - "Extra" TEXT, - PRIMARY KEY("Index" AUTOINCREMENT) - ); - """) - - # indicates, if Nmap_Scan table is available - nmapScanMissing = sql.execute(""" - SELECT name FROM sqlite_master WHERE type='table' - AND name='Nmap_Scan'; - """).fetchone() == None - - # Re-creating Parameters table - mylog('verbose', ["[upgradeDB] Re-creating Parameters table"]) - sql.execute("DROP TABLE Parameters;") - - sql.execute(""" - CREATE TABLE "Parameters" ( - "par_ID" TEXT PRIMARY KEY, - "par_Value" TEXT - ); - """) - - # Initialize Parameters if unavailable - initOrSetParam('Back_App_State','Initializing') - - # if nmapScanMissing == False: - # # Re-creating Nmap_Scan table - # sql.execute("DROP TABLE Nmap_Scan;") - # nmapScanMissing = True - - if nmapScanMissing: - mylog('verbose', ["[upgradeDB] Re-creating Nmap_Scan table"]) - sql.execute(""" - CREATE TABLE "Nmap_Scan" ( - "Index" INTEGER, - "MAC" TEXT, - "Port" TEXT, - "Time" TEXT, - "State" TEXT, - "Service" TEXT, - "Extra" TEXT, - PRIMARY KEY("Index" AUTOINCREMENT) - ); - """) - - # Plugin state - sql_Plugins_Objects = """ CREATE TABLE IF NOT EXISTS Plugins_Objects( - "Index" INTEGER, - Plugin TEXT NOT NULL, - Object_PrimaryID TEXT NOT NULL, - Object_SecondaryID TEXT NOT NULL, - DateTimeCreated TEXT NOT NULL, - DateTimeChanged TEXT NOT NULL, - Watched_Value1 TEXT NOT NULL, - Watched_Value2 TEXT NOT NULL, - Watched_Value3 TEXT NOT NULL, - Watched_Value4 TEXT NOT NULL, - Status TEXT NOT NULL, - Extra TEXT NOT NULL, - UserData TEXT NOT NULL, - ForeignKey TEXT NOT NULL, - PRIMARY KEY("Index" AUTOINCREMENT) - ); """ - sql.execute(sql_Plugins_Objects) - - # Plugin execution results - sql_Plugins_Events = """ CREATE TABLE IF NOT EXISTS Plugins_Events( - "Index" INTEGER, - Plugin TEXT NOT NULL, - Object_PrimaryID TEXT NOT NULL, - Object_SecondaryID TEXT NOT NULL, - DateTimeCreated TEXT NOT NULL, - DateTimeChanged TEXT NOT NULL, - Watched_Value1 TEXT NOT NULL, - Watched_Value2 TEXT NOT NULL, - Watched_Value3 TEXT NOT NULL, - Watched_Value4 TEXT NOT NULL, - Status TEXT NOT NULL, - Extra TEXT NOT NULL, - UserData TEXT NOT NULL, - ForeignKey TEXT NOT NULL, - PRIMARY KEY("Index" AUTOINCREMENT) - ); """ - sql.execute(sql_Plugins_Events) - - # Plugin execution history - sql_Plugins_History = """ CREATE TABLE IF NOT EXISTS Plugins_History( - "Index" INTEGER, - Plugin TEXT NOT NULL, - Object_PrimaryID TEXT NOT NULL, - Object_SecondaryID TEXT NOT NULL, - DateTimeCreated TEXT NOT NULL, - DateTimeChanged TEXT NOT NULL, - Watched_Value1 TEXT NOT NULL, - Watched_Value2 TEXT NOT NULL, - Watched_Value3 TEXT NOT NULL, - Watched_Value4 TEXT NOT NULL, - Status TEXT NOT NULL, - Extra TEXT NOT NULL, - UserData TEXT NOT NULL, - ForeignKey TEXT NOT NULL, - PRIMARY KEY("Index" AUTOINCREMENT) - ); """ - sql.execute(sql_Plugins_History) - - # Dynamically generated language strings - # indicates, if Language_Strings table is available - languageStringsMissing = sql.execute(""" - SELECT name FROM sqlite_master WHERE type='table' - AND name='Plugins_Language_Strings'; - """).fetchone() == None - - if languageStringsMissing == False: - sql.execute("DROP TABLE Plugins_Language_Strings;") - - sql.execute(""" CREATE TABLE IF NOT EXISTS Plugins_Language_Strings( - "Index" INTEGER, - Language_Code TEXT NOT NULL, - String_Key TEXT NOT NULL, - String_Value TEXT NOT NULL, - Extra TEXT NOT NULL, - PRIMARY KEY("Index" AUTOINCREMENT) - ); """) - - commitDB () - -#------------------------------------------------------------------------------- -def initOrSetParam(parID, parValue): - - sql.execute ("INSERT INTO Parameters(par_ID, par_Value) VALUES('"+str(parID)+"', '"+str(parValue)+"') ON CONFLICT(par_ID) DO UPDATE SET par_Value='"+str(parValue)+"' where par_ID='"+str(parID)+"'") - - commitDB () - -#------------------------------------------------------------------------------- -def updateState(newState): - - sql.execute ("UPDATE Parameters SET par_Value='"+ newState +"' WHERE par_ID='Back_App_State'") - - commitDB () #=============================================================================== @@ -3442,43 +3130,27 @@ def to_text(_json): return payloadData #------------------------------------------------------------------------------- -def get_device_stats(): +def get_device_stats(db): # columns = ["online","down","all","archived","new","unknown"] sql.execute(sql_devices_stats) row = sql.fetchone() - commitDB() + db.commitDB() return row #------------------------------------------------------------------------------- -def get_all_devices(): +def get_all_devices(db): sql.execute(sql_devices_all) row = sql.fetchall() - commitDB() + db.commitDB() return row #------------------------------------------------------------------------------- -def get_sql_array(query): - sql.execute(query) - - rows = sql.fetchall() - - commitDB() - - # convert result into list of lists - arr = [] - for row in rows: - r_temp = [] - for column in row: - r_temp.append(column) - arr.append(r_temp) - - return arr #------------------------------------------------------------------------------- @@ -3499,7 +3171,7 @@ def hide_email(email): return email #------------------------------------------------------------------------------- -def check_and_run_event(): +def check_and_run_event(db): sql.execute(""" select * from Parameters where par_ID = "Front_Event" """) rows = sql.fetchall() @@ -3519,7 +3191,7 @@ def check_and_run_event(): sql.execute ("UPDATE Parameters SET par_Value='finished' WHERE par_ID='Front_Event'") # commit to DB - commitDB () + db.commitDB() #------------------------------------------------------------------------------- def handle_run(runType): @@ -3592,7 +3264,7 @@ def get_setting(key): return result #------------------------------------------------------------------------------- -def isNewVersion(): +def isNewVersion(db): global newVersionAvailable if newVersionAvailable == False: @@ -3621,7 +3293,7 @@ def isNewVersion(): if realeaseTimestamp > buildTimestamp + 600: mylog('none', [" New version of the container available!"]) newVersionAvailable = True - initOrSetParam('Back_New_Version_Available', str(newVersionAvailable)) + updateState(db, 'Back_New_Version_Available', str(newVersionAvailable)) return newVersionAvailable @@ -3645,19 +3317,19 @@ def get_plugins_configs(): return pluginsList #------------------------------------------------------------------------------- -def collect_lang_strings(json, pref): +def collect_lang_strings(db, json, pref): for prop in json["localized"]: for language_string in json[prop]: - import_language_string(language_string["language_code"], pref + "_" + prop, language_string["string"]) + import_language_string(db, language_string["language_code"], pref + "_" + prop, language_string["string"]) #------------------------------------------------------------------------------- -def import_language_string(code, key, value, extra = ""): +def import_language_string(db, code, key, value, extra = ""): sql.execute ("""INSERT INTO Plugins_Language_Strings ("Language_Code", "String_Key", "String_Value", "Extra") VALUES (?, ?, ?, ?)""", (str(code), str(key), str(value), str(extra))) - commitDB () + db.commitDB() #------------------------------------------------------------------------------- @@ -3670,7 +3342,7 @@ def run_plugin_scripts(runType): global plugins, tz, mySchedules # Header - updateState("Run: Plugins") + updateState(sql_connection,"Run: Plugins") mylog('debug', [' [Plugins] Check if any plugins need to be executed on run type: ', runType]) @@ -3703,7 +3375,7 @@ def run_plugin_scripts(runType): #------------------------------------------------------------------------------- # Executes the plugin command specified in the setting with the function specified as CMD -def execute_plugin(plugin): +def execute_plugin(db, plugin): # ------- necessary settings check -------- set = get_plugin_setting(plugin, "CMD") @@ -3740,7 +3412,7 @@ def execute_plugin(plugin): # Get Sql result if param["type"] == "sql": - resolved = flatten_array(get_sql_array(param["value"])) + resolved = flatten_array(db.get_sql_array(param["value"])) if resolved == None: mylog('none', [' [Plugins] The parameter "name":"', param["name"], '" was resolved as None']) @@ -3802,7 +3474,7 @@ def execute_plugin(plugin): mylog('verbose', [' [Plugins] Executing: ', q]) # set_CMD should contain a SQL query - arr = get_sql_array (q) + arr = db.get_sql_array (q) for row in arr: # There has to be always 9 columns @@ -3822,9 +3494,9 @@ def execute_plugin(plugin): # process results if any if len(sqlParams) > 0: sql.executemany ("""INSERT INTO Plugins_Events ("Plugin", "Object_PrimaryID", "Object_SecondaryID", "DateTimeCreated", "DateTimeChanged", "Watched_Value1", "Watched_Value2", "Watched_Value3", "Watched_Value4", "Status" ,"Extra", "UserData", "ForeignKey") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", sqlParams) - commitDB () + db.commitDB() sql.executemany ("""INSERT INTO Plugins_History ("Plugin", "Object_PrimaryID", "Object_SecondaryID", "DateTimeCreated", "DateTimeChanged", "Watched_Value1", "Watched_Value2", "Watched_Value3", "Watched_Value4", "Status" ,"Extra", "UserData", "ForeignKey") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", sqlParams) - commitDB () + db.commitDB() process_plugin_events(plugin) @@ -3842,7 +3514,7 @@ def handle_empty(value): #------------------------------------------------------------------------------- # Check if watched values changed for the given plugin -def process_plugin_events(plugin): +def process_plugin_events(db, plugin): global pluginObjects, pluginEvents @@ -3850,8 +3522,8 @@ def process_plugin_events(plugin): mylog('debug', [' [Plugins] Processing : ', pluginPref]) - plugObjectsArr = get_sql_array ("SELECT * FROM Plugins_Objects where Plugin = '" + str(pluginPref)+"'") - plugEventsArr = get_sql_array ("SELECT * FROM Plugins_Events where Plugin = '" + str(pluginPref)+"'") + plugObjectsArr = db.get_sql_array ("SELECT * FROM Plugins_Objects where Plugin = '" + str(pluginPref)+"'") + plugEventsArr = db.get_sql_array ("SELECT * FROM Plugins_Events where Plugin = '" + str(pluginPref)+"'") pluginObjects = [] pluginEvents = [] @@ -4004,7 +3676,7 @@ def process_plugin_events(plugin): sql.executemany (q, sqlParams) - commitDB() + db.commitDB() #------------------------------------------------------------------------------- class plugin_object_class: