""" Colection of generic functions to support Pi.Alert """ import io import sys import datetime import os import re import subprocess import pytz from pytz import timezone from datetime import timedelta import json import time from pathlib import Path import requests import conf from const import * from logger import mylog, logResult #------------------------------------------------------------------------------- def timeNowTZ(): if isinstance(conf.TIMEZONE, str): tz = pytz.timezone(conf.TIMEZONE) else: tz = conf.TIMEZONE return datetime.datetime.now(tz).replace(microsecond=0) def timeNow(): return datetime.datetime.now().replace(microsecond=0) #------------------------------------------------------------------------------- # A class to manage the application state and to provide a frontend accessible API point class app_state_class: def __init__(self, currentState, settingsSaved=None, settingsImported=None, showSpinner=False): # json file containing the state to communicate with the frontend stateFile = apiPath + '/app_state.json' # Update self self.currentState = currentState self.lastUpdated = str(timeNowTZ()) # Check if the file exists and init values if os.path.exists(stateFile): with open(stateFile, 'r') as json_file: previousState = json.load(json_file) self.settingsSaved = previousState.get("settingsSaved", 0) self.settingsImported = previousState.get("settingsImported", 0) self.showSpinner = previousState.get("showSpinner", False) else: self.settingsSaved = 0 self.settingsImported = 0 self.showSpinner = False # Overwrite with provided parameters if not None if settingsSaved is not None: self.settingsSaved = settingsSaved if settingsImported is not None: self.settingsImported = settingsImported if showSpinner is not None: self.showSpinner = showSpinner # Update .json file with open(stateFile, 'w') as json_file: json.dump(self, json_file, cls=AppStateEncoder, indent=4) def isSet(self): result = False if self.currentState != "": result = True return result 
#-------------------------------------------------------------------------------
# Checks if the object has a __dict__ attribute. If it does, it assumes that
# it's an instance of a class and serializes its attributes dynamically.
class AppStateEncoder(json.JSONEncoder):
    def default(self, obj):
        if hasattr(obj, '__dict__'):
            # If the object has a '__dict__', assume it's an instance of a class
            return obj.__dict__
        return super().default(obj)

#-------------------------------------------------------------------------------
def updateState(newState, settingsSaved = None, settingsImported = None, showSpinner = False):
    # Instantiating the class writes app_state.json as a side effect
    state = app_state_class(newState, settingsSaved, settingsImported, showSpinner)

#-------------------------------------------------------------------------------
def updateSubnets(scan_subnets):
    subnets = []

    # multiple interfaces
    if type(scan_subnets) is list:
        for interface in scan_subnets:
            subnets.append(interface)
    # one interface only
    else:
        subnets.append(scan_subnets)

    return subnets

#-------------------------------------------------------------------------------
# check RW access of DB and config file
def checkPermissionsOK():
    #global confR_access, confW_access, dbR_access, dbW_access

    confR_access = (os.access(fullConfPath, os.R_OK))
    confW_access = (os.access(fullConfPath, os.W_OK))
    dbR_access = (os.access(fullDbPath, os.R_OK))
    dbW_access = (os.access(fullDbPath, os.W_OK))

    mylog('none', ['\n'])
    mylog('none', ['The container restarted (started). If this is unexpected check https://bit.ly/PiAlertDebug for troubleshooting tips.'])
    mylog('none', ['\n'])
    mylog('none', ['Permissions check (All should be True)'])
    mylog('none', ['------------------------------------------------'])
    mylog('none', [ " " , confPath , " | " , " READ | " , confR_access])
    mylog('none', [ " " , confPath , " | " , " WRITE | " , confW_access])
    mylog('none', [ " " , dbPath , " | " , " READ | " , dbR_access])
    mylog('none', [ " " , dbPath , " | " , " WRITE | " , dbW_access])
    mylog('none', ['------------------------------------------------'])

    #return dbR_access and dbW_access and confR_access and confW_access
    return (confR_access, dbR_access)

#-------------------------------------------------------------------------------
def fixPermissions():
    # Try fixing access rights if needed
    chmodCommands = []

    chmodCommands.append(['sudo', 'chmod', 'a+rw', '-R', fullDbPath])
    chmodCommands.append(['sudo', 'chmod', 'a+rw', '-R', fullConfPath])

    for com in chmodCommands:
        # Execute command
        mylog('none', ["[Setup] Attempting to fix permissions."])
        try:
            # try running a subprocess
            result = subprocess.check_output(com, universal_newlines=True)
        except subprocess.CalledProcessError as e:
            # An error occurred, handle it
            mylog('none', ["[Setup] Fix Failed. Execute this command manually inside of the container: ", ' '.join(com)])
            mylog('none', [e.output])

#-------------------------------------------------------------------------------
def initialiseFile(pathToCheck, defaultFile):
    # if file not readable (missing?) try to copy over the backed-up (default) one
    if not os.access(pathToCheck, os.R_OK):
        mylog('none', ["[Setup] ("+ pathToCheck +") file is not readable or missing. Trying to copy over the default one."])
        try:
            # try running a subprocess
            p = subprocess.Popen(["cp", defaultFile, pathToCheck], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
            stdout, stderr = p.communicate()

            if not os.access(pathToCheck, os.R_OK):
                mylog('none', ["[Setup] Error copying ("+defaultFile+") to ("+pathToCheck+"). Make sure the app has Read & Write access to the parent directory."])
            else:
                mylog('none', ["[Setup] ("+defaultFile+") copied over successfully to ("+pathToCheck+")."])

            # write stdout and stderr into .log files for debugging if needed
            logResult(stdout, stderr)  # TO-DO should be changed to mylog

        except subprocess.CalledProcessError as e:
            # An error occurred, handle it
            mylog('none', ["[Setup] Error copying ("+defaultFile+"). Make sure the app has Read & Write access to " + pathToCheck])
            mylog('none', [e.output])

def filePermissions():
    # check and initialize pialert.conf
    (confR_access, dbR_access) = checkPermissionsOK()  # Initial check

    if not confR_access:
        initialiseFile(fullConfPath, "/home/pi/pialert/back/pialert.conf_bak")

    # check and initialize pialert.db
    if not dbR_access:
        initialiseFile(fullDbPath, "/home/pi/pialert/back/pialert.db_bak")

    # last attempt
    fixPermissions()

#-------------------------------------------------------------------------------
def bytes_to_string(value):
    # if value is of type bytes, convert to string
    if isinstance(value, bytes):
        value = value.decode('utf-8')
    return value

#-------------------------------------------------------------------------------
def if_byte_then_to_str(input):
    if isinstance(input, bytes):
        input = input.decode('utf-8')
    # Raw string avoids the invalid-escape warning for \s; the hyphen is placed
    # last so it is unambiguously a literal. Keeps letters, digits, '-', '_'
    # and whitespace only.
    input = bytes_to_string(re.sub(r'[^a-zA-Z0-9_\s-]', '', str(input)))
    return input

#-------------------------------------------------------------------------------
def collect_lang_strings(db, json, pref, stringSqlParams):
    # One SQL parameter tuple per (localized property, language) pair
    for prop in json["localized"]:
        for language_string in json[prop]:
            stringSqlParams.append((str(language_string["language_code"]), str(pref + "_" + prop), str(language_string["string"]), ""))

    return stringSqlParams

#-------------------------------------------------------------------------------
# Creates a JSON object from a DB row
def row_to_json(names, row):
    rowEntry = {}

    for name in names:
        rowEntry[name] = if_byte_then_to_str(row[name])

    return rowEntry

#-------------------------------------------------------------------------------
def checkIPV4(ip):
    """ Validate an IPv4 address """
    # Raw string so that \. reaches the regex engine unchanged
    ipRegex = r"^((25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])\.){3}(25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])$"

    return bool(re.search(ipRegex, ip))

#-------------------------------------------------------------------------------
def isNewVersion(newVersion: bool):

    mylog('debug', [f"[Version check] New version available? {newVersion}"])

    if newVersion == False:

        with open(pialertPath + '/front/buildtimestamp.txt', 'r') as f:
            buildTimestamp = int(f.read().strip())

        data = ""

        try:
            url = requests.get("https://api.github.com/repos/jokob-sk/Pi.Alert/releases")
            text = url.text
            data = json.loads(text)
        except requests.exceptions.ConnectionError:
            mylog('minimal', [" Couldn't check for new release."])
            data = ""

        # make sure we received a valid response and not an API rate limit exceeded message
        if data != "" and len(data) > 0 and isinstance(data, list) and "published_at" in data[0]:

            dateTimeStr = data[0]["published_at"]

            # The trailing 'Z' marks the timestamp as UTC. strftime('%s') is
            # platform-specific and would treat the value as local time, so
            # convert explicitly instead.
            releaseTimestamp = int(datetime.datetime.strptime(dateTimeStr, '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=datetime.timezone.utc).timestamp())

            # Allow a 10-minute margin between build time and release time
            if releaseTimestamp > buildTimestamp + 600:
                mylog('none', ["[Version check] New version of the container available!"])
                newVersion = True

    return newVersion

#-------------------------------------------------------------------------------
def hide_email(email):
    m = email.split('@')

    # Mask everything between the first and last character of the local part
    if len(m) == 2:
        return f'{m[0][0]}{"*"*(len(m[0])-2)}{m[0][-1] if len(m[0]) > 1 else ""}@{m[1]}'

    return email

#-------------------------------------------------------------------------------
def removeDuplicateNewLines(text):
    # Collapse runs of three or more newlines down to two (one blank line)
    while "\n\n\n" in text:
        text = text.replace("\n\n\n", "\n\n")
    return text

#-------------------------------------------------------------------------------
def add_json_list (row, list):
    new_row = []
    for column in row:
        column = bytes_to_string(column)
        new_row.append(column)

    list.append(new_row)

    return list

#-------------------------------------------------------------------------------
def sanitize_string(input):
    if isinstance(input, bytes):
        input = input.decode('utf-8')
    # Raw string avoids the invalid-escape warning for \s; the hyphen is placed
    # last so it is unambiguously a literal
    value = bytes_to_string(re.sub(r'[^a-zA-Z0-9_\s-]', '', str(input)))
    return value

#-------------------------------------------------------------------------------
def generate_mac_links (html, deviceUrl):

    # 12 hex digits, each optionally followed by a colon
    p = re.compile(r'(?:[0-9a-fA-F]:?){12}')

    MACs = re.findall(p, html)

    for mac in MACs:
        html = html.replace('