From 03163e424f79cfda8c8541990b1ea941dd58ea8c Mon Sep 17 00:00:00 2001 From: Data-Monkey Date: Sun, 21 May 2023 21:22:51 +1000 Subject: [PATCH] more splitting done --- db/pialert.db | Bin 245760 -> 245760 bytes pialert/conf.py | 95 ++++ pialert/config.py | 22 - pialert/database.py | 31 +- pialert/files.py | 26 + pialert/helper.py | 389 +++++++++++++- pialert/internet.py | 195 +++++++ pialert/logger.py | 15 +- pialert/pialert.py | 1179 +------------------------------------------ pialert/plugin.py | 556 ++++++++++++++++++++ 10 files changed, 1314 insertions(+), 1194 deletions(-) create mode 100644 pialert/conf.py delete mode 100644 pialert/config.py create mode 100644 pialert/files.py create mode 100644 pialert/internet.py create mode 100644 pialert/plugin.py diff --git a/db/pialert.db b/db/pialert.db index 4b5bd1db4b2a54485a683c8efda866c6452e39e6..16f9e46fa7d7de23b58a85777e24cdcfca00ccd2 100755 GIT binary patch delta 436 zcmZo@;BRQ)pCB!`oq>VD28f}6MQ@^xG2`})2`lZHU-IACEGTe+Ux0^&Q+uOzi7 zFSSIBg>&+o{Su2=0t%SLcylJRAJAsv&Dq#!!K)x6#?D~O$ic{AWME{hYhbEtWT;?h zVP#}!Wn#+2BqKW6;hgB^g=gc07{xX{_*pM(9tE`3)RY%UGVyO?;D5-!4QSyielJZX zW=mbC#N_Pw;M9_m%)IpCc+cE|{GyW76b(Z&3llRlb4w#53rhlm4XAojI%`jbH0;38W zkO73-jV3Vqam00I7@G&zm@b~c>@m=OC=Tqc8#GA%*lP84j2J14GXRMjb8yoL3F*heC zZEu>v_?VHoks)FF+*ynbY{rRf;_ix*Yko*fZ=S^{&JA=V2M{wf>Bn!sIFC`Ear=#J H%o>~k%8hrz delta 383 zcmZo@;BRQ)pCBzbkAZ=~28f}6dC^22W5#(K6IR+Y-{JqfSy12tzlsi0Kz{Ed^g?|$NCZPFE{3<{JemO>FO?9Wl85M<0jZI7qOiYb+G@ OY}~`P{l+$C4Nd@~)oBF) diff --git a/pialert/conf.py b/pialert/conf.py new file mode 100644 index 00000000..71fe51a7 --- /dev/null +++ b/pialert/conf.py @@ -0,0 +1,95 @@ +""" config related functions for Pi.Alert """ + +mySettings = [] + +# General +ENABLE_ARPSCAN = True +SCAN_SUBNETS = ['192.168.1.0/24 --interface=eth1', '192.168.1.0/24 --interface=eth0'] +LOG_LEVEL = 'verbose' +TIMEZONE = 'Europe/Berlin' +ENABLE_PLUGINS = True +PIALERT_WEB_PROTECTION = False +PIALERT_WEB_PASSWORD = '8d969eef6ecad3c29a3a629280e686cf0c3f5d5a86aff3ca12020c923adc6c92' +INCLUDED_SECTIONS = ['internet', 'new_devices', 'down_devices', 'events', 'ports'] +SCAN_CYCLE_MINUTES = 5 +DAYS_TO_KEEP_EVENTS = 90 +REPORT_DASHBOARD_URL = 'http://pi.alert/' +DIG_GET_IP_ARG = '-4 myip.opendns.com @resolver1.opendns.com' +UI_LANG = 'English' +UI_PRESENCE = ['online', 'offline', 'archived'] + +tz = '' + +# Email +REPORT_MAIL = False +SMTP_SERVER = '' +SMTP_PORT = 587 +REPORT_TO = 'user@gmail.com' +REPORT_FROM = 'Pi.Alert ' +SMTP_SKIP_LOGIN = False +SMTP_USER = '' +SMTP_PASS = '' +SMTP_SKIP_TLS = False +SMTP_FORCE_SSL = False + +# Webhooks +REPORT_WEBHOOK = False +WEBHOOK_URL = '' +WEBHOOK_PAYLOAD = 'json' +WEBHOOK_REQUEST_METHOD = 'GET' + +# Apprise +REPORT_APPRISE = False +APPRISE_HOST = '' +APPRISE_URL = '' +APPRISE_PAYLOAD = 'html' + +# NTFY +REPORT_NTFY = False +NTFY_HOST ='https://ntfy.sh' +NTFY_TOPIC ='' +NTFY_USER = '' +NTFY_PASSWORD = '' + +# PUSHSAFER +REPORT_PUSHSAFER = False +PUSHSAFER_TOKEN = 'ApiKey' + +# MQTT +REPORT_MQTT = False +MQTT_BROKER = '' +MQTT_PORT = 1883 +MQTT_USER = '' +MQTT_PASSWORD = '' +MQTT_QOS = 0 +MQTT_DELAY_SEC = 2 + +# DynDNS +DDNS_ACTIVE = False +DDNS_DOMAIN = 'your_domain.freeddns.org' +DDNS_USER = 'dynu_user' +DDNS_PASSWORD = 'A0000000B0000000C0000000D0000000' +DDNS_UPDATE_URL = 'https://api.dynu.com/nic/update?' + +# PiHole +PIHOLE_ACTIVE = False +DHCP_ACTIVE = False + +# PHOLUS +PHOLUS_ACTIVE = False +PHOLUS_TIMEOUT = 20 +PHOLUS_FORCE = False +PHOLUS_RUN = 'once' +PHOLUS_RUN_TIMEOUT = 600 +PHOLUS_RUN_SCHD = '0 4 * * *' +PHOLUS_DAYS_DATA = 0 + +# Nmap +NMAP_ACTIVE = True +NMAP_TIMEOUT = 150 +NMAP_RUN = 'none' +NMAP_RUN_SCHD = '0 2 * * *' +NMAP_ARGS = '-p -10000' + +# API +API_CUSTOM_SQL = 'SELECT * FROM Devices WHERE dev_PresentLastScan = 0' \ No newline at end of file diff --git a/pialert/config.py b/pialert/config.py deleted file mode 100644 index e0da2934..00000000 --- a/pialert/config.py +++ /dev/null @@ -1,22 +0,0 @@ - - -global mySettings, mySettingsSQLsafe -#------------------------------------------------------------------------------- -# Import user values -# Check config dictionary -def ccd(key, default, config, name, inputtype, options, group, events=[], desc = "", regex = ""): - result = default - - # use existing value if already supplied, otherwise default value is used - if key in config: - result = config[key] - - global mySettings - - if inputtype == 'text': - result = result.replace('\'', "{s-quote}") - - mySettingsSQLsafe.append((key, name, desc, inputtype, options, regex, str(result), group, str(events))) - mySettings.append((key, name, desc, inputtype, options, regex, result, group, str(events))) - - return result \ No newline at end of file diff --git a/pialert/database.py b/pialert/database.py index 071cfbaa..5c750d69 100644 --- a/pialert/database.py +++ b/pialert/database.py @@ -2,8 +2,10 @@ import sqlite3 +# pialert modules from const import fullDbPath -from logger import print_log, mylog +from logger import mylog +from helper import initOrSetParam, json_struc, row_to_json #=============================================================================== @@ -141,14 +143,7 @@ class DB(): -#------------------------------------------------------------------------------- -def initOrSetParam(db, parID, parValue): - sql_connection = db.sql_connection - sql = db.sql - sql.execute ("INSERT INTO Parameters(par_ID, par_Value) VALUES('"+str(parID)+"', '"+str(parValue)+"') ON CONFLICT(par_ID) DO UPDATE SET par_Value='"+str(parValue)+"' where par_ID='"+str(parID)+"'") - - db.commitDB() #------------------------------------------------------------------------------- def updateState(db, newState): @@ -159,6 +154,26 @@ def updateState(db, newState): db.commitDB() +#------------------------------------------------------------------------------- +def get_table_as_json(db, sqlQuery): + + db.sql.execute(sqlQuery) + + columnNames = list(map(lambda x: x[0], db.sql.description)) + + rows = db.sql.fetchall() + + result = {"data":[]} + + for row in rows: + tmp = row_to_json(columnNames, row) + result["data"].append(tmp) + return json_struc(result, columnNames) + + + + + #------------------------------------------------------------------------------- diff --git a/pialert/files.py b/pialert/files.py new file mode 100644 index 00000000..e283c59c --- /dev/null +++ b/pialert/files.py @@ -0,0 +1,26 @@ +import io +import sys + + +#------------------------------------------------------------------------------- +def write_file (pPath, pText): + # Write the text depending using the correct python version + if sys.version_info < (3, 0): + file = io.open (pPath , mode='w', encoding='utf-8') + file.write ( pText.decode('unicode_escape') ) + file.close() + else: + file = open (pPath, 'w', encoding='utf-8') + if pText is None: + pText = "" + file.write (pText) + file.close() + +#------------------------------------------------------------------------------- +def get_file_content(path): + + f = open(path, 'r') + content = f.read() + f.close() + + return content \ No newline at end of file diff --git a/pialert/helper.py b/pialert/helper.py index 51eb358e..2723228d 100644 --- a/pialert/helper.py +++ b/pialert/helper.py @@ -1,10 +1,28 @@ """ Colection of generic functions to support Pi.Alert """ + import datetime import os +import sys +import io +import re import subprocess +from cron_converter import Cron +from pytz import timezone +from datetime import timedelta +import json +import time +from pathlib import Path + + + from const import * -from logger import mylog, logResult +from logger import mylog, logResult, print_log +from conf import tz +from files import write_file +# from api import update_api # to avoid circular reference +from plugin import get_plugins_configs, get_setting, print_plugin_info + #------------------------------------------------------------------------------- def timeNow(): @@ -103,4 +121,371 @@ def filePermissions(): initialiseFile(fullDbPath, "/home/pi/pialert/back/pialert.db_bak") # last attempt - fixPermissions() \ No newline at end of file + fixPermissions() + +#------------------------------------------------------------------------------- +class schedule_class: + def __init__(self, service, scheduleObject, last_next_schedule, was_last_schedule_used, last_run = 0): + self.service = service + self.scheduleObject = scheduleObject + self.last_next_schedule = last_next_schedule + self.last_run = last_run + self.was_last_schedule_used = was_last_schedule_used + def runScheduleCheck(self): + + result = False + + # Initialize the last run time if never run before + if self.last_run == 0: + self.last_run = (datetime.datetime.now(tz) - timedelta(days=365)).replace(microsecond=0) + + # get the current time with the currently specified timezone + nowTime = datetime.datetime.now(tz).replace(microsecond=0) + + # Run the schedule if the current time is past the schedule time we saved last time and + # (maybe the following check is unnecessary:) + # if the last run is past the last time we run a scheduled Pholus scan + if nowTime > self.last_next_schedule and self.last_run < self.last_next_schedule: + print_log(f'Scheduler run for {self.service}: YES') + self.was_last_schedule_used = True + result = True + else: + print_log(f'Scheduler run for {self.service}: NO') + + if self.was_last_schedule_used: + self.was_last_schedule_used = False + self.last_next_schedule = self.scheduleObject.next() + + return result + + + + +#------------------------------------------------------------------------------- + +def bytes_to_string(value): + # if value is of type bytes, convert to string + if isinstance(value, bytes): + value = value.decode('utf-8') + return value + +#------------------------------------------------------------------------------- + +def if_byte_then_to_str(input): + if isinstance(input, bytes): + input = input.decode('utf-8') + input = bytes_to_string(re.sub('[^a-zA-Z0-9-_\s]', '', str(input))) + return input + +#------------------------------------------------------------------------------- +def collect_lang_strings(db, json, pref): + + for prop in json["localized"]: + for language_string in json[prop]: + import_language_string(db, language_string["language_code"], pref + "_" + prop, language_string["string"]) + + +#------------------------------------------------------------------------------- +def initOrSetParam(db, parID, parValue): + sql_connection = db.sql_connection + sql = db.sql + + sql.execute ("INSERT INTO Parameters(par_ID, par_Value) VALUES('"+str(parID)+"', '"+str(parValue)+"') ON CONFLICT(par_ID) DO UPDATE SET par_Value='"+str(parValue)+"' where par_ID='"+str(parID)+"'") + + db.commitDB() + + +#=============================================================================== +# Initialise user defined values +#=============================================================================== +# We need access to the DB to save new values so need to define DB access methods first +#------------------------------------------------------------------------------- + +#------------------------------------------------------------------------------- +# Import user values +# Check config dictionary +def ccd(key, default, config, name, inputtype, options, group, events=[], desc = "", regex = ""): + result = default + + # use existing value if already supplied, otherwise default value is used + if key in config: + result = config[key] + + if inputtype == 'text': + result = result.replace('\'', "{s-quote}") + + mySettingsSQLsafe.append((key, name, desc, inputtype, options, regex, str(result), group, str(events))) + mySettings.append((key, name, desc, inputtype, options, regex, result, group, str(events))) + + return result +#------------------------------------------------------------------------------- + +def importConfigs (db): + + sql = db.sql + + # Specify globals so they can be overwritten with the new config + global lastTimeImported, mySettings, mySettingsSQLsafe, plugins, plugins_once_run + lastTimeImported = 0 + # General + global ENABLE_ARPSCAN, SCAN_SUBNETS, LOG_LEVEL, TIMEZONE, ENABLE_PLUGINS, PIALERT_WEB_PROTECTION, PIALERT_WEB_PASSWORD, INCLUDED_SECTIONS, SCAN_CYCLE_MINUTES, DAYS_TO_KEEP_EVENTS, REPORT_DASHBOARD_URL, DIG_GET_IP_ARG, UI_LANG + # Email + global REPORT_MAIL, SMTP_SERVER, SMTP_PORT, REPORT_TO, REPORT_FROM, SMTP_SKIP_LOGIN, SMTP_USER, SMTP_PASS, SMTP_SKIP_TLS, SMTP_FORCE_SSL + # Webhooks + global REPORT_WEBHOOK, WEBHOOK_URL, WEBHOOK_PAYLOAD, WEBHOOK_REQUEST_METHOD + # Apprise + global REPORT_APPRISE, APPRISE_HOST, APPRISE_URL, APPRISE_PAYLOAD + # NTFY + global REPORT_NTFY, NTFY_HOST, NTFY_TOPIC, NTFY_USER, NTFY_PASSWORD + # PUSHSAFER + global REPORT_PUSHSAFER, PUSHSAFER_TOKEN + # MQTT + global REPORT_MQTT, MQTT_BROKER, MQTT_PORT, MQTT_USER, MQTT_PASSWORD, MQTT_QOS, MQTT_DELAY_SEC + # DynDNS + global DDNS_ACTIVE, DDNS_DOMAIN, DDNS_USER, DDNS_PASSWORD, DDNS_UPDATE_URL + # PiHole + global PIHOLE_ACTIVE, DHCP_ACTIVE + # Pholus + global PHOLUS_ACTIVE, PHOLUS_TIMEOUT, PHOLUS_FORCE, PHOLUS_DAYS_DATA, PHOLUS_RUN, PHOLUS_RUN_SCHD, PHOLUS_RUN_TIMEOUT + # Nmap + global NMAP_ACTIVE, NMAP_TIMEOUT, NMAP_RUN, NMAP_RUN_SCHD, NMAP_ARGS + # API + global API_CUSTOM_SQL + + # get config file + config_file = Path(fullConfPath) + + # Skip import if last time of import is NEWER than file age + if (os.path.getmtime(config_file) < lastTimeImported) : + return + + mySettings = [] # reset settings + mySettingsSQLsafe = [] # same as above but safe to be passed into a SQL query + + # load the variables from pialert.conf + code = compile(config_file.read_text(), config_file.name, "exec") + c_d = {} # config dictionary + exec(code, {"__builtins__": {}}, c_d) + + # Import setting if found in the dictionary + # General + ENABLE_ARPSCAN = ccd('ENABLE_ARPSCAN', True , c_d, 'Enable arpscan', 'boolean', '', 'General', ['run']) + SCAN_SUBNETS = ccd('SCAN_SUBNETS', ['192.168.1.0/24 --interface=eth1', '192.168.1.0/24 --interface=eth0'] , c_d, 'Subnets to scan', 'subnets', '', 'General') + LOG_LEVEL = ccd('LOG_LEVEL', 'verbose' , c_d, 'Log verboseness', 'selecttext', "['none', 'minimal', 'verbose', 'debug']", 'General') + TIMEZONE = ccd('TIMEZONE', 'Europe/Berlin' , c_d, 'Time zone', 'text', '', 'General') + ENABLE_PLUGINS = ccd('ENABLE_PLUGINS', True , c_d, 'Enable plugins', 'boolean', '', 'General') + PIALERT_WEB_PROTECTION = ccd('PIALERT_WEB_PROTECTION', False , c_d, 'Enable logon', 'boolean', '', 'General') + PIALERT_WEB_PASSWORD = ccd('PIALERT_WEB_PASSWORD', '8d969eef6ecad3c29a3a629280e686cf0c3f5d5a86aff3ca12020c923adc6c92' , c_d, 'Logon password', 'readonly', '', 'General') + INCLUDED_SECTIONS = ccd('INCLUDED_SECTIONS', ['internet', 'new_devices', 'down_devices', 'events', 'ports'] , c_d, 'Notify on', 'multiselect', "['internet', 'new_devices', 'down_devices', 'events', 'ports', 'plugins']", 'General') + SCAN_CYCLE_MINUTES = ccd('SCAN_CYCLE_MINUTES', 5 , c_d, 'Scan cycle delay (m)', 'integer', '', 'General') + DAYS_TO_KEEP_EVENTS = ccd('DAYS_TO_KEEP_EVENTS', 90 , c_d, 'Delete events days', 'integer', '', 'General') + REPORT_DASHBOARD_URL = ccd('REPORT_DASHBOARD_URL', 'http://pi.alert/' , c_d, 'PiAlert URL', 'text', '', 'General') + DIG_GET_IP_ARG = ccd('DIG_GET_IP_ARG', '-4 myip.opendns.com @resolver1.opendns.com' , c_d, 'DIG arguments', 'text', '', 'General') + UI_LANG = ccd('UI_LANG', 'English' , c_d, 'Language Interface', 'selecttext', "['English', 'German', 'Spanish']", 'General') + UI_PRESENCE = ccd('UI_PRESENCE', ['online', 'offline', 'archived'] , c_d, 'Include in presence', 'multiselect', "['online', 'offline', 'archived']", 'General') + + # Email + REPORT_MAIL = ccd('REPORT_MAIL', False , c_d, 'Enable email', 'boolean', '', 'Email', ['test']) + SMTP_SERVER = ccd('SMTP_SERVER', '' , c_d,'SMTP server URL', 'text', '', 'Email') + SMTP_PORT = ccd('SMTP_PORT', 587 , c_d, 'SMTP port', 'integer', '', 'Email') + REPORT_TO = ccd('REPORT_TO', 'user@gmail.com' , c_d, 'Email to', 'text', '', 'Email') + REPORT_FROM = ccd('REPORT_FROM', 'Pi.Alert ' , c_d, 'Email Subject', 'text', '', 'Email') + SMTP_SKIP_LOGIN = ccd('SMTP_SKIP_LOGIN', False , c_d, 'SMTP skip login', 'boolean', '', 'Email') + SMTP_USER = ccd('SMTP_USER', '' , c_d, 'SMTP user', 'text', '', 'Email') + SMTP_PASS = ccd('SMTP_PASS', '' , c_d, 'SMTP password', 'password', '', 'Email') + SMTP_SKIP_TLS = ccd('SMTP_SKIP_TLS', False , c_d, 'SMTP skip TLS', 'boolean', '', 'Email') + SMTP_FORCE_SSL = ccd('SMTP_FORCE_SSL', False , c_d, 'Force SSL', 'boolean', '', 'Email') + + # Webhooks + REPORT_WEBHOOK = ccd('REPORT_WEBHOOK', False , c_d, 'Enable Webhooks', 'boolean', '', 'Webhooks', ['test']) + WEBHOOK_URL = ccd('WEBHOOK_URL', '' , c_d, 'Target URL', 'text', '', 'Webhooks') + WEBHOOK_PAYLOAD = ccd('WEBHOOK_PAYLOAD', 'json' , c_d, 'Payload type', 'selecttext', "['json', 'html', 'text']", 'Webhooks') + WEBHOOK_REQUEST_METHOD = ccd('WEBHOOK_REQUEST_METHOD', 'GET' , c_d, 'Req type', 'selecttext', "['GET', 'POST', 'PUT']", 'Webhooks') + + # Apprise + REPORT_APPRISE = ccd('REPORT_APPRISE', False , c_d, 'Enable Apprise', 'boolean', '', 'Apprise', ['test']) + APPRISE_HOST = ccd('APPRISE_HOST', '' , c_d, 'Apprise host URL', 'text', '', 'Apprise') + APPRISE_URL = ccd('APPRISE_URL', '' , c_d, 'Apprise notification URL', 'text', '', 'Apprise') + APPRISE_PAYLOAD = ccd('APPRISE_PAYLOAD', 'html' , c_d, 'Payload type', 'selecttext', "['html', 'text']", 'Apprise') + + # NTFY + REPORT_NTFY = ccd('REPORT_NTFY', False , c_d, 'Enable NTFY', 'boolean', '', 'NTFY', ['test']) + NTFY_HOST = ccd('NTFY_HOST', 'https://ntfy.sh' , c_d, 'NTFY host URL', 'text', '', 'NTFY') + NTFY_TOPIC = ccd('NTFY_TOPIC', '' , c_d, 'NTFY topic', 'text', '', 'NTFY') + NTFY_USER = ccd('NTFY_USER', '' , c_d, 'NTFY user', 'text', '', 'NTFY') + NTFY_PASSWORD = ccd('NTFY_PASSWORD', '' , c_d, 'NTFY password', 'password', '', 'NTFY') + + # PUSHSAFER + REPORT_PUSHSAFER = ccd('REPORT_PUSHSAFER', False , c_d, 'Enable PUSHSAFER', 'boolean', '', 'PUSHSAFER', ['test']) + PUSHSAFER_TOKEN = ccd('PUSHSAFER_TOKEN', 'ApiKey' , c_d, 'PUSHSAFER token', 'text', '', 'PUSHSAFER') + + # MQTT + REPORT_MQTT = ccd('REPORT_MQTT', False , c_d, 'Enable MQTT', 'boolean', '', 'MQTT') + MQTT_BROKER = ccd('MQTT_BROKER', '' , c_d, 'MQTT broker', 'text', '', 'MQTT') + MQTT_PORT = ccd('MQTT_PORT', 1883 , c_d, 'MQTT broker port', 'integer', '', 'MQTT') + MQTT_USER = ccd('MQTT_USER', '' , c_d, 'MQTT user', 'text', '', 'MQTT') + MQTT_PASSWORD = ccd('MQTT_PASSWORD', '' , c_d, 'MQTT password', 'password', '', 'MQTT') + MQTT_QOS = ccd('MQTT_QOS', 0 , c_d, 'MQTT Quality of Service', 'selectinteger', "['0', '1', '2']", 'MQTT') + MQTT_DELAY_SEC = ccd('MQTT_DELAY_SEC', 2 , c_d, 'MQTT delay', 'selectinteger', "['2', '3', '4', '5']", 'MQTT') + + # DynDNS + DDNS_ACTIVE = ccd('DDNS_ACTIVE', False , c_d, 'Enable DynDNS', 'boolean', '', 'DynDNS') + DDNS_DOMAIN = ccd('DDNS_DOMAIN', 'your_domain.freeddns.org' , c_d, 'DynDNS domain URL', 'text', '', 'DynDNS') + DDNS_USER = ccd('DDNS_USER', 'dynu_user' , c_d, 'DynDNS user', 'text', '', 'DynDNS') + DDNS_PASSWORD = ccd('DDNS_PASSWORD', 'A0000000B0000000C0000000D0000000' , c_d, 'DynDNS password', 'password', '', 'DynDNS') + DDNS_UPDATE_URL = ccd('DDNS_UPDATE_URL', 'https://api.dynu.com/nic/update?' , c_d, 'DynDNS update URL', 'text', '', 'DynDNS') + + # PiHole + PIHOLE_ACTIVE = ccd('PIHOLE_ACTIVE', False, c_d, 'Enable PiHole mapping', 'boolean', '', 'PiHole') + DHCP_ACTIVE = ccd('DHCP_ACTIVE', False , c_d, 'Enable PiHole DHCP', 'boolean', '', 'PiHole') + + # PHOLUS + PHOLUS_ACTIVE = ccd('PHOLUS_ACTIVE', False , c_d, 'Enable Pholus scans', 'boolean', '', 'Pholus') + PHOLUS_TIMEOUT = ccd('PHOLUS_TIMEOUT', 20 , c_d, 'Pholus timeout', 'integer', '', 'Pholus') + PHOLUS_FORCE = ccd('PHOLUS_FORCE', False , c_d, 'Pholus force check', 'boolean', '', 'Pholus') + PHOLUS_RUN = ccd('PHOLUS_RUN', 'once' , c_d, 'Pholus enable schedule', 'selecttext', "['none', 'once', 'schedule']", 'Pholus') + PHOLUS_RUN_TIMEOUT = ccd('PHOLUS_RUN_TIMEOUT', 600 , c_d, 'Pholus timeout schedule', 'integer', '', 'Pholus') + PHOLUS_RUN_SCHD = ccd('PHOLUS_RUN_SCHD', '0 4 * * *' , c_d, 'Pholus schedule', 'text', '', 'Pholus') + PHOLUS_DAYS_DATA = ccd('PHOLUS_DAYS_DATA', 0 , c_d, 'Pholus keep days', 'integer', '', 'Pholus') + + # Nmap + NMAP_ACTIVE = ccd('NMAP_ACTIVE', True , c_d, 'Enable Nmap scans', 'boolean', '', 'Nmap') + NMAP_TIMEOUT = ccd('NMAP_TIMEOUT', 150 , c_d, 'Nmap timeout', 'integer', '', 'Nmap') + NMAP_RUN = ccd('NMAP_RUN', 'none' , c_d, 'Nmap enable schedule', 'selecttext', "['none', 'once', 'schedule']", 'Nmap') + NMAP_RUN_SCHD = ccd('NMAP_RUN_SCHD', '0 2 * * *' , c_d, 'Nmap schedule', 'text', '', 'Nmap') + NMAP_ARGS = ccd('NMAP_ARGS', '-p -10000' , c_d, 'Nmap custom arguments', 'text', '', 'Nmap') + + # API + API_CUSTOM_SQL = ccd('API_CUSTOM_SQL', 'SELECT * FROM Devices WHERE dev_PresentLastScan = 0' , c_d, 'Custom endpoint', 'text', '', 'API') + + # Prepare scheduler + global tz, mySchedules, plugins + + # Init timezone in case it changed + tz = timezone(TIMEZONE) + + # reset schedules + mySchedules = [] + + # init pholus schedule + pholusSchedule = Cron(PHOLUS_RUN_SCHD).schedule(start_date=datetime.datetime.now(tz)) + mySchedules.append(schedule_class("pholus", pholusSchedule, pholusSchedule.next(), False)) + + # init nmap schedule + nmapSchedule = Cron(NMAP_RUN_SCHD).schedule(start_date=datetime.datetime.now(tz)) + mySchedules.append(schedule_class("nmap", nmapSchedule, nmapSchedule.next(), False)) + + # Format and prepare the list of subnets + userSubnets = updateSubnets(SCAN_SUBNETS) + + + + # Plugins START + # ----------------- + if ENABLE_PLUGINS: + plugins = get_plugins_configs() + + mylog('none', ['[', timeNow(), '] Plugins: Number of dynamically loaded plugins: ', len(plugins)]) + + # handle plugins + for plugin in plugins: + print_plugin_info(plugin, ['display_name','description']) + + pref = plugin["unique_prefix"] + + # if plugin["enabled"] == 'true': + + # collect plugin level language strings + collect_lang_strings(db, plugin, pref) + + for set in plugin["settings"]: + setFunction = set["function"] + # Setting code name / key + key = pref + "_" + setFunction + + v = ccd(key, set["default_value"], c_d, set["name"][0]["string"], set["type"] , str(set["options"]), pref) + + # Save the user defined value into the object + set["value"] = v + + # Setup schedules + if setFunction == 'RUN_SCHD': + newSchedule = Cron(v).schedule(start_date=datetime.datetime.now(tz)) + mySchedules.append(schedule_class(pref, newSchedule, newSchedule.next(), False)) + + # Collect settings related language strings + collect_lang_strings(db, set, pref + "_" + set["function"]) + + plugins_once_run = False + # ----------------- + # Plugins END + + + + + + # Insert settings into the DB + sql.execute ("DELETE FROM Settings") + sql.executemany ("""INSERT INTO Settings ("Code_Name", "Display_Name", "Description", "Type", "Options", + "RegEx", "Value", "Group", "Events" ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""", mySettingsSQLsafe) + + # Used to determine the next import + lastTimeImported = time.time() + + # Is used to display a message in the UI when old (outdated) settings are loaded + initOrSetParam(db, "Back_Settings_Imported",(round(time.time() * 1000),) ) + + #commitDB(sql_connection) + db.commitDB() + + # update only the settings datasource + # update_api(False, ["settings"]) + # TO DO this creates a circular reference between API and HELPER ! + + mylog('info', ['[', timeNow(), '] Config: Imported new config']) + + +#------------------------------------------------------------------------------- +class json_struc: + def __init__(self, jsn, columnNames): + self.json = jsn + self.columnNames = columnNames + +#------------------------------------------------------------------------------- +# Creates a JSON object from a DB row +def row_to_json(names, row): + + rowEntry = {} + + index = 0 + for name in names: + rowEntry[name]= if_byte_then_to_str(row[name]) + index += 1 + + return rowEntry + +#------------------------------------------------------------------------------- +def import_language_string(db, code, key, value, extra = ""): + + db.sql.execute ("""INSERT INTO Plugins_Language_Strings ("Language_Code", "String_Key", "String_Value", "Extra") VALUES (?, ?, ?, ?)""", (str(code), str(key), str(value), str(extra))) + + db.commitDB() + + + +#------------------------------------------------------------------------------- +# Make a regular expression +# for validating an Ip-address +ipRegex = "^((25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])\.){3}(25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])$" + +# Define a function to +# validate an Ip address +def checkIPV4(ip): + # pass the regular expression + # and the string in search() method + if(re.search(ipRegex, ip)): + return True + else: + return False + + diff --git a/pialert/internet.py b/pialert/internet.py new file mode 100644 index 00000000..0f58f6b9 --- /dev/null +++ b/pialert/internet.py @@ -0,0 +1,195 @@ +""" internet related functions to support Pi.Alert """ + +import subprocess +import re + +# pialert modules +from database import updateState +from helper import timeNow +from logger import append_line_to_file, mylog +from const import logPath +from conf import DDNS_ACTIVE, DDNS_DOMAIN, DDNS_UPDATE_URL, DDNS_PASSWORD, DDNS_USER + + + +# need to find a better way to deal with settings ! +#global DDNS_ACTIVE, DDNS_DOMAIN, DDNS_UPDATE_URL, DDNS_USER, DDNS_PASSWORD + + +#=============================================================================== +# INTERNET IP CHANGE +#=============================================================================== +def check_internet_IP (db, DIG_GET_IP_ARG): + + # Header + updateState(db,"Scan: Internet IP") + mylog('verbose', ['[', timeNow(), '] Check Internet IP:']) + + # Get Internet IP + mylog('verbose', [' Retrieving Internet IP:']) + internet_IP = get_internet_IP(DIG_GET_IP_ARG) + # TESTING - Force IP + # internet_IP = "1.2.3.4" + + # Check result = IP + if internet_IP == "" : + mylog('none', [' Error retrieving Internet IP']) + mylog('none', [' Exiting...']) + return False + mylog('verbose', [' ', internet_IP]) + + # Get previous stored IP + mylog('verbose', [' Retrieving previous IP:']) + previous_IP = get_previous_internet_IP (db) + mylog('verbose', [' ', previous_IP]) + + # Check IP Change + if internet_IP != previous_IP : + mylog('info', [' New internet IP: ', internet_IP]) + save_new_internet_IP (db, internet_IP) + + else : + mylog('verbose', [' No changes to perform']) + + # Get Dynamic DNS IP + if DDNS_ACTIVE : + mylog('verbose', [' Retrieving Dynamic DNS IP']) + dns_IP = get_dynamic_DNS_IP() + + # Check Dynamic DNS IP + if dns_IP == "" or dns_IP == "0.0.0.0" : + mylog('info', [' Error retrieving Dynamic DNS IP']) + mylog('info', [' ', dns_IP]) + + # Check DNS Change + if dns_IP != internet_IP : + mylog('info', [' Updating Dynamic DNS IP']) + message = set_dynamic_DNS_IP () + mylog('info', [' ', message]) + else : + mylog('verbose', [' No changes to perform']) + else : + mylog('verbose', [' Skipping Dynamic DNS update']) + + + +#------------------------------------------------------------------------------- +def get_internet_IP (DIG_GET_IP_ARG): + # BUGFIX #46 - curl http://ipv4.icanhazip.com repeatedly is very slow + # Using 'dig' + dig_args = ['dig', '+short'] + DIG_GET_IP_ARG.strip().split() + try: + cmd_output = subprocess.check_output (dig_args, universal_newlines=True) + except subprocess.CalledProcessError as e: + mylog('none', [e.output]) + cmd_output = '' # no internet + + # Check result is an IP + IP = check_IP_format (cmd_output) + + # Handle invalid response + if IP == '': + IP = '0.0.0.0' + + return IP + +#------------------------------------------------------------------------------- +def get_previous_internet_IP (db): + + previous_IP = '0.0.0.0' + + # get previous internet IP stored in DB + db.sql.execute ("SELECT dev_LastIP FROM Devices WHERE dev_MAC = 'Internet' ") + result = db.sql.fetchone() + + db.commitDB() + + if result is not None and len(result) > 0 : + previous_IP = result[0] + + # return previous IP + return previous_IP + + + +#------------------------------------------------------------------------------- +def save_new_internet_IP (db, pNewIP): + # Log new IP into logfile + append_line_to_file (logPath + '/IP_changes.log', + '['+str(timeNow()) +']\t'+ pNewIP +'\n') + + prevIp = get_previous_internet_IP(db) + # Save event + db.sql.execute ("""INSERT INTO Events (eve_MAC, eve_IP, eve_DateTime, + eve_EventType, eve_AdditionalInfo, + eve_PendingAlertEmail) + VALUES ('Internet', ?, ?, 'Internet IP Changed', + 'Previous Internet IP: '|| ?, 1) """, + (pNewIP, timeNow(), prevIp) ) + + # Save new IP + db.sql.execute ("""UPDATE Devices SET dev_LastIP = ? + WHERE dev_MAC = 'Internet' """, + (pNewIP,) ) + + # commit changes + db.commitDB() + +#------------------------------------------------------------------------------- +def check_IP_format (pIP): + # Check IP format + IPv4SEG = r'(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])' + IPv4ADDR = r'(?:(?:' + IPv4SEG + r'\.){3,3}' + IPv4SEG + r')' + IP = re.search(IPv4ADDR, pIP) + + # Return error if not IP + if IP is None : + return "" + + # Return IP + return IP.group(0) + + + +#------------------------------------------------------------------------------- +def get_dynamic_DNS_IP (): + # Using OpenDNS server + # dig_args = ['dig', '+short', DDNS_DOMAIN, '@resolver1.opendns.com'] + + # Using default DNS server + dig_args = ['dig', '+short', DDNS_DOMAIN] + + try: + # try runnning a subprocess + dig_output = subprocess.check_output (dig_args, universal_newlines=True) + except subprocess.CalledProcessError as e: + # An error occured, handle it + mylog('none', [e.output]) + dig_output = '' # probably no internet + + # Check result is an IP + IP = check_IP_format (dig_output) + + # Handle invalid response + if IP == '': + IP = '0.0.0.0' + + return IP + +#------------------------------------------------------------------------------- +def set_dynamic_DNS_IP (): + try: + # try runnning a subprocess + # Update Dynamic IP + curl_output = subprocess.check_output (['curl', '-s', + DDNS_UPDATE_URL + + 'username=' + DDNS_USER + + '&password=' + DDNS_PASSWORD + + '&hostname=' + DDNS_DOMAIN], + universal_newlines=True) + except subprocess.CalledProcessError as e: + # An error occured, handle it + mylog('none', [e.output]) + curl_output = "" + + return curl_output diff --git a/pialert/logger.py b/pialert/logger.py index 154a1082..d090f79d 100644 --- a/pialert/logger.py +++ b/pialert/logger.py @@ -1,6 +1,8 @@ """ Colection of functions to support all logging for Pi.Alert """ - +import sys +import io import datetime + from const import * @@ -77,3 +79,14 @@ def logResult (stdout, stderr): if stdout != None: append_file_binary (logPath + '/stdout.log', stdout) +#------------------------------------------------------------------------------- +def append_line_to_file (pPath, pText): + # append the line depending using the correct python version + if sys.version_info < (3, 0): + file = io.open (pPath , mode='a', encoding='utf-8') + file.write ( pText.decode('unicode_escape') ) + file.close() + else: + file = open (pPath, 'a', encoding='utf-8') + file.write (pText) + file.close() \ No newline at end of file diff --git a/pialert/pialert.py b/pialert/pialert.py index cc0edbb8..33fbe889 100755 --- a/pialert/pialert.py +++ b/pialert/pialert.py @@ -38,16 +38,25 @@ from paho.mqtt import client as mqtt_client import threading from pathlib import Path from cron_converter import Cron -from pytz import timezone + from json2table import convert import hashlib import multiprocessing + +# pialert modules from const import * -from logger import mylog, print_log, logResult -from helper import filePermissions, timeNow, updateSubnets +from conf import * +# from config import DIG_GET_IP_ARG, ENABLE_PLUGINS +from logger import append_line_to_file, mylog, print_log, logResult +from helper import bytes_to_string, checkIPV4, filePermissions, importConfigs, timeNow, updateSubnets, write_file from database import * -from config import ccd +from internet import check_IP_format, check_internet_IP, get_internet_IP +from api import update_api +from files import get_file_content +from plugin import execute_plugin, get_plugin_setting, plugin_object_class, print_plugin_info + + # Global variables @@ -65,242 +74,6 @@ sql_connection = None - - -#=============================================================================== -# Initialise user defined values -#=============================================================================== -# We need access to the DB to save new values so need to define DB access methods first -#------------------------------------------------------------------------------- - - -#------------------------------------------------------------------------------- - -def importConfigs (db): - - # Specify globals so they can be overwritten with the new config - global lastTimeImported, mySettings, mySettingsSQLsafe, plugins, plugins_once_run - # General - global ENABLE_ARPSCAN, SCAN_SUBNETS, LOG_LEVEL, TIMEZONE, ENABLE_PLUGINS, PIALERT_WEB_PROTECTION, PIALERT_WEB_PASSWORD, INCLUDED_SECTIONS, SCAN_CYCLE_MINUTES, DAYS_TO_KEEP_EVENTS, REPORT_DASHBOARD_URL, DIG_GET_IP_ARG, UI_LANG - # Email - global REPORT_MAIL, SMTP_SERVER, SMTP_PORT, REPORT_TO, REPORT_FROM, SMTP_SKIP_LOGIN, SMTP_USER, SMTP_PASS, SMTP_SKIP_TLS, SMTP_FORCE_SSL - # Webhooks - global REPORT_WEBHOOK, WEBHOOK_URL, WEBHOOK_PAYLOAD, WEBHOOK_REQUEST_METHOD - # Apprise - global REPORT_APPRISE, APPRISE_HOST, APPRISE_URL, APPRISE_PAYLOAD - # NTFY - global REPORT_NTFY, NTFY_HOST, NTFY_TOPIC, NTFY_USER, NTFY_PASSWORD - # PUSHSAFER - global REPORT_PUSHSAFER, PUSHSAFER_TOKEN - # MQTT - global REPORT_MQTT, MQTT_BROKER, MQTT_PORT, MQTT_USER, MQTT_PASSWORD, MQTT_QOS, MQTT_DELAY_SEC - # DynDNS - global DDNS_ACTIVE, DDNS_DOMAIN, DDNS_USER, DDNS_PASSWORD, DDNS_UPDATE_URL - # PiHole - global PIHOLE_ACTIVE, DHCP_ACTIVE - # Pholus - global PHOLUS_ACTIVE, PHOLUS_TIMEOUT, PHOLUS_FORCE, PHOLUS_DAYS_DATA, PHOLUS_RUN, PHOLUS_RUN_SCHD, PHOLUS_RUN_TIMEOUT - # Nmap - global NMAP_ACTIVE, NMAP_TIMEOUT, NMAP_RUN, NMAP_RUN_SCHD, NMAP_ARGS - # API - global API_CUSTOM_SQL - - # get config file - config_file = Path(fullConfPath) - - # Skip import if last time of import is NEWER than file age - if (os.path.getmtime(config_file) < lastTimeImported) : - return - - mySettings = [] # reset settings - mySettingsSQLsafe = [] # same as above but safe to be passed into a SQL query - - # load the variables from pialert.conf - code = compile(config_file.read_text(), config_file.name, "exec") - c_d = {} # config dictionary - exec(code, {"__builtins__": {}}, c_d) - - # Import setting if found in the dictionary - # General - ENABLE_ARPSCAN = ccd('ENABLE_ARPSCAN', True , c_d, 'Enable arpscan', 'boolean', '', 'General', ['run']) - SCAN_SUBNETS = ccd('SCAN_SUBNETS', ['192.168.1.0/24 --interface=eth1', '192.168.1.0/24 --interface=eth0'] , c_d, 'Subnets to scan', 'subnets', '', 'General') - LOG_LEVEL = ccd('LOG_LEVEL', 'verbose' , c_d, 'Log verboseness', 'selecttext', "['none', 'minimal', 'verbose', 'debug']", 'General') - TIMEZONE = ccd('TIMEZONE', 'Europe/Berlin' , c_d, 'Time zone', 'text', '', 'General') - ENABLE_PLUGINS = ccd('ENABLE_PLUGINS', True , c_d, 'Enable plugins', 'boolean', '', 'General') - PIALERT_WEB_PROTECTION = ccd('PIALERT_WEB_PROTECTION', False , c_d, 'Enable logon', 'boolean', '', 'General') - PIALERT_WEB_PASSWORD = ccd('PIALERT_WEB_PASSWORD', '8d969eef6ecad3c29a3a629280e686cf0c3f5d5a86aff3ca12020c923adc6c92' , c_d, 'Logon password', 'readonly', '', 'General') - INCLUDED_SECTIONS = ccd('INCLUDED_SECTIONS', ['internet', 'new_devices', 'down_devices', 'events', 'ports'] , c_d, 'Notify on', 'multiselect', "['internet', 'new_devices', 'down_devices', 'events', 'ports', 'plugins']", 'General') - SCAN_CYCLE_MINUTES = ccd('SCAN_CYCLE_MINUTES', 5 , c_d, 'Scan cycle delay (m)', 'integer', '', 'General') - DAYS_TO_KEEP_EVENTS = ccd('DAYS_TO_KEEP_EVENTS', 90 , c_d, 'Delete events days', 'integer', '', 'General') - REPORT_DASHBOARD_URL = ccd('REPORT_DASHBOARD_URL', 'http://pi.alert/' , c_d, 'PiAlert URL', 'text', '', 'General') - DIG_GET_IP_ARG = ccd('DIG_GET_IP_ARG', '-4 myip.opendns.com @resolver1.opendns.com' , c_d, 'DIG arguments', 'text', '', 'General') - UI_LANG = ccd('UI_LANG', 'English' , c_d, 'Language Interface', 'selecttext', "['English', 'German', 'Spanish']", 'General') - UI_PRESENCE = ccd('UI_PRESENCE', ['online', 'offline', 'archived'] , c_d, 'Include in presence', 'multiselect', "['online', 'offline', 'archived']", 'General') - - # Email - REPORT_MAIL = ccd('REPORT_MAIL', False , c_d, 'Enable email', 'boolean', '', 'Email', ['test']) - SMTP_SERVER = ccd('SMTP_SERVER', '' , c_d,'SMTP server URL', 'text', '', 'Email') - SMTP_PORT = ccd('SMTP_PORT', 587 , c_d, 'SMTP port', 'integer', '', 'Email') - REPORT_TO = ccd('REPORT_TO', 'user@gmail.com' , c_d, 'Email to', 'text', '', 'Email') - REPORT_FROM = ccd('REPORT_FROM', 'Pi.Alert ' , c_d, 'Email Subject', 'text', '', 'Email') - SMTP_SKIP_LOGIN = ccd('SMTP_SKIP_LOGIN', False , c_d, 'SMTP skip login', 'boolean', '', 'Email') - SMTP_USER = ccd('SMTP_USER', '' , c_d, 'SMTP user', 'text', '', 'Email') - SMTP_PASS = ccd('SMTP_PASS', '' , c_d, 'SMTP password', 'password', '', 'Email') - SMTP_SKIP_TLS = ccd('SMTP_SKIP_TLS', False , c_d, 'SMTP skip TLS', 'boolean', '', 'Email') - SMTP_FORCE_SSL = ccd('SMTP_FORCE_SSL', False , c_d, 'Force SSL', 'boolean', '', 'Email') - - # Webhooks - REPORT_WEBHOOK = ccd('REPORT_WEBHOOK', False , c_d, 'Enable Webhooks', 'boolean', '', 'Webhooks', ['test']) - WEBHOOK_URL = ccd('WEBHOOK_URL', '' , c_d, 'Target URL', 'text', '', 'Webhooks') - WEBHOOK_PAYLOAD = ccd('WEBHOOK_PAYLOAD', 'json' , c_d, 'Payload type', 'selecttext', "['json', 'html', 'text']", 'Webhooks') - WEBHOOK_REQUEST_METHOD = ccd('WEBHOOK_REQUEST_METHOD', 'GET' , c_d, 'Req type', 'selecttext', "['GET', 'POST', 'PUT']", 'Webhooks') - - # Apprise - REPORT_APPRISE = ccd('REPORT_APPRISE', False , c_d, 'Enable Apprise', 'boolean', '', 'Apprise', ['test']) - APPRISE_HOST = ccd('APPRISE_HOST', '' , c_d, 'Apprise host URL', 'text', '', 'Apprise') - APPRISE_URL = ccd('APPRISE_URL', '' , c_d, 'Apprise notification URL', 'text', '', 'Apprise') - APPRISE_PAYLOAD = ccd('APPRISE_PAYLOAD', 'html' , c_d, 'Payload type', 'selecttext', "['html', 'text']", 'Apprise') - - # NTFY - REPORT_NTFY = ccd('REPORT_NTFY', False , c_d, 'Enable NTFY', 'boolean', '', 'NTFY', ['test']) - NTFY_HOST = ccd('NTFY_HOST', 'https://ntfy.sh' , c_d, 'NTFY host URL', 'text', '', 'NTFY') - NTFY_TOPIC = ccd('NTFY_TOPIC', '' , c_d, 'NTFY topic', 'text', '', 'NTFY') - NTFY_USER = ccd('NTFY_USER', '' , c_d, 'NTFY user', 'text', '', 'NTFY') - NTFY_PASSWORD = ccd('NTFY_PASSWORD', '' , c_d, 'NTFY password', 'password', '', 'NTFY') - - # PUSHSAFER - REPORT_PUSHSAFER = ccd('REPORT_PUSHSAFER', False , c_d, 'Enable PUSHSAFER', 'boolean', '', 'PUSHSAFER', ['test']) - PUSHSAFER_TOKEN = ccd('PUSHSAFER_TOKEN', 'ApiKey' , c_d, 'PUSHSAFER token', 'text', '', 'PUSHSAFER') - - # MQTT - REPORT_MQTT = ccd('REPORT_MQTT', False , c_d, 'Enable MQTT', 'boolean', '', 'MQTT') - MQTT_BROKER = ccd('MQTT_BROKER', '' , c_d, 'MQTT broker', 'text', '', 'MQTT') - MQTT_PORT = ccd('MQTT_PORT', 1883 , c_d, 'MQTT broker port', 'integer', '', 'MQTT') - MQTT_USER = ccd('MQTT_USER', '' , c_d, 'MQTT user', 'text', '', 'MQTT') - MQTT_PASSWORD = ccd('MQTT_PASSWORD', '' , c_d, 'MQTT password', 'password', '', 'MQTT') - MQTT_QOS = ccd('MQTT_QOS', 0 , c_d, 'MQTT Quality of Service', 'selectinteger', "['0', '1', '2']", 'MQTT') - MQTT_DELAY_SEC = ccd('MQTT_DELAY_SEC', 2 , c_d, 'MQTT delay', 'selectinteger', "['2', '3', '4', '5']", 'MQTT') - - # DynDNS - DDNS_ACTIVE = ccd('DDNS_ACTIVE', False , c_d, 'Enable DynDNS', 'boolean', '', 'DynDNS') - DDNS_DOMAIN = ccd('DDNS_DOMAIN', 'your_domain.freeddns.org' , c_d, 'DynDNS domain URL', 'text', '', 'DynDNS') - DDNS_USER = ccd('DDNS_USER', 'dynu_user' , c_d, 'DynDNS user', 'text', '', 'DynDNS') - DDNS_PASSWORD = ccd('DDNS_PASSWORD', 'A0000000B0000000C0000000D0000000' , c_d, 'DynDNS password', 'password', '', 'DynDNS') - DDNS_UPDATE_URL = ccd('DDNS_UPDATE_URL', 'https://api.dynu.com/nic/update?' , c_d, 'DynDNS update URL', 'text', '', 'DynDNS') - - # PiHole - PIHOLE_ACTIVE = ccd('PIHOLE_ACTIVE', False, c_d, 'Enable PiHole mapping', 'boolean', '', 'PiHole') - DHCP_ACTIVE = ccd('DHCP_ACTIVE', False , c_d, 'Enable PiHole DHCP', 'boolean', '', 'PiHole') - - # PHOLUS - PHOLUS_ACTIVE = ccd('PHOLUS_ACTIVE', False , c_d, 'Enable Pholus scans', 'boolean', '', 'Pholus') - PHOLUS_TIMEOUT = ccd('PHOLUS_TIMEOUT', 20 , c_d, 'Pholus timeout', 'integer', '', 'Pholus') - PHOLUS_FORCE = ccd('PHOLUS_FORCE', False , c_d, 'Pholus force check', 'boolean', '', 'Pholus') - PHOLUS_RUN = ccd('PHOLUS_RUN', 'once' , c_d, 'Pholus enable schedule', 'selecttext', "['none', 'once', 'schedule']", 'Pholus') - PHOLUS_RUN_TIMEOUT = ccd('PHOLUS_RUN_TIMEOUT', 600 , c_d, 'Pholus timeout schedule', 'integer', '', 'Pholus') - PHOLUS_RUN_SCHD = ccd('PHOLUS_RUN_SCHD', '0 4 * * *' , c_d, 'Pholus schedule', 'text', '', 'Pholus') - PHOLUS_DAYS_DATA = ccd('PHOLUS_DAYS_DATA', 0 , c_d, 'Pholus keep days', 'integer', '', 'Pholus') - - # Nmap - NMAP_ACTIVE = ccd('NMAP_ACTIVE', True , c_d, 'Enable Nmap scans', 'boolean', '', 'Nmap') - NMAP_TIMEOUT = ccd('NMAP_TIMEOUT', 150 , c_d, 'Nmap timeout', 'integer', '', 'Nmap') - NMAP_RUN = ccd('NMAP_RUN', 'none' , c_d, 'Nmap enable schedule', 'selecttext', "['none', 'once', 'schedule']", 'Nmap') - NMAP_RUN_SCHD = ccd('NMAP_RUN_SCHD', '0 2 * * *' , c_d, 'Nmap schedule', 'text', '', 'Nmap') - NMAP_ARGS = ccd('NMAP_ARGS', '-p -10000' , c_d, 'Nmap custom arguments', 'text', '', 'Nmap') - - # API - API_CUSTOM_SQL = ccd('API_CUSTOM_SQL', 'SELECT * FROM Devices WHERE dev_PresentLastScan = 0' , c_d, 'Custom endpoint', 'text', '', 'API') - - # Prepare scheduler - global tz, mySchedules, plugins - - # Init timezone in case it changed - tz = timezone(TIMEZONE) - - # reset schedules - mySchedules = [] - - # init pholus schedule - pholusSchedule = Cron(PHOLUS_RUN_SCHD).schedule(start_date=datetime.datetime.now(tz)) - mySchedules.append(schedule_class("pholus", pholusSchedule, pholusSchedule.next(), False)) - - # init nmap schedule - nmapSchedule = Cron(NMAP_RUN_SCHD).schedule(start_date=datetime.datetime.now(tz)) - mySchedules.append(schedule_class("nmap", nmapSchedule, nmapSchedule.next(), False)) - - # Format and prepare the list of subnets - userSubnets = updateSubnets(SCAN_SUBNETS) - - - - # Plugins START - # ----------------- - if ENABLE_PLUGINS: - plugins = get_plugins_configs() - - mylog('none', ['[', timeNow(), '] Plugins: Number of dynamically loaded plugins: ', len(plugins)]) - - # handle plugins - for plugin in plugins: - print_plugin_info(plugin, ['display_name','description']) - - pref = plugin["unique_prefix"] - - # if plugin["enabled"] == 'true': - - # collect plugin level language strings - collect_lang_strings(db, plugin, pref) - - for set in plugin["settings"]: - setFunction = set["function"] - # Setting code name / key - key = pref + "_" + setFunction - - v = ccd(key, set["default_value"], c_d, set["name"][0]["string"], set["type"] , str(set["options"]), pref) - - # Save the user defined value into the object - set["value"] = v - - # Setup schedules - if setFunction == 'RUN_SCHD': - newSchedule = Cron(v).schedule(start_date=datetime.datetime.now(tz)) - mySchedules.append(schedule_class(pref, newSchedule, newSchedule.next(), False)) - - # Collect settings related language strings - collect_lang_strings(db, set, pref + "_" + set["function"]) - - plugins_once_run = False - # ----------------- - # Plugins END - - - - - - # Insert settings into the DB - sql.execute ("DELETE FROM Settings") - sql.executemany ("""INSERT INTO Settings ("Code_Name", "Display_Name", "Description", "Type", "Options", - "RegEx", "Value", "Group", "Events" ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""", mySettingsSQLsafe) - - # Used to determine the next import - lastTimeImported = time.time() - - # Is used to display a message in the UI when old (outdated) settings are loaded - initOrSetParam(db, "Back_Settings_Imported",(round(time.time() * 1000),) ) - - #commitDB(sql_connection) - db.commitDB() - - # update only the settings datasource - update_api(False, ["settings"]) - - mylog('info', ['[', timeNow(), '] Config: Imported new config']) - - - - - - #=============================================================================== #=============================================================================== # MAIN @@ -349,7 +122,7 @@ def main (): #=============================================================================== - # This is the mail loop of Pi.Alert + # This is the main loop of Pi.Alert #=============================================================================== while True: @@ -396,7 +169,7 @@ def main (): if last_internet_IP_scan + datetime.timedelta(minutes=3) < time_started: cycle = 'internet_IP' last_internet_IP_scan = time_started - check_internet_IP() + check_internet_IP(db,DIG_GET_IP_ARG) # Update vendors once a week if last_update_vendors + datetime.timedelta(days = 7) < time_started: @@ -523,180 +296,7 @@ def main (): #loop time.sleep(5) # wait for N seconds - -#=============================================================================== -# INTERNET IP CHANGE -#=============================================================================== -def check_internet_IP (): - # Header - updateState(db,"Scan: Internet IP") - mylog('verbose', ['[', startTime, '] Check Internet IP:']) - - # Get Internet IP - mylog('verbose', [' Retrieving Internet IP:']) - internet_IP = get_internet_IP() - # TESTING - Force IP - # internet_IP = "1.2.3.4" - - # Check result = IP - if internet_IP == "" : - mylog('none', [' Error retrieving Internet IP']) - mylog('none', [' Exiting...']) - return False - mylog('verbose', [' ', internet_IP]) - - # Get previous stored IP - mylog('verbose', [' Retrieving previous IP:']) - previous_IP = get_previous_internet_IP (db) - mylog('verbose', [' ', previous_IP]) - - # Check IP Change - if internet_IP != previous_IP : - mylog('info', [' New internet IP: ', internet_IP]) - save_new_internet_IP (db, internet_IP) - - else : - mylog('verbose', [' No changes to perform']) - - # Get Dynamic DNS IP - if DDNS_ACTIVE : - mylog('verbose', [' Retrieving Dynamic DNS IP']) - dns_IP = get_dynamic_DNS_IP() - - # Check Dynamic DNS IP - if dns_IP == "" or dns_IP == "0.0.0.0" : - mylog('info', [' Error retrieving Dynamic DNS IP']) - mylog('info', [' ', dns_IP]) - - # Check DNS Change - if dns_IP != internet_IP : - mylog('info', [' Updating Dynamic DNS IP']) - message = set_dynamic_DNS_IP () - mylog('info', [' ', message]) - else : - mylog('verbose', [' No changes to perform']) - else : - mylog('verbose', [' Skipping Dynamic DNS update']) - - - -#------------------------------------------------------------------------------- -def get_internet_IP (): - # BUGFIX #46 - curl http://ipv4.icanhazip.com repeatedly is very slow - # Using 'dig' - dig_args = ['dig', '+short'] + DIG_GET_IP_ARG.strip().split() - try: - cmd_output = subprocess.check_output (dig_args, universal_newlines=True) - except subprocess.CalledProcessError as e: - mylog('none', [e.output]) - cmd_output = '' # no internet - - # Check result is an IP - IP = check_IP_format (cmd_output) - - # Handle invalid response - if IP == '': - IP = '0.0.0.0' - - return IP - -#------------------------------------------------------------------------------- -def get_dynamic_DNS_IP (): - # Using OpenDNS server - # dig_args = ['dig', '+short', DDNS_DOMAIN, '@resolver1.opendns.com'] - - # Using default DNS server - dig_args = ['dig', '+short', DDNS_DOMAIN] - - try: - # try runnning a subprocess - dig_output = subprocess.check_output (dig_args, universal_newlines=True) - except subprocess.CalledProcessError as e: - # An error occured, handle it - mylog('none', [e.output]) - dig_output = '' # probably no internet - - # Check result is an IP - IP = check_IP_format (dig_output) - - # Handle invalid response - if IP == '': - IP = '0.0.0.0' - - return IP - -#------------------------------------------------------------------------------- -def set_dynamic_DNS_IP (): - try: - # try runnning a subprocess - # Update Dynamic IP - curl_output = subprocess.check_output (['curl', '-s', - DDNS_UPDATE_URL + - 'username=' + DDNS_USER + - '&password=' + DDNS_PASSWORD + - '&hostname=' + DDNS_DOMAIN], - universal_newlines=True) - except subprocess.CalledProcessError as e: - # An error occured, handle it - mylog('none', [e.output]) - curl_output = "" - - return curl_output - -#------------------------------------------------------------------------------- -def get_previous_internet_IP (db): - - previous_IP = '0.0.0.0' - - # get previous internet IP stored in DB - sql.execute ("SELECT dev_LastIP FROM Devices WHERE dev_MAC = 'Internet' ") - result = sql.fetchone() - - db.commitDB() - - if result is not None and len(result) > 0 : - previous_IP = result[0] - - # return previous IP - return previous_IP - -#------------------------------------------------------------------------------- -def save_new_internet_IP (db, pNewIP): - # Log new IP into logfile - append_line_to_file (logPath + '/IP_changes.log', - '['+str(startTime) +']\t'+ pNewIP +'\n') - - prevIp = get_previous_internet_IP(db) - # Save event - sql.execute ("""INSERT INTO Events (eve_MAC, eve_IP, eve_DateTime, - eve_EventType, eve_AdditionalInfo, - eve_PendingAlertEmail) - VALUES ('Internet', ?, ?, 'Internet IP Changed', - 'Previous Internet IP: '|| ?, 1) """, - (pNewIP, startTime, prevIp) ) - - # Save new IP - sql.execute ("""UPDATE Devices SET dev_LastIP = ? - WHERE dev_MAC = 'Internet' """, - (pNewIP,) ) - - # commit changes - db.commitDB() - -#------------------------------------------------------------------------------- -def check_IP_format (pIP): - # Check IP format - IPv4SEG = r'(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])' - IPv4ADDR = r'(?:(?:' + IPv4SEG + r'\.){3,3}' + IPv4SEG + r')' - IP = re.search(IPv4ADDR, pIP) - - # Return error if not IP - if IP is None : - return "" - - # Return IP - return IP.group(0) @@ -1023,7 +623,7 @@ def save_scanned_devices (p_arpscan_devices, p_cycle_interval): cycle) ) # Check Internet connectivity - internet_IP = get_internet_IP() + internet_IP = get_internet_IP(DIG_GET_IP_ARG) # TESTING - Force IP # internet_IP = "" if internet_IP != "" : @@ -2802,183 +2402,22 @@ def to_binary_sensor(input): return result -#=============================================================================== -# API -#=============================================================================== -def update_api(isNotification = False, updateOnlyDataSources = []): - - folder = pialertPath + '/front/api/' - - if isNotification: - # Update last notification alert in all formats - mylog('verbose', [' [API] Updating notification_* files in /front/api']) - - write_file(folder + 'notification_text.txt' , mail_text) - write_file(folder + 'notification_text.html' , mail_html) - write_file(folder + 'notification_json_final.json' , json.dumps(json_final)) - - # Save plugins - if ENABLE_PLUGINS: - write_file(folder + 'plugins.json' , json.dumps({"data" : plugins})) - - # prepare database tables we want to expose - dataSourcesSQLs = [ - ["devices", sql_devices_all], - ["nmap_scan", sql_nmap_scan_all], - ["pholus_scan", sql_pholus_scan_all], - ["events_pending_alert", sql_events_pending_alert], - ["settings", sql_settings], - ["plugins_events", sql_plugins_events], - ["plugins_history", sql_plugins_history], - ["plugins_objects", sql_plugins_objects], - ["language_strings", sql_language_strings], - ["custom_endpoint", API_CUSTOM_SQL], - ] - - # Save selected database tables - for dsSQL in dataSourcesSQLs: - - if updateOnlyDataSources == [] or dsSQL[0] in updateOnlyDataSources: - - api_endpoint_class(dsSQL[1], folder + 'table_' + dsSQL[0] + '.json') -#------------------------------------------------------------------------------- -apiEndpoints = [] -class api_endpoint_class: - def __init__(self, sql, path): - global apiEndpoints - - self.sql = sql - self.jsonData = get_table_as_json(sql).json - self.path = path - self.fileName = path.split('/')[-1] - self.hash = hash(json.dumps(self.jsonData)) - - # check if the endpoint needs to be updated - found = False - changed = False - changedIndex = -1 - index = 0 - - # search previous endpoint states to check if API needs updating - for endpoint in apiEndpoints: - # match sql and API endpoint path - if endpoint.sql == self.sql and endpoint.path == self.path: - found = True - if endpoint.hash != self.hash: - changed = True - changedIndex = index - - index = index + 1 - - # cehck if API endpoints have changed or if it's a new one - if not found or changed: - - mylog('verbose', [f' [API] Updating {self.fileName} file in /front/api']) - - write_file(self.path, json.dumps(self.jsonData)) - - if not found: - apiEndpoints.append(self) - - elif changed and changedIndex != -1 and changedIndex < len(apiEndpoints): - # update hash - apiEndpoints[changedIndex].hash = self.hash - else: - mylog('info', [f' [API] ERROR Updating {self.fileName}']) - - -#------------------------------------------------------------------------------- -def get_table_as_json(sqlQuery): - - sql.execute(sqlQuery) - - columnNames = list(map(lambda x: x[0], sql.description)) - - rows = sql.fetchall() - - result = {"data":[]} - - for row in rows: - tmp = row_to_json(columnNames, row) - result["data"].append(tmp) - return json_struc(result, columnNames) - -#------------------------------------------------------------------------------- -class json_struc: - def __init__(self, jsn, columnNames): - self.json = jsn - self.columnNames = columnNames - -#------------------------------------------------------------------------------- -# Creates a JSON object from a DB row -def row_to_json(names, row): - - rowEntry = {} - - index = 0 - for name in names: - rowEntry[name]= if_byte_then_to_str(row[name]) - index += 1 - - return rowEntry #=============================================================================== # UTIL #=============================================================================== -#------------------------------------------------------------------------------- -def write_file (pPath, pText): - # Write the text depending using the correct python version - if sys.version_info < (3, 0): - file = io.open (pPath , mode='w', encoding='utf-8') - file.write ( pText.decode('unicode_escape') ) - file.close() - else: - file = open (pPath, 'w', encoding='utf-8') - if pText is None: - pText = "" - file.write (pText) - file.close() -#------------------------------------------------------------------------------- -def append_line_to_file (pPath, pText): - # append the line depending using the correct python version - if sys.version_info < (3, 0): - file = io.open (pPath , mode='a', encoding='utf-8') - file.write ( pText.decode('unicode_escape') ) - file.close() - else: - file = open (pPath, 'a', encoding='utf-8') - file.write (pText) - file.close() -#------------------------------------------------------------------------------- -# Make a regular expression -# for validating an Ip-address -ipRegex = "^((25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])\.){3}(25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])$" -# Define a function to -# validate an Ip address -def checkIPV4(ip): - # pass the regular expression - # and the string in search() method - if(re.search(ipRegex, ip)): - return True - else: - return False -#------------------------------------------------------------------------------- -def get_file_content(path): - f = open(path, 'r') - content = f.read() - f.close() - return content + #------------------------------------------------------------------------------- @@ -2993,21 +2432,9 @@ def sanitize_string(input): value = bytes_to_string(re.sub('[^a-zA-Z0-9-_\s]', '', str(input))) return value -#------------------------------------------------------------------------------- -def if_byte_then_to_str(input): - if isinstance(input, bytes): - input = input.decode('utf-8') - input = bytes_to_string(re.sub('[^a-zA-Z0-9-_\s]', '', str(input))) - return input -#------------------------------------------------------------------------------- -def bytes_to_string(value): - # if value is of type bytes, convert to string - if isinstance(value, bytes): - value = value.decode('utf-8') - return value #------------------------------------------------------------------------------- @@ -3157,36 +2584,9 @@ def handle_test(testType): mylog('info', ['[', timeNow(), '] END Test: ', testType]) -#------------------------------------------------------------------------------- -# Return setting value -def get_setting_value(key): - - set = get_setting(key) - if get_setting(key) is not None: - setVal = set[6] # setting value - setTyp = set[3] # setting type - return setVal - - return '' - -#------------------------------------------------------------------------------- -# Return whole setting touple -def get_setting(key): - result = None - # index order: key, name, desc, inputtype, options, regex, result, group, events - for set in mySettings: - if set[0] == key: - result = set - - if result is None: - mylog('info', [' Error - setting_missing - Setting not found for key: ', key]) - mylog('info', [' Error - logging the settings into file: ', logPath + '/setting_missing.json']) - write_file (logPath + '/setting_missing.json', json.dumps({ 'data' : mySettings})) - - return result #------------------------------------------------------------------------------- def isNewVersion(db): @@ -3227,39 +2627,6 @@ def isNewVersion(db): #------------------------------------------------------------------------------- # Plugins #------------------------------------------------------------------------------- -#------------------------------------------------------------------------------- -def get_plugins_configs(): - - pluginsList = [] - - # only top level directories required. No need for the loop - # for root, dirs, files in os.walk(pluginsPath): - - dirs = next(os.walk(pluginsPath))[1] - for d in dirs: # Loop over directories, not files - pluginsList.append(json.loads(get_file_content(pluginsPath + "/" + d + '/config.json'))) - - return pluginsList - -#------------------------------------------------------------------------------- -def collect_lang_strings(db, json, pref): - - for prop in json["localized"]: - for language_string in json[prop]: - import_language_string(db, language_string["language_code"], pref + "_" + prop, language_string["string"]) - - -#------------------------------------------------------------------------------- -def import_language_string(db, code, key, value, extra = ""): - - sql.execute ("""INSERT INTO Plugins_Language_Strings ("Language_Code", "String_Key", "String_Value", "Extra") VALUES (?, ?, ?, ?)""", (str(code), str(key), str(value), str(extra))) - - db.commitDB() - - -#------------------------------------------------------------------------------- -def custom_plugin_decoder(pluginDict): - return namedtuple('X', pluginDict.keys())(*pluginDict.values()) #------------------------------------------------------------------------------- def run_plugin_scripts(db, runType): @@ -3296,521 +2663,11 @@ def run_plugin_scripts(db, runType): print_plugin_info(plugin, ['display_name']) mylog('debug', [' [Plugins] CMD: ', get_plugin_setting(plugin, "CMD")["value"]]) - execute_plugin(plugin) - -#------------------------------------------------------------------------------- -# Executes the plugin command specified in the setting with the function specified as CMD -def execute_plugin(db, plugin): - - # ------- necessary settings check -------- - set = get_plugin_setting(plugin, "CMD") - - # handle missing "function":"CMD" setting - if set == None: - return - - set_CMD = set["value"] - - set = get_plugin_setting(plugin, "RUN_TIMEOUT") - - # handle missing "function":"_TIMEOUT" setting - if set == None: - set_RUN_TIMEOUT = 10 - else: - set_RUN_TIMEOUT = set["value"] - - mylog('debug', [' [Plugins] Timeout: ', set_RUN_TIMEOUT]) - - # Prepare custom params - params = [] - - if "params" in plugin: - for param in plugin["params"]: - resolved = "" - - # Get setting value - if param["type"] == "setting": - resolved = get_setting(param["value"]) - - if resolved != None: - resolved = plugin_param_from_glob_set(resolved) - - # Get Sql result - if param["type"] == "sql": - resolved = flatten_array(db.get_sql_array(param["value"])) - - if resolved == None: - mylog('none', [' [Plugins] The parameter "name":"', param["name"], '" was resolved as None']) - - else: - params.append( [param["name"], resolved] ) - - - # build SQL query parameters to insert into the DB - sqlParams = [] - - # python-script - if plugin['data_source'] == 'python-script': - # ------- prepare params -------- - # prepare command from plugin settings, custom parameters - command = resolve_wildcards_arr(set_CMD.split(), params) - - # Execute command - mylog('verbose', [' [Plugins] Executing: ', set_CMD]) - mylog('debug', [' [Plugins] Resolved : ', command]) - - try: - # try runnning a subprocess with a forced timeout in case the subprocess hangs - output = subprocess.check_output (command, universal_newlines=True, stderr=subprocess.STDOUT, timeout=(set_RUN_TIMEOUT)) - except subprocess.CalledProcessError as e: - # An error occured, handle it - mylog('none', [e.output]) - mylog('none', [' [Plugins] Error - enable LOG_LEVEL=debug and check logs']) - except subprocess.TimeoutExpired as timeErr: - mylog('none', [' [Plugins] TIMEOUT - the process forcefully terminated as timeout reached']) - - - # check the last run output - f = open(pluginsPath + '/' + plugin["code_name"] + '/last_result.log', 'r+') - newLines = f.read().split('\n') - f.close() - - # cleanup - select only lines containing a separator to filter out unnecessary data - newLines = list(filter(lambda x: '|' in x, newLines)) - - # # regular logging - # for line in newLines: - # append_line_to_file (pluginsPath + '/plugin.log', line +'\n') - - for line in newLines: - columns = line.split("|") - # There has to be always 9 columns - if len(columns) == 9: - sqlParams.append((plugin["unique_prefix"], columns[0], columns[1], 'null', columns[2], columns[3], columns[4], columns[5], columns[6], 0, columns[7], 'null', columns[8])) - else: - mylog('none', [' [Plugins]: Skipped invalid line in the output: ', line]) - - # pialert-db-query - if plugin['data_source'] == 'pialert-db-query': - # replace single quotes wildcards - q = set_CMD.replace("{s-quote}", '\'') - - # Execute command - mylog('verbose', [' [Plugins] Executing: ', q]) - - # set_CMD should contain a SQL query - arr = db.get_sql_array (q) - - for row in arr: - # There has to be always 9 columns - if len(row) == 9 and (row[0] in ['','null']) == False : - sqlParams.append((plugin["unique_prefix"], row[0], handle_empty(row[1]), 'null', row[2], row[3], row[4], handle_empty(row[5]), handle_empty(row[6]), 0, row[7], 'null', row[8])) - else: - mylog('none', [' [Plugins]: Skipped invalid sql result']) - - - # check if the subprocess / SQL query failed / there was no valid output - if len(sqlParams) == 0: - mylog('none', [' [Plugins] No output received from the plugin ', plugin["unique_prefix"], ' - enable LOG_LEVEL=debug and check logs']) - return - else: - mylog('verbose', ['[', timeNow(), '] [Plugins]: SUCCESS, received ', len(sqlParams), ' entries']) - - # process results if any - if len(sqlParams) > 0: - sql.executemany ("""INSERT INTO Plugins_Events ("Plugin", "Object_PrimaryID", "Object_SecondaryID", "DateTimeCreated", "DateTimeChanged", "Watched_Value1", "Watched_Value2", "Watched_Value3", "Watched_Value4", "Status" ,"Extra", "UserData", "ForeignKey") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", sqlParams) - db.commitDB() - sql.executemany ("""INSERT INTO Plugins_History ("Plugin", "Object_PrimaryID", "Object_SecondaryID", "DateTimeCreated", "DateTimeChanged", "Watched_Value1", "Watched_Value2", "Watched_Value3", "Watched_Value4", "Status" ,"Extra", "UserData", "ForeignKey") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", sqlParams) - db.commitDB() - - process_plugin_events(plugin) - - # update API endpoints - update_api(False, ["plugins_events","plugins_objects"]) - - -#------------------------------------------------------------------------------- -# Handle empty value -def handle_empty(value): - if value == '' or value is None: - value = 'null' - - return value - -#------------------------------------------------------------------------------- -# Check if watched values changed for the given plugin -def process_plugin_events(db, plugin): - - global pluginObjects, pluginEvents - - pluginPref = plugin["unique_prefix"] - - mylog('debug', [' [Plugins] Processing : ', pluginPref]) - - plugObjectsArr = db.get_sql_array ("SELECT * FROM Plugins_Objects where Plugin = '" + str(pluginPref)+"'") - plugEventsArr = db.get_sql_array ("SELECT * FROM Plugins_Events where Plugin = '" + str(pluginPref)+"'") - - pluginObjects = [] - pluginEvents = [] - - for obj in plugObjectsArr: - pluginObjects.append(plugin_object_class(plugin, obj)) - - existingPluginObjectsCount = len(pluginObjects) - - mylog('debug', [' [Plugins] Existing objects : ', existingPluginObjectsCount]) - mylog('debug', [' [Plugins] New and existing events : ', len(plugEventsArr)]) - - # set status as new - will be changed later if conditions are fulfilled, e.g. entry found - for eve in plugEventsArr: - tmpObject = plugin_object_class(plugin, eve) - tmpObject.status = "new" - pluginEvents.append(tmpObject) - - - # Update the status to "exists" - index = 0 - for tmpObjFromEvent in pluginEvents: - - # compare hash of the IDs for uniqueness - if any(x.idsHash == tmpObject.idsHash for x in pluginObjects): - mylog('debug', [' [Plugins] Found existing object']) - pluginEvents[index].status = "exists" - index += 1 - - # Loop thru events and update the one that exist to determine if watched columns changed - index = 0 - for tmpObjFromEvent in pluginEvents: - - if tmpObjFromEvent.status == "exists": - - # compare hash of the changed watched columns for uniqueness - if any(x.watchedHash != tmpObject.watchedHash for x in pluginObjects): - pluginEvents[index].status = "watched-changed" - else: - pluginEvents[index].status = "watched-not-changed" - index += 1 - - # Merge existing plugin objects with newly discovered ones and update existing ones with new values - for eveObj in pluginEvents: - if eveObj.status == 'new': - pluginObjects.append(eveObj) - else: - index = 0 - for plugObj in pluginObjects: - # find corresponding object for the event and merge - if plugObj.idsHash == eveObj.idsHash: - pluginObjects[index] = combine_plugin_objects(plugObj, eveObj) - - index += 1 - - # Update the DB - # ---------------------------- - - # Update the Plugin_Objects - for plugObj in pluginObjects: - - createdTime = plugObj.created - - if plugObj.status == 'new': - - createdTime = plugObj.changed - - sql.execute ("INSERT INTO Plugins_Objects (Plugin, Object_PrimaryID, Object_SecondaryID, DateTimeCreated, DateTimeChanged, Watched_Value1, Watched_Value2, Watched_Value3, Watched_Value4, Status, Extra, UserData, ForeignKey) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)", (plugObj.pluginPref, plugObj.primaryId , plugObj.secondaryId , createdTime, plugObj.changed , plugObj.watched1 , plugObj.watched2 , plugObj.watched3 , plugObj.watched4 , plugObj.status , plugObj.extra, plugObj.userData, plugObj.foreignKey )) - else: - sql.execute (f"UPDATE Plugins_Objects set Plugin = '{plugObj.pluginPref}', DateTimeChanged = '{plugObj.changed}', Watched_Value1 = '{plugObj.watched1}', Watched_Value2 = '{plugObj.watched2}', Watched_Value3 = '{plugObj.watched3}', Watched_Value4 = '{plugObj.watched4}', Status = '{plugObj.status}', Extra = '{plugObj.extra}', ForeignKey = '{plugObj.foreignKey}' WHERE \"Index\" = {plugObj.index}") - - # Update the Plugins_Events with the new statuses - sql.execute (f'DELETE FROM Plugins_Events where Plugin = "{pluginPref}"') - - for plugObj in pluginEvents: - - createdTime = plugObj.created - - # use the same datetime for created and changed if a new entry - if plugObj.status == 'new': - createdTime = plugObj.changed - - # insert only events if they are to be reported on - if plugObj.status in get_plugin_setting_value(plugin, "REPORT_ON"): - - sql.execute ("INSERT INTO Plugins_Events (Plugin, Object_PrimaryID, Object_SecondaryID, DateTimeCreated, DateTimeChanged, Watched_Value1, Watched_Value2, Watched_Value3, Watched_Value4, Status, Extra, UserData, ForeignKey) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)", (plugObj.pluginPref, plugObj.primaryId , plugObj.secondaryId , createdTime, plugObj.changed , plugObj.watched1 , plugObj.watched2 , plugObj.watched3 , plugObj.watched4 , plugObj.status , plugObj.extra, plugObj.userData, plugObj.foreignKey )) - - # Perform databse table mapping if enabled for the plugin - if len(pluginEvents) > 0 and "mapped_to_table" in plugin: - - sqlParams = [] - - dbTable = plugin['mapped_to_table'] - - mylog('debug', [' [Plugins] Mapping objects to database table: ', dbTable]) - - # collect all columns to be mapped - mappedCols = [] - columnsStr = '' - valuesStr = '' - - for clmn in plugin['database_column_definitions']: - if 'mapped_to_column' in clmn: - mappedCols.append(clmn) - columnsStr = f'{columnsStr}, "{clmn["mapped_to_column"]}"' - valuesStr = f'{valuesStr}, ?' - - if len(columnsStr) > 0: - columnsStr = columnsStr[1:] # remove first ',' - valuesStr = valuesStr[1:] # remove first ',' - - # map the column names to plugin object event values - for plgEv in pluginEvents: - - tmpList = [] - - for col in mappedCols: - if col['column'] == 'Index': - tmpList.append(plgEv.index) - elif col['column'] == 'Plugin': - tmpList.append(plgEv.pluginPref) - elif col['column'] == 'Object_PrimaryID': - tmpList.append(plgEv.primaryId) - elif col['column'] == 'Object_SecondaryID': - tmpList.append(plgEv.secondaryId) - elif col['column'] == 'DateTimeCreated': - tmpList.append(plgEv.created) - elif col['column'] == 'DateTimeChanged': - tmpList.append(plgEv.changed) - elif col['column'] == 'Watched_Value1': - tmpList.append(plgEv.watched1) - elif col['column'] == 'Watched_Value2': - tmpList.append(plgEv.watched2) - elif col['column'] == 'Watched_Value3': - tmpList.append(plgEv.watched3) - elif col['column'] == 'Watched_Value4': - tmpList.append(plgEv.watched4) - elif col['column'] == 'UserData': - tmpList.append(plgEv.userData) - elif col['column'] == 'Extra': - tmpList.append(plgEv.extra) - elif col['column'] == 'Status': - tmpList.append(plgEv.status) - - sqlParams.append(tuple(tmpList)) - - q = f'INSERT into {dbTable} ({columnsStr}) VALUES ({valuesStr})' - - mylog('debug', [' [Plugins] SQL query for mapping: ', q ]) - - sql.executemany (q, sqlParams) - - db.commitDB() - -#------------------------------------------------------------------------------- -class plugin_object_class: - def __init__(self, plugin, objDbRow): - self.index = objDbRow[0] - self.pluginPref = objDbRow[1] - self.primaryId = objDbRow[2] - self.secondaryId = objDbRow[3] - self.created = objDbRow[4] - self.changed = objDbRow[5] - self.watched1 = objDbRow[6] - self.watched2 = objDbRow[7] - self.watched3 = objDbRow[8] - self.watched4 = objDbRow[9] - self.status = objDbRow[10] - self.extra = objDbRow[11] - self.userData = objDbRow[12] - self.foreignKey = objDbRow[13] - - # self.idsHash = str(hash(str(self.primaryId) + str(self.secondaryId))) - self.idsHash = str(self.primaryId) + str(self.secondaryId) - - self.watchedClmns = [] - self.watchedIndxs = [] - - setObj = get_plugin_setting(plugin, 'WATCH') - - indexNameColumnMapping = [(6, 'Watched_Value1' ), (7, 'Watched_Value2' ), (8, 'Watched_Value3' ), (9, 'Watched_Value4' )] - - if setObj is not None: - - self.watchedClmns = setObj["value"] - - for clmName in self.watchedClmns: - for mapping in indexNameColumnMapping: - if clmName == indexNameColumnMapping[1]: - self.watchedIndxs.append(indexNameColumnMapping[0]) - - tmp = '' - for indx in self.watchedIndxs: - tmp += str(objDbRow[indx]) - - self.watchedHash = str(hash(tmp)) - - -#------------------------------------------------------------------------------- -# Combine plugin objects, keep user-defined values, created time, changed time if nothing changed and the index -def combine_plugin_objects(old, new): - - new.userData = old.userData - new.index = old.index - new.created = old.created - - # Keep changed time if nothing changed - if new.status in ['watched-not-changed']: - new.changed = old.changed - - # return the new object, with some of the old values - return new - -#------------------------------------------------------------------------------- -# Replace {wildcars} with parameters -def resolve_wildcards_arr(commandArr, params): - - mylog('debug', [' [Plugins]: Pre-Resolved CMD: '] + commandArr) - - for param in params: - # mylog('debug', [' [Plugins]: key : {', param[0], '}']) - # mylog('debug', [' [Plugins]: resolved: ', param[1]]) - - i = 0 - - for comPart in commandArr: - - commandArr[i] = comPart.replace('{' + param[0] + '}', param[1]).replace('{s-quote}',"'") - - i += 1 - - return commandArr - -#------------------------------------------------------------------------------- -# Flattens a setting to make it passable to a script -def plugin_param_from_glob_set(globalSetting): - - setVal = globalSetting[6] # setting value - setTyp = globalSetting[3] # setting type - - - noConversion = ['text', 'integer', 'boolean', 'password', 'readonly', 'selectinteger', 'selecttext' ] - arrayConversion = ['multiselect', 'list'] - - if setTyp in noConversion: - return setVal - - if setTyp in arrayConversion: - return flatten_array(setVal) - - -#------------------------------------------------------------------------------- -# Gets the whole setting object -def get_plugin_setting(plugin, function_key): - - result = None - - for set in plugin['settings']: - if set["function"] == function_key: - result = set - - if result == None: - mylog('none', [' [Plugins] Setting with "function":"', function_key, '" is missing in plugin: ', get_plugin_string(plugin, 'display_name')]) - - return result - -#------------------------------------------------------------------------------- -# Gets the setting value -def get_plugin_setting_value(plugin, function_key): - - resultObj = get_plugin_setting(plugin, function_key) - - if resultObj != None: - return resultObj["value"] - - return None - - -#------------------------------------------------------------------------------- -# Get localized string value on the top JSON depth, not recursive -def get_plugin_string(props, el): - - result = '' - - if el in props['localized']: - for str in props[el]: - if str['language_code'] == 'en_us': - result = str['string'] - - if result == '': - result = 'en_us string missing' - - else: - result = props[el] - - return result - -#------------------------------------------------------------------------------- -def print_plugin_info(plugin, elements = ['display_name']): - - mylog('verbose', [' [Plugins] ---------------------------------------------']) - - for el in elements: - res = get_plugin_string(plugin, el) - mylog('verbose', [' [Plugins] ', el ,': ', res]) - -#------------------------------------------------------------------------------- -def flatten_array(arr): - - tmp = '' - - mylog('debug', arr) - - for arrayItem in arr: - # only one column flattening is supported - if isinstance(arrayItem, list): - arrayItem = str(arrayItem[0]) - - tmp += arrayItem + ',' - # tmp = tmp.replace("'","").replace(' ','') # No single quotes or empty spaces allowed - tmp = tmp.replace("'","") # No single quotes allowed - - return tmp[:-1] # Remove last comma ',' - - + execute_plugin(plugin) #------------------------------------------------------------------------------- # Cron-like Scheduling -#------------------------------------------------------------------------------- -class schedule_class: - def __init__(self, service, scheduleObject, last_next_schedule, was_last_schedule_used, last_run = 0): - self.service = service - self.scheduleObject = scheduleObject - self.last_next_schedule = last_next_schedule - self.last_run = last_run - self.was_last_schedule_used = was_last_schedule_used - def runScheduleCheck(self): - result = False - - # Initialize the last run time if never run before - if self.last_run == 0: - self.last_run = (datetime.datetime.now(tz) - timedelta(days=365)).replace(microsecond=0) - - # get the current time with the currently specified timezone - nowTime = datetime.datetime.now(tz).replace(microsecond=0) - - # Run the schedule if the current time is past the schedule time we saved last time and - # (maybe the following check is unnecessary:) - # if the last run is past the last time we run a scheduled Pholus scan - if nowTime > self.last_next_schedule and self.last_run < self.last_next_schedule: - print_log(f'Scheduler run for {self.service}: YES') - self.was_last_schedule_used = True - result = True - else: - print_log(f'Scheduler run for {self.service}: NO') - - if self.was_last_schedule_used: - self.was_last_schedule_used = False - self.last_next_schedule = self.scheduleObject.next() - - return result #=============================================================================== # BEGIN diff --git a/pialert/plugin.py b/pialert/plugin.py new file mode 100644 index 00000000..eb1c1e6c --- /dev/null +++ b/pialert/plugin.py @@ -0,0 +1,556 @@ +import os +import json +import subprocess +import datetime +from collections import namedtuple + +# pialert modules +from const import pluginsPath, logPath +from files import get_file_content, write_file +from logger import mylog +from conf import mySettings +#from api import update_api + + +#------------------------------------------------------------------------------- +# this is duplicated from helper to avoid circular reference !! TO-DO +#------------------------------------------------------------------------------- +def timeNow(): + return datetime.datetime.now().replace(microsecond=0) + +#------------------------------------------------------------------------------- +def get_plugins_configs(): + + pluginsList = [] + + # only top level directories required. No need for the loop + # for root, dirs, files in os.walk(pluginsPath): + + dirs = next(os.walk(pluginsPath))[1] + for d in dirs: # Loop over directories, not files + pluginsList.append(json.loads(get_file_content(pluginsPath + "/" + d + '/config.json'))) + + return pluginsList + + + +#------------------------------------------------------------------------------- +def print_plugin_info(plugin, elements = ['display_name']): + + mylog('verbose', [' [Plugins] ---------------------------------------------']) + + for el in elements: + res = get_plugin_string(plugin, el) + mylog('verbose', [' [Plugins] ', el ,': ', res]) + + +#------------------------------------------------------------------------------- +# Gets the whole setting object +def get_plugin_setting(plugin, function_key): + + result = None + + for set in plugin['settings']: + if set["function"] == function_key: + result = set + + if result == None: + mylog('none', [' [Plugins] Setting with "function":"', function_key, '" is missing in plugin: ', get_plugin_string(plugin, 'display_name')]) + + return result + +#------------------------------------------------------------------------------- +# Return whole setting touple +def get_setting(key): + result = None + # index order: key, name, desc, inputtype, options, regex, result, group, events + for set in mySettings: + if set[0] == key: + result = set + + if result is None: + mylog('info', [' Error - setting_missing - Setting not found for key: ', key]) + mylog('info', [' Error - logging the settings into file: ', logPath + '/setting_missing.json']) + write_file (logPath + '/setting_missing.json', json.dumps({ 'data' : mySettings})) + + return result + + +#------------------------------------------------------------------------------- +# Get localized string value on the top JSON depth, not recursive +def get_plugin_string(props, el): + + result = '' + + if el in props['localized']: + for str in props[el]: + if str['language_code'] == 'en_us': + result = str['string'] + + if result == '': + result = 'en_us string missing' + + else: + result = props[el] + + return result + + +#------------------------------------------------------------------------------- +# Executes the plugin command specified in the setting with the function specified as CMD +def execute_plugin(db, plugin): + sql = db.sql + + # ------- necessary settings check -------- + set = get_plugin_setting(plugin, "CMD") + + # handle missing "function":"CMD" setting + if set == None: + return + + set_CMD = set["value"] + + set = get_plugin_setting(plugin, "RUN_TIMEOUT") + + # handle missing "function":"_TIMEOUT" setting + if set == None: + set_RUN_TIMEOUT = 10 + else: + set_RUN_TIMEOUT = set["value"] + + mylog('debug', [' [Plugins] Timeout: ', set_RUN_TIMEOUT]) + + # Prepare custom params + params = [] + + if "params" in plugin: + for param in plugin["params"]: + resolved = "" + + # Get setting value + if param["type"] == "setting": + resolved = get_setting(param["value"]) + + if resolved != None: + resolved = plugin_param_from_glob_set(resolved) + + # Get Sql result + if param["type"] == "sql": + resolved = flatten_array(db.get_sql_array(param["value"])) + + if resolved == None: + mylog('none', [' [Plugins] The parameter "name":"', param["name"], '" was resolved as None']) + + else: + params.append( [param["name"], resolved] ) + + + # build SQL query parameters to insert into the DB + sqlParams = [] + + # python-script + if plugin['data_source'] == 'python-script': + # ------- prepare params -------- + # prepare command from plugin settings, custom parameters + command = resolve_wildcards_arr(set_CMD.split(), params) + + # Execute command + mylog('verbose', [' [Plugins] Executing: ', set_CMD]) + mylog('debug', [' [Plugins] Resolved : ', command]) + + try: + # try runnning a subprocess with a forced timeout in case the subprocess hangs + output = subprocess.check_output (command, universal_newlines=True, stderr=subprocess.STDOUT, timeout=(set_RUN_TIMEOUT)) + except subprocess.CalledProcessError as e: + # An error occured, handle it + mylog('none', [e.output]) + mylog('none', [' [Plugins] Error - enable LOG_LEVEL=debug and check logs']) + except subprocess.TimeoutExpired as timeErr: + mylog('none', [' [Plugins] TIMEOUT - the process forcefully terminated as timeout reached']) + + + # check the last run output + f = open(pluginsPath + '/' + plugin["code_name"] + '/last_result.log', 'r+') + newLines = f.read().split('\n') + f.close() + + # cleanup - select only lines containing a separator to filter out unnecessary data + newLines = list(filter(lambda x: '|' in x, newLines)) + + # # regular logging + # for line in newLines: + # append_line_to_file (pluginsPath + '/plugin.log', line +'\n') + + for line in newLines: + columns = line.split("|") + # There has to be always 9 columns + if len(columns) == 9: + sqlParams.append((plugin["unique_prefix"], columns[0], columns[1], 'null', columns[2], columns[3], columns[4], columns[5], columns[6], 0, columns[7], 'null', columns[8])) + else: + mylog('none', [' [Plugins]: Skipped invalid line in the output: ', line]) + + # pialert-db-query + if plugin['data_source'] == 'pialert-db-query': + # replace single quotes wildcards + q = set_CMD.replace("{s-quote}", '\'') + + # Execute command + mylog('verbose', [' [Plugins] Executing: ', q]) + + # set_CMD should contain a SQL query + arr = db.get_sql_array (q) + + for row in arr: + # There has to be always 9 columns + if len(row) == 9 and (row[0] in ['','null']) == False : + sqlParams.append((plugin["unique_prefix"], row[0], handle_empty(row[1]), 'null', row[2], row[3], row[4], handle_empty(row[5]), handle_empty(row[6]), 0, row[7], 'null', row[8])) + else: + mylog('none', [' [Plugins]: Skipped invalid sql result']) + + + # check if the subprocess / SQL query failed / there was no valid output + if len(sqlParams) == 0: + mylog('none', [' [Plugins] No output received from the plugin ', plugin["unique_prefix"], ' - enable LOG_LEVEL=debug and check logs']) + return + else: + mylog('verbose', ['[', timeNow(), '] [Plugins]: SUCCESS, received ', len(sqlParams), ' entries']) + + # process results if any + if len(sqlParams) > 0: + sql.executemany ("""INSERT INTO Plugins_Events ("Plugin", "Object_PrimaryID", "Object_SecondaryID", "DateTimeCreated", "DateTimeChanged", "Watched_Value1", "Watched_Value2", "Watched_Value3", "Watched_Value4", "Status" ,"Extra", "UserData", "ForeignKey") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", sqlParams) + db.commitDB() + sql.executemany ("""INSERT INTO Plugins_History ("Plugin", "Object_PrimaryID", "Object_SecondaryID", "DateTimeCreated", "DateTimeChanged", "Watched_Value1", "Watched_Value2", "Watched_Value3", "Watched_Value4", "Status" ,"Extra", "UserData", "ForeignKey") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", sqlParams) + db.commitDB() + + process_plugin_events(plugin) + + # update API endpoints + # update_api(False, ["plugins_events","plugins_objects"]) # TO-DO - remover circular reference + +#------------------------------------------------------------------------------- +def custom_plugin_decoder(pluginDict): + return namedtuple('X', pluginDict.keys())(*pluginDict.values()) + +#------------------------------------------------------------------------------- +# Handle empty value +def handle_empty(value): + if value == '' or value is None: + value = 'null' + + return value + + +#------------------------------------------------------------------------------- +# Flattens a setting to make it passable to a script +def plugin_param_from_glob_set(globalSetting): + + setVal = globalSetting[6] # setting value + setTyp = globalSetting[3] # setting type + + + noConversion = ['text', 'integer', 'boolean', 'password', 'readonly', 'selectinteger', 'selecttext' ] + arrayConversion = ['multiselect', 'list'] + + if setTyp in noConversion: + return setVal + + if setTyp in arrayConversion: + return flatten_array(setVal) + +#------------------------------------------------------------------------------- +# Gets the setting value +def get_plugin_setting_value(plugin, function_key): + + resultObj = get_plugin_string(plugin, function_key) + + if resultObj != None: + return resultObj["value"] + + return None + +#------------------------------------------------------------------------------- +# Return setting value +def get_setting_value(key): + + set = get_setting(key) + + if get_setting(key) is not None: + + setVal = set[6] # setting value + setTyp = set[3] # setting type + + return setVal + + return '' + +#------------------------------------------------------------------------------- +def flatten_array(arr): + + tmp = '' + + mylog('debug', arr) + + for arrayItem in arr: + # only one column flattening is supported + if isinstance(arrayItem, list): + arrayItem = str(arrayItem[0]) + + tmp += arrayItem + ',' + # tmp = tmp.replace("'","").replace(' ','') # No single quotes or empty spaces allowed + tmp = tmp.replace("'","") # No single quotes allowed + + return tmp[:-1] # Remove last comma ',' + + +#------------------------------------------------------------------------------- +# Replace {wildcars} with parameters +def resolve_wildcards_arr(commandArr, params): + + mylog('debug', [' [Plugins]: Pre-Resolved CMD: '] + commandArr) + + for param in params: + # mylog('debug', [' [Plugins]: key : {', param[0], '}']) + # mylog('debug', [' [Plugins]: resolved: ', param[1]]) + + i = 0 + + for comPart in commandArr: + + commandArr[i] = comPart.replace('{' + param[0] + '}', param[1]).replace('{s-quote}',"'") + + i += 1 + + return commandArr + + +#------------------------------------------------------------------------------- +# Combine plugin objects, keep user-defined values, created time, changed time if nothing changed and the index +def combine_plugin_objects(old, new): + + new.userData = old.userData + new.index = old.index + new.created = old.created + + # Keep changed time if nothing changed + if new.status in ['watched-not-changed']: + new.changed = old.changed + + # return the new object, with some of the old values + return new + +#------------------------------------------------------------------------------- +# Check if watched values changed for the given plugin +def process_plugin_events(db, plugin): + sql = db.sql + + global pluginObjects, pluginEvents + + pluginPref = plugin["unique_prefix"] + + mylog('debug', [' [Plugins] Processing : ', pluginPref]) + + plugObjectsArr = db.get_sql_array ("SELECT * FROM Plugins_Objects where Plugin = '" + str(pluginPref)+"'") + plugEventsArr = db.get_sql_array ("SELECT * FROM Plugins_Events where Plugin = '" + str(pluginPref)+"'") + + pluginObjects = [] + pluginEvents = [] + + for obj in plugObjectsArr: + pluginObjects.append(plugin_object_class(plugin, obj)) + + existingPluginObjectsCount = len(pluginObjects) + + mylog('debug', [' [Plugins] Existing objects : ', existingPluginObjectsCount]) + mylog('debug', [' [Plugins] New and existing events : ', len(plugEventsArr)]) + + # set status as new - will be changed later if conditions are fulfilled, e.g. entry found + for eve in plugEventsArr: + tmpObject = plugin_object_class(plugin, eve) + tmpObject.status = "new" + pluginEvents.append(tmpObject) + + + # Update the status to "exists" + index = 0 + for tmpObjFromEvent in pluginEvents: + + # compare hash of the IDs for uniqueness + if any(x.idsHash == tmpObject.idsHash for x in pluginObjects): + mylog('debug', [' [Plugins] Found existing object']) + pluginEvents[index].status = "exists" + index += 1 + + # Loop thru events and update the one that exist to determine if watched columns changed + index = 0 + for tmpObjFromEvent in pluginEvents: + + if tmpObjFromEvent.status == "exists": + + # compare hash of the changed watched columns for uniqueness + if any(x.watchedHash != tmpObject.watchedHash for x in pluginObjects): + pluginEvents[index].status = "watched-changed" + else: + pluginEvents[index].status = "watched-not-changed" + index += 1 + + # Merge existing plugin objects with newly discovered ones and update existing ones with new values + for eveObj in pluginEvents: + if eveObj.status == 'new': + pluginObjects.append(eveObj) + else: + index = 0 + for plugObj in pluginObjects: + # find corresponding object for the event and merge + if plugObj.idsHash == eveObj.idsHash: + pluginObjects[index] = combine_plugin_objects(plugObj, eveObj) + + index += 1 + +# Update the DB + # ---------------------------- + + # Update the Plugin_Objects + for plugObj in pluginObjects: + + createdTime = plugObj.created + + if plugObj.status == 'new': + + createdTime = plugObj.changed + + sql.execute ("INSERT INTO Plugins_Objects (Plugin, Object_PrimaryID, Object_SecondaryID, DateTimeCreated, DateTimeChanged, Watched_Value1, Watched_Value2, Watched_Value3, Watched_Value4, Status, Extra, UserData, ForeignKey) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)", (plugObj.pluginPref, plugObj.primaryId , plugObj.secondaryId , createdTime, plugObj.changed , plugObj.watched1 , plugObj.watched2 , plugObj.watched3 , plugObj.watched4 , plugObj.status , plugObj.extra, plugObj.userData, plugObj.foreignKey )) + else: + sql.execute (f"UPDATE Plugins_Objects set Plugin = '{plugObj.pluginPref}', DateTimeChanged = '{plugObj.changed}', Watched_Value1 = '{plugObj.watched1}', Watched_Value2 = '{plugObj.watched2}', Watched_Value3 = '{plugObj.watched3}', Watched_Value4 = '{plugObj.watched4}', Status = '{plugObj.status}', Extra = '{plugObj.extra}', ForeignKey = '{plugObj.foreignKey}' WHERE \"Index\" = {plugObj.index}") + + # Update the Plugins_Events with the new statuses + sql.execute (f'DELETE FROM Plugins_Events where Plugin = "{pluginPref}"') + + for plugObj in pluginEvents: + + createdTime = plugObj.created + + # use the same datetime for created and changed if a new entry + if plugObj.status == 'new': + createdTime = plugObj.changed + + # insert only events if they are to be reported on + if plugObj.status in get_plugin_setting_value(plugin, "REPORT_ON"): + + sql.execute ("INSERT INTO Plugins_Events (Plugin, Object_PrimaryID, Object_SecondaryID, DateTimeCreated, DateTimeChanged, Watched_Value1, Watched_Value2, Watched_Value3, Watched_Value4, Status, Extra, UserData, ForeignKey) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)", (plugObj.pluginPref, plugObj.primaryId , plugObj.secondaryId , createdTime, plugObj.changed , plugObj.watched1 , plugObj.watched2 , plugObj.watched3 , plugObj.watched4 , plugObj.status , plugObj.extra, plugObj.userData, plugObj.foreignKey )) + + # Perform databse table mapping if enabled for the plugin + if len(pluginEvents) > 0 and "mapped_to_table" in plugin: + + sqlParams = [] + + dbTable = plugin['mapped_to_table'] + + mylog('debug', [' [Plugins] Mapping objects to database table: ', dbTable]) + + # collect all columns to be mapped + mappedCols = [] + columnsStr = '' + valuesStr = '' + + for clmn in plugin['database_column_definitions']: + if 'mapped_to_column' in clmn: + mappedCols.append(clmn) + columnsStr = f'{columnsStr}, "{clmn["mapped_to_column"]}"' + valuesStr = f'{valuesStr}, ?' + + if len(columnsStr) > 0: + columnsStr = columnsStr[1:] # remove first ',' + valuesStr = valuesStr[1:] # remove first ',' + + # map the column names to plugin object event values + for plgEv in pluginEvents: + + tmpList = [] + + for col in mappedCols: + if col['column'] == 'Index': + tmpList.append(plgEv.index) + elif col['column'] == 'Plugin': + tmpList.append(plgEv.pluginPref) + elif col['column'] == 'Object_PrimaryID': + tmpList.append(plgEv.primaryId) + elif col['column'] == 'Object_SecondaryID': + tmpList.append(plgEv.secondaryId) + elif col['column'] == 'DateTimeCreated': + tmpList.append(plgEv.created) + elif col['column'] == 'DateTimeChanged': + tmpList.append(plgEv.changed) + elif col['column'] == 'Watched_Value1': + tmpList.append(plgEv.watched1) + elif col['column'] == 'Watched_Value2': + tmpList.append(plgEv.watched2) + elif col['column'] == 'Watched_Value3': + tmpList.append(plgEv.watched3) + elif col['column'] == 'Watched_Value4': + tmpList.append(plgEv.watched4) + elif col['column'] == 'UserData': + tmpList.append(plgEv.userData) + elif col['column'] == 'Extra': + tmpList.append(plgEv.extra) + elif col['column'] == 'Status': + tmpList.append(plgEv.status) + + sqlParams.append(tuple(tmpList)) + + q = f'INSERT into {dbTable} ({columnsStr}) VALUES ({valuesStr})' + + mylog('debug', [' [Plugins] SQL query for mapping: ', q ]) + + sql.executemany (q, sqlParams) + + db.commitDB() + + + + + +#------------------------------------------------------------------------------- +class plugin_object_class: + def __init__(self, plugin, objDbRow): + self.index = objDbRow[0] + self.pluginPref = objDbRow[1] + self.primaryId = objDbRow[2] + self.secondaryId = objDbRow[3] + self.created = objDbRow[4] + self.changed = objDbRow[5] + self.watched1 = objDbRow[6] + self.watched2 = objDbRow[7] + self.watched3 = objDbRow[8] + self.watched4 = objDbRow[9] + self.status = objDbRow[10] + self.extra = objDbRow[11] + self.userData = objDbRow[12] + self.foreignKey = objDbRow[13] + + # self.idsHash = str(hash(str(self.primaryId) + str(self.secondaryId))) + self.idsHash = str(self.primaryId) + str(self.secondaryId) + + self.watchedClmns = [] + self.watchedIndxs = [] + + setObj = get_plugin_setting(plugin, 'WATCH') + + indexNameColumnMapping = [(6, 'Watched_Value1' ), (7, 'Watched_Value2' ), (8, 'Watched_Value3' ), (9, 'Watched_Value4' )] + + if setObj is not None: + + self.watchedClmns = setObj["value"] + + for clmName in self.watchedClmns: + for mapping in indexNameColumnMapping: + if clmName == indexNameColumnMapping[1]: + self.watchedIndxs.append(indexNameColumnMapping[0]) + + tmp = '' + for indx in self.watchedIndxs: + tmp += str(objDbRow[indx]) + + self.watchedHash = str(hash(tmp)) + + +