NetAlertX/pialert/initialise.py
import os
import time
from pytz import timezone
from cron_converter import Cron
from pathlib import Path
import datetime
import json
import shutil
import re
import conf
from const import fullConfPath
from helper import collect_lang_strings, updateSubnets, initOrSetParam, isJsonObject, updateState
from logger import mylog
from api import update_api
from scheduler import schedule_class
from plugin import print_plugin_info, run_plugin_scripts
from plugin_utils import get_plugins_configs
#===============================================================================
# Initialise user defined values
#===============================================================================

#-------------------------------------------------------------------------------
# Import user values
# Check config dictionary
def ccd(key, default, config_dir, name, inputtype, options, group, events=None, desc="", regex="", setJsonMetadata={}, overrideTemplate={}):
    # use the default initialisation value
    result = default

    if events is None:
        events = []

    # use the existing value if one was already supplied, otherwise keep the default
    if key in config_dir:
        result = config_dir[key]

    if inputtype == 'text':
        result = result.replace('\'', "{s-quote}")

    conf.mySettingsSQLsafe.append((key, name, desc, inputtype, options, regex, str(result), group, str(events)))
    conf.mySettings.append((key, name, desc, inputtype, options, regex, result, group, str(events)))

    # save the metadata in a dummy setting
    if '__metadata' not in key:
        metadata_setting = (f'{key}__metadata', "metadata name", "metadata desc", 'json', "", "", json.dumps(setJsonMetadata), group, '[]')

        conf.mySettingsSQLsafe.append(metadata_setting)
        conf.mySettings.append(metadata_setting)

    return result
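
# Illustrative sketch only (not part of the shipped config): how a setting is
# typically registered through ccd(). The key, default value and display name
# below are made up to show the call shape used throughout importConfigs().
#
#   conf.EXAMPLE_TIMEOUT = ccd('EXAMPLE_TIMEOUT', 30, c_d,
#                              'Example timeout (seconds)', 'integer', '',
#                              'General')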
#-------------------------------------------------------------------------------
def importConfigs(db):

    sql = db.sql

    # get the config file name
    config_file = Path(fullConfPath)

    # Only import the file if it was modified since the last import.
    # This avoids time zone issues, as we simply compare the previous timestamp to the current one.

    # rename settings that have changed names due to code cleanup and migration to plugins
    renameSettings(config_file)

    fileModifiedTime = os.path.getmtime(config_file)

    mylog('debug', ['[Import Config] checking config file '])
    mylog('debug', ['[Import Config] lastImportedConfFile :', conf.lastImportedConfFile])
    mylog('debug', ['[Import Config] fileModifiedTime :', fileModifiedTime])

    if fileModifiedTime == conf.lastImportedConfFile:
        mylog('debug', ['[Import Config] skipping config file import'])
        return

    # Header
    updateState("Import config", showSpinner=True)

    # remove all plugin language strings
    sql.execute("DELETE FROM Plugins_Language_Strings;")
    db.commitDB()

    mylog('debug', ['[Import Config] importing config file'])

    conf.mySettings = []         # reset settings
    conf.mySettingsSQLsafe = []  # same as above, but safe to be passed into an SQL query

    # User values are loaded from the config file
    c_d = read_config_file(config_file)

    # Import each setting if it is found in the dictionary

    # General
    # ----------------------------------------
    conf.LOG_LEVEL = ccd('LOG_LEVEL', 'verbose', c_d, 'Log verboseness', 'text.select', "['none', 'minimal', 'verbose', 'debug']", 'General')
    conf.TIMEZONE = ccd('TIMEZONE', 'Europe/Berlin', c_d, 'Time zone', 'text', '', 'General')
    conf.PLUGINS_KEEP_HIST = ccd('PLUGINS_KEEP_HIST', 250, c_d, 'Keep history entries', 'integer', '', 'General')
    conf.PIALERT_WEB_PROTECTION = ccd('PIALERT_WEB_PROTECTION', False, c_d, 'Enable logon', 'boolean', '', 'General')
    conf.PIALERT_WEB_PASSWORD = ccd('PIALERT_WEB_PASSWORD', '8d969eef6ecad3c29a3a629280e686cf0c3f5d5a86aff3ca12020c923adc6c92', c_d, 'Logon password', 'readonly', '', 'General')
    conf.REPORT_DASHBOARD_URL = ccd('REPORT_DASHBOARD_URL', 'http://pi.alert/', c_d, 'PiAlert URL', 'text', '', 'General')
    conf.UI_LANG = ccd('UI_LANG', 'English', c_d, 'Language Interface', 'text.select', "['English', 'German', 'Spanish']", 'General')
    conf.UI_PRESENCE = ccd('UI_PRESENCE', ['online', 'offline', 'archived'], c_d, 'Include in presence', 'text.multiselect', "['online', 'offline', 'archived']", 'General')
    conf.UI_MY_DEVICES = ccd('UI_MY_DEVICES', ['online', 'offline', 'archived', 'new', 'down'], c_d, 'Include in My Devices', 'text.multiselect', "['online', 'offline', 'archived', 'new', 'down']", 'General')
    conf.UI_NOT_RANDOM_MAC = ccd('UI_NOT_RANDOM_MAC', [], c_d, 'Exclude from Random Prefix', 'list', "", 'General')
    conf.DAYS_TO_KEEP_EVENTS = ccd('DAYS_TO_KEEP_EVENTS', 90, c_d, 'Delete events days', 'integer', '', 'General')
    conf.HRS_TO_KEEP_NEWDEV = ccd('HRS_TO_KEEP_NEWDEV', 0, c_d, 'Keep new devices for', 'integer', "0", 'General')
    conf.API_CUSTOM_SQL = ccd('API_CUSTOM_SQL', 'SELECT * FROM Devices WHERE dev_PresentLastScan = 0', c_d, 'Custom endpoint', 'text', '', 'General')
    conf.NETWORK_DEVICE_TYPES = ccd('NETWORK_DEVICE_TYPES', ['AP', 'Gateway', 'Firewall', 'Hypervisor', 'Powerline', 'Switch', 'WLAN', 'PLC', 'Router', 'USB LAN Adapter', 'USB WIFI Adapter', 'Internet'], c_d, 'Network device types', 'list', '', 'General')

    # ARPSCAN (additional settings are provided by the ARPSCAN plugin)
    conf.SCAN_SUBNETS = ccd('SCAN_SUBNETS', ['192.168.1.0/24 --interface=eth1', '192.168.1.0/24 --interface=eth0'], c_d, 'Subnets to scan', 'subnets', '', 'ARPSCAN')
    # Init the timezone in case it changed
    conf.tz = timezone(conf.TIMEZONE)

    # TODO cleanup later ----------------------------------------------------------------------------------
    # init all time values now that we have the timezone - all of this should be moved into plugin/plugin settings
    conf.time_started = datetime.datetime.now(conf.tz)
    conf.plugins_once_run = False

    # timestamps of the last execution times
    conf.startTime = conf.time_started
    now_minus_24h = conf.time_started - datetime.timedelta(hours=24)

    # set these times to the past to force the first run
    conf.last_scan_run = now_minus_24h
    conf.last_version_check = now_minus_24h
    # TODO cleanup later ----------------------------------------------------------------------------------

    # reset schedules
    conf.mySchedules = []

    # Format and prepare the list of subnets
    conf.userSubnets = updateSubnets(conf.SCAN_SUBNETS)

    # Plugins START
    # -----------------
    conf.plugins = get_plugins_configs()

    mylog('none', ['[Config] Plugins: Number of dynamically loaded plugins: ', len(conf.plugins)])
    # handle plugins
    index = 0
    for plugin in conf.plugins:
        # Header
        updateState(f"Import plugin {index} of {len(conf.plugins)}")

        index += 1

        pref = plugin["unique_prefix"]

        print_plugin_info(plugin, ['display_name', 'description'])

        # if plugin["enabled"] == 'true':

        stringSqlParams = []

        # collect plugin-level language strings
        stringSqlParams = collect_lang_strings(plugin, pref, stringSqlParams)

        for setting in plugin["settings"]:
            setFunction = setting["function"]

            # Setting code name / key
            key = pref + "_" + setFunction

            # setting.get() returns None if the key is not found, setting["options"] would raise a KeyError
            # ccd(key, default, config_dir, name, inputtype, options, group, events=None, desc="", regex="", setJsonMetadata={}):
            v = ccd(key,
                    setting["default_value"],
                    c_d,
                    setting["name"][0]["string"],
                    setting["type"],
                    str(setting["options"]),
                    group=pref,
                    events=setting.get("events"),
                    desc=setting["description"][0]["string"],
                    regex="",
                    setJsonMetadata=setting)

            # Save the user-defined value into the object
            setting["value"] = v

            # Set up schedules
            if setFunction == 'RUN_SCHD':
                newSchedule = Cron(v).schedule(start_date=datetime.datetime.now(conf.tz))
                conf.mySchedules.append(schedule_class(pref, newSchedule, newSchedule.next(), False))

            # Collect settings-related language strings
            # Creates an entry with a key such as ARPSCAN_CMD_name
            stringSqlParams = collect_lang_strings(setting, pref + "_" + setting["function"], stringSqlParams)

        # Collect column-related language strings
        for clmn in plugin.get('database_column_definitions', []):
            # Creates an entry with a key such as ARPSCAN_Object_PrimaryID_name
            stringSqlParams = collect_lang_strings(clmn, pref + "_" + clmn.get("column", ""), stringSqlParams)

        # bulk-import language strings
        sql.executemany("""INSERT INTO Plugins_Language_Strings ("Language_Code", "String_Key", "String_Value", "Extra") VALUES (?, ?, ?, ?)""", stringSqlParams)
        db.commitDB()

    conf.plugins_once_run = False
    # -----------------
    # Plugins END

    # Insert settings into the DB
    sql.execute("DELETE FROM Settings")
    sql.executemany("""INSERT INTO Settings ("Code_Name", "Display_Name", "Description", "Type", "Options",
        "RegEx", "Value", "Group", "Events") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""", conf.mySettingsSQLsafe)

    # commitDB(sql_connection)
    db.commitDB()

    # update only the settings datasource
    update_api(db, False, ["settings"])

    # run plugins that are modifying the config
    run_plugin_scripts(db, 'before_config_save')

    # Used to determine the next import
    conf.lastImportedConfFile = os.path.getmtime(config_file)

    updateState("Config imported", conf.lastImportedConfFile, conf.lastImportedConfFile, False)

    mylog('minimal', '[Config] Imported new config')
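
# Illustrative sketch only: how the RUN_SCHD handling above turns a plugin's
# cron expression into its next scheduled run. The '*/5 * * * *' value is a
# made-up example, not a shipped default.
#
#   next_run = Cron('*/5 * * * *').schedule(start_date=datetime.datetime.now(conf.tz)).next()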
#-------------------------------------------------------------------------------
def read_config_file(filename):
    """
    Returns a dict of the key:value pairs defined in the config file.
    """
    mylog('minimal', '[Config] reading config file')

    # load the variables from pialert.conf
    code = compile(filename.read_text(), filename.name, "exec")

    confDict = {}  # config dictionary
    exec(code, {"__builtins__": {}}, confDict)

    return confDict
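
# Illustrative sketch only: a pialert.conf line such as
#
#   SCAN_SUBNETS = ['192.168.1.0/24 --interface=eth0']
#
# is executed by read_config_file() and ends up in the returned dictionary as
# {'SCAN_SUBNETS': ['192.168.1.0/24 --interface=eth0']}.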
#-------------------------------------------------------------------------------
# DEPRECATED, to be removed soonest after 3/3/2024
# 🤔 Idea/TODO: Check and compare versions/timestamps and only perform a replacement if the config/version is older than...
replacements = {
    r'\bREPORT_TO\b': 'SMTP_REPORT_TO',
    r'\bREPORT_FROM\b': 'SMTP_REPORT_FROM',
    r'REPORT_MAIL=True': 'SMTP_RUN=\'on_notification\'',
    r'REPORT_APPRISE=True': 'APPRISE_RUN=\'on_notification\'',
    r'REPORT_NTFY=True': 'NTFY_RUN=\'on_notification\'',
    r'REPORT_WEBHOOK=True': 'WEBHOOK_RUN=\'on_notification\'',
    r'REPORT_PUSHSAFER=True': 'PUSHSAFER_RUN=\'on_notification\'',
    r'REPORT_MQTT=True': 'MQTT_RUN=\'on_notification\'',
    r'PIHOLE_CMD=': 'PIHOLE_CMD_OLD=',
    r'\bINCLUDED_SECTIONS\b': 'NTFPRCS_INCLUDED_SECTIONS',
    r'\bDIG_GET_IP_ARG\b': 'INTRNT_DIG_GET_IP_ARG'
}

def renameSettings(config_file):
    # Check whether the file contains any of the old setting code names
    contains_old_settings = False

    # Open the original config_file for reading (config_file is converted to a string)
    with open(str(config_file), 'r') as original_file:
        for line in original_file:
            # Use regular expressions with word boundaries to check for the old setting code names
            if any(re.search(key, line) for key in replacements.keys()):
                contains_old_settings = True
                break  # Exit the loop as soon as any old setting is found

    # If the file contains old settings, proceed with renaming and backup
    if contains_old_settings:
        # Create a backup file with the suffix "_old_setting_names" and a timestamp
        timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
        backup_file = f"{config_file}_old_setting_names_{timestamp}.bak"

        mylog('debug', f'[Config] Old setting names will be replaced and a backup ({backup_file}) of the config created.')

        shutil.copy(str(config_file), backup_file)

        # Open the original config_file for reading and a temporary file for writing
        with open(str(config_file), 'r') as original_file, open(str(config_file) + "_temp", 'w') as temp_file:
            for line in original_file:
                # Use regular expressions with word boundaries for the replacements
                for key, value in replacements.items():
                    line = re.sub(key, value, line)
                # Write the modified line to the temporary file
                temp_file.write(line)
        # Both files are closed automatically by the "with" statement

        # Replace the original config_file with the temporary file
        shutil.move(str(config_file) + "_temp", str(config_file))
    else:
        mylog('debug', '[Config] No old setting names found in the file. No changes made.')
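
# Illustrative sketch only: given the replacements above, a legacy config line
#
#   REPORT_MAIL=True
#
# is rewritten in place to
#
#   SMTP_RUN='on_notification'
#
# while the original file is kept as a timestamped *_old_setting_names_*.bak backup.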