""" Colection of generic functions to support Pi.Alert """ import io import sys import datetime import os import re import subprocess from pytz import timezone from datetime import timedelta import json import time from pathlib import Path import requests import conf from const import * from logger import mylog, logResult #------------------------------------------------------------------------------- def timeNowTZ(): return datetime.datetime.now(conf.tz).replace(microsecond=0) def timeNow(): return datetime.datetime.now().replace(microsecond=0) #------------------------------------------------------------------------------- def updateState(db, newState): # ?? Why is the state written to the DB? # The state is written to the DB so the front-end can use the value to display the current state in the header of the app # The Parameters DB table is used to communicate with the front end. #sql = db.sql mylog('debug', '[updateState] changing state to: "' + newState +'"') db.sql.execute ("UPDATE Parameters SET par_Value='"+ newState +"' WHERE par_ID='Back_App_State'") db.commitDB() #------------------------------------------------------------------------------- def updateSubnets(scan_subnets): subnets = [] # multiple interfaces if type(scan_subnets) is list: for interface in scan_subnets : subnets.append(interface) # one interface only else: subnets.append(scan_subnets) return subnets #------------------------------------------------------------------------------- # check RW access of DB and config file def checkPermissionsOK(): #global confR_access, confW_access, dbR_access, dbW_access confR_access = (os.access(fullConfPath, os.R_OK)) confW_access = (os.access(fullConfPath, os.W_OK)) dbR_access = (os.access(fullDbPath, os.R_OK)) dbW_access = (os.access(fullDbPath, os.W_OK)) mylog('none', ['\n']) mylog('none', ['The container restarted (started). If this is unexpected check https://bit.ly/PiAlertDebug for troubleshooting tips.']) mylog('none', ['\n']) mylog('none', ['Permissions check (All should be True)']) mylog('none', ['------------------------------------------------']) mylog('none', [ " " , confPath , " | " , " READ | " , confR_access]) mylog('none', [ " " , confPath , " | " , " WRITE | " , confW_access]) mylog('none', [ " " , dbPath , " | " , " READ | " , dbR_access]) mylog('none', [ " " , dbPath , " | " , " WRITE | " , dbW_access]) mylog('none', ['------------------------------------------------']) #return dbR_access and dbW_access and confR_access and confW_access return (confR_access, dbR_access) #------------------------------------------------------------------------------- def fixPermissions(): # Try fixing access rights if needed chmodCommands = [] chmodCommands.append(['sudo', 'chmod', 'a+rw', '-R', fullDbPath]) chmodCommands.append(['sudo', 'chmod', 'a+rw', '-R', fullConfPath]) for com in chmodCommands: # Execute command mylog('none', ["[Setup] Attempting to fix permissions."]) try: # try runnning a subprocess result = subprocess.check_output (com, universal_newlines=True) except subprocess.CalledProcessError as e: # An error occured, handle it mylog('none', ["[Setup] Fix Failed. Execute this command manually inside of the container: ", ' '.join(com)]) mylog('none', [e.output]) #------------------------------------------------------------------------------- def initialiseFile(pathToCheck, defaultFile): # if file not readable (missing?) try to copy over the backed-up (default) one if str(os.access(pathToCheck, os.R_OK)) == "False": mylog('none', ["[Setup] ("+ pathToCheck +") file is not readable or missing. Trying to copy over the default one."]) try: # try runnning a subprocess p = subprocess.Popen(["cp", defaultFile , pathToCheck], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) stdout, stderr = p.communicate() if str(os.access(pathToCheck, os.R_OK)) == "False": mylog('none', ["[Setup] Error copying ("+defaultFile+") to ("+pathToCheck+"). Make sure the app has Read & Write access to the parent directory."]) else: mylog('none', ["[Setup] ("+defaultFile+") copied over successfully to ("+pathToCheck+")."]) # write stdout and stderr into .log files for debugging if needed logResult (stdout, stderr) # TO-DO should be changed to mylog except subprocess.CalledProcessError as e: # An error occured, handle it mylog('none', ["[Setup] Error copying ("+defaultFile+"). Make sure the app has Read & Write access to " + pathToCheck]) mylog('none', [e.output]) def filePermissions(): # check and initialize pialert.conf (confR_access, dbR_access) = checkPermissionsOK() # Initial check if confR_access == False: initialiseFile(fullConfPath, "/home/pi/pialert/back/pialert.conf_bak" ) # check and initialize pialert.db if dbR_access == False: initialiseFile(fullDbPath, "/home/pi/pialert/back/pialert.db_bak") # last attempt fixPermissions() #------------------------------------------------------------------------------- def bytes_to_string(value): # if value is of type bytes, convert to string if isinstance(value, bytes): value = value.decode('utf-8') return value #------------------------------------------------------------------------------- def if_byte_then_to_str(input): if isinstance(input, bytes): input = input.decode('utf-8') input = bytes_to_string(re.sub('[^a-zA-Z0-9-_\s]', '', str(input))) return input #------------------------------------------------------------------------------- def collect_lang_strings(db, json, pref): for prop in json["localized"]: for language_string in json[prop]: import_language_string(db, language_string["language_code"], pref + "_" + prop, language_string["string"]) #------------------------------------------------------------------------------- # Creates a JSON object from a DB row def row_to_json(names, row): rowEntry = {} index = 0 for name in names: rowEntry[name]= if_byte_then_to_str(row[name]) index += 1 return rowEntry #------------------------------------------------------------------------------- def import_language_string(db, code, key, value, extra = ""): db.sql.execute ("""INSERT INTO Plugins_Language_Strings ("Language_Code", "String_Key", "String_Value", "Extra") VALUES (?, ?, ?, ?)""", (str(code), str(key), str(value), str(extra))) db.commitDB() #------------------------------------------------------------------------------- def checkIPV4(ip): """ Define a function to validate an Ip address """ ipRegex = "^((25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])\.){3}(25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])$" if(re.search(ipRegex, ip)): return True else: return False #------------------------------------------------------------------------------- def isNewVersion(newVersion: bool): mylog('debug', [f"[Version check] New version available? {newVersion}"]) if newVersion == False: f = open(pialertPath + '/front/buildtimestamp.txt', 'r') buildTimestamp = int(f.read().strip()) f.close() data = "" try: url = requests.get("https://api.github.com/repos/jokob-sk/Pi.Alert/releases") text = url.text data = json.loads(text) except requests.exceptions.ConnectionError as e: mylog('minimal', [" Couldn't check for new release."]) data = "" # make sure we received a valid response and not an API rate limit exceeded message if data != "" and len(data) > 0 and isinstance(data, list) and "published_at" in data[0]: dateTimeStr = data[0]["published_at"] realeaseTimestamp = int(datetime.datetime.strptime(dateTimeStr, '%Y-%m-%dT%H:%M:%SZ').strftime('%s')) if realeaseTimestamp > buildTimestamp + 600: mylog('none', ["[Version check] New version of the container available!"]) newVersion = True # updateState(db, 'Back_New_Version_Available', str(newVersionAvailable)) ## TO DO add this back in but avoid circular ref with database return newVersion #------------------------------------------------------------------------------- def hide_email(email): m = email.split('@') if len(m) == 2: return f'{m[0][0]}{"*"*(len(m[0])-2)}{m[0][-1] if len(m[0]) > 1 else ""}@{m[1]}' return email #------------------------------------------------------------------------------- def removeDuplicateNewLines(text): if "\n\n\n" in text: return removeDuplicateNewLines(text.replace("\n\n\n", "\n\n")) else: return text #------------------------------------------------------------------------------- def add_json_list (row, list): new_row = [] for column in row : column = bytes_to_string(column) new_row.append(column) list.append(new_row) return list #------------------------------------------------------------------------------- def sanitize_string(input): if isinstance(input, bytes): input = input.decode('utf-8') value = bytes_to_string(re.sub('[^a-zA-Z0-9-_\s]', '', str(input))) return value #------------------------------------------------------------------------------- def generate_mac_links (html, deviceUrl): p = re.compile(r'(?:[0-9a-fA-F]:?){12}') MACs = re.findall(p, html) for mac in MACs: html = html.replace('' + mac + '','' + mac + '') return html #------------------------------------------------------------------------------- def initOrSetParam(db, parID, parValue): sql = db.sql sql.execute ("INSERT INTO Parameters(par_ID, par_Value) VALUES('"+str(parID)+"', '"+str(parValue)+"') ON CONFLICT(par_ID) DO UPDATE SET par_Value='"+str(parValue)+"' where par_ID='"+str(parID)+"'") db.commitDB() #------------------------------------------------------------------------------- class json_struc: def __init__(self, jsn, columnNames): self.json = jsn self.columnNames = columnNames #------------------------------------------------------------------------------- def get_file_content(path): f = open(path, 'r') content = f.read() f.close() return content #------------------------------------------------------------------------------- def write_file(pPath, pText): # Convert pText to a string if it's a dictionary if isinstance(pText, dict): pText = json.dumps(pText) # Convert pText to a string if it's a list if isinstance(pText, list): for item in pText: write_file(pPath, item) else: # Write the text using the correct Python version if sys.version_info < (3, 0): file = io.open(pPath, mode='w', encoding='utf-8') file.write(pText.decode('unicode_escape')) file.close() else: file = open(pPath, 'w', encoding='utf-8') if pText is None: pText = "" file.write(pText) file.close() #------------------------------------------------------------------------------- class noti_struc: def __init__(self, json, text, html): self.json = json self.text = text self.html = html #------------------------------------------------------------------------------- def isJsonObject(value): return isinstance(value, dict) #------------------------------------------------------------------------------- # Return whole setting touple def get_setting(key): result = None # index order: key, name, desc, inputtype, options, regex, result, group, events for set in conf.mySettings: if set[0] == key: result = set if result is None: mylog('minimal', [' Error - setting_missing - Setting not found for key: ', key]) mylog('minimal', [' Error - logging the settings into file: ', logPath + '/setting_missing.json']) write_file (logPath + '/setting_missing.json', json.dumps({ 'data' : conf.mySettings})) return result #------------------------------------------------------------------------------- # Return setting value def get_setting_value(key): set = get_setting(key) if get_setting(key) is not None: setVal = set[6] # setting value setTyp = set[3] # setting type return setVal return ''