""" Colection of generic functions to support Pi.Alert """ import io import sys import datetime # from datetime import strptime import os import re import subprocess import pytz from pytz import timezone import json import time from pathlib import Path import requests import conf from const import * from logger import mylog, logResult #------------------------------------------------------------------------------- # DateTime #------------------------------------------------------------------------------- # Get the current time in the current TimeZone def timeNowTZ(): if isinstance(conf.TIMEZONE, str): tz = pytz.timezone(conf.TIMEZONE) else: tz = conf.TIMEZONE return datetime.datetime.now(tz).replace(microsecond=0) def timeNow(): return datetime.datetime.now().replace(microsecond=0) #------------------------------------------------------------------------------- # App state #------------------------------------------------------------------------------- # A class to manage the application state and to provide a frontend accessible API point class app_state_class: def __init__(self, currentState, settingsSaved=None, settingsImported=None, showSpinner=False): # json file containing the state to communicate with the frontend stateFile = apiPath + '/app_state.json' # if currentState == 'Initializing': # checkNewVersion(False) # Update self self.currentState = currentState self.lastUpdated = str(timeNowTZ()) # Check if the file exists and init values if os.path.exists(stateFile): with open(stateFile, 'r') as json_file: previousState = json.load(json_file) self.settingsSaved = previousState.get("settingsSaved", 0) self.settingsImported = previousState.get("settingsImported", 0) self.showSpinner = previousState.get("showSpinner", False) self.isNewVersion = previousState.get("isNewVersion", False) self.isNewVersionChecked = previousState.get("isNewVersionChecked", 0) else: self.settingsSaved = 0 self.settingsImported = 0 self.showSpinner = False self.isNewVersion = checkNewVersion() self.isNewVersionChecked = int(timeNow().timestamp()) # Overwrite with provided parameters if supplied if settingsSaved is not None: self.settingsSaved = settingsSaved if settingsImported is not None: self.settingsImported = settingsImported if showSpinner is not None: self.showSpinner = showSpinner # check for new version every hour and if currently not running new version if self.isNewVersion is False and self.isNewVersionChecked + 3600 < int(timeNow().timestamp()): self.isNewVersion = checkNewVersion() self.isNewVersionChecked = int(timeNow().timestamp()) # Update .json file with open(stateFile, 'w') as json_file: json.dump(self, json_file, cls=AppStateEncoder, indent=4) def isSet(self): result = False if self.currentState != "": result = True return result #------------------------------------------------------------------------------- # method to update the state def updateState(newState, settingsSaved = None, settingsImported = None, showSpinner = False): state = app_state_class(newState, settingsSaved, settingsImported, showSpinner) #------------------------------------------------------------------------------- def updateSubnets(scan_subnets): subnets = [] # multiple interfaces if type(scan_subnets) is list: for interface in scan_subnets : subnets.append(interface) # one interface only else: subnets.append(scan_subnets) return subnets #------------------------------------------------------------------------------- # File system permission handling #------------------------------------------------------------------------------- # check RW access of DB and config file def checkPermissionsOK(): #global confR_access, confW_access, dbR_access, dbW_access confR_access = (os.access(fullConfPath, os.R_OK)) confW_access = (os.access(fullConfPath, os.W_OK)) dbR_access = (os.access(fullDbPath, os.R_OK)) dbW_access = (os.access(fullDbPath, os.W_OK)) mylog('none', ['\n']) mylog('none', ['The container restarted (started). If this is unexpected check https://bit.ly/PiAlertDebug for troubleshooting tips.']) mylog('none', ['\n']) mylog('none', ['Permissions check (All should be True)']) mylog('none', ['------------------------------------------------']) mylog('none', [ " " , confPath , " | " , " READ | " , confR_access]) mylog('none', [ " " , confPath , " | " , " WRITE | " , confW_access]) mylog('none', [ " " , dbPath , " | " , " READ | " , dbR_access]) mylog('none', [ " " , dbPath , " | " , " WRITE | " , dbW_access]) mylog('none', ['------------------------------------------------']) #return dbR_access and dbW_access and confR_access and confW_access return (confR_access, dbR_access) #------------------------------------------------------------------------------- def fixPermissions(): # Try fixing access rights if needed chmodCommands = [] chmodCommands.append(['sudo', 'chmod', 'a+rw', '-R', fullDbPath]) chmodCommands.append(['sudo', 'chmod', 'a+rw', '-R', fullConfPath]) for com in chmodCommands: # Execute command mylog('none', ["[Setup] Attempting to fix permissions."]) try: # try runnning a subprocess result = subprocess.check_output (com, universal_newlines=True) except subprocess.CalledProcessError as e: # An error occured, handle it mylog('none', ["[Setup] Fix Failed. Execute this command manually inside of the container: ", ' '.join(com)]) mylog('none', [e.output]) #------------------------------------------------------------------------------- def initialiseFile(pathToCheck, defaultFile): # if file not readable (missing?) try to copy over the backed-up (default) one if str(os.access(pathToCheck, os.R_OK)) == "False": mylog('none', ["[Setup] ("+ pathToCheck +") file is not readable or missing. Trying to copy over the default one."]) try: # try runnning a subprocess p = subprocess.Popen(["cp", defaultFile , pathToCheck], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) stdout, stderr = p.communicate() if str(os.access(pathToCheck, os.R_OK)) == "False": mylog('none', ["[Setup] Error copying ("+defaultFile+") to ("+pathToCheck+"). Make sure the app has Read & Write access to the parent directory."]) else: mylog('none', ["[Setup] ("+defaultFile+") copied over successfully to ("+pathToCheck+")."]) # write stdout and stderr into .log files for debugging if needed logResult (stdout, stderr) # TO-DO should be changed to mylog except subprocess.CalledProcessError as e: # An error occured, handle it mylog('none', ["[Setup] Error copying ("+defaultFile+"). Make sure the app has Read & Write access to " + pathToCheck]) mylog('none', [e.output]) #------------------------------------------------------------------------------- def filePermissions(): # check and initialize pialert.conf (confR_access, dbR_access) = checkPermissionsOK() # Initial check if confR_access == False: initialiseFile(fullConfPath, "/home/pi/pialert/back/pialert.conf_bak" ) # check and initialize pialert.db if dbR_access == False: initialiseFile(fullDbPath, "/home/pi/pialert/back/pialert.db_bak") # last attempt fixPermissions() #------------------------------------------------------------------------------- # File manipulation methods #------------------------------------------------------------------------------- #------------------------------------------------------------------------------- def get_file_content(path): f = open(path, 'r') content = f.read() f.close() return content #------------------------------------------------------------------------------- def write_file(pPath, pText): # Convert pText to a string if it's a dictionary if isinstance(pText, dict): pText = json.dumps(pText) # Convert pText to a string if it's a list if isinstance(pText, list): for item in pText: write_file(pPath, item) else: # Write the text using the correct Python version if sys.version_info < (3, 0): file = io.open(pPath, mode='w', encoding='utf-8') file.write(pText.decode('unicode_escape')) file.close() else: file = open(pPath, 'w', encoding='utf-8') if pText is None: pText = "" file.write(pText) file.close() #------------------------------------------------------------------------------- # Setting methods #------------------------------------------------------------------------------- #------------------------------------------------------------------------------- # Return whole setting touple def get_setting(key): result = None # index order: key, name, desc, inputtype, options, regex, result, group, events for set in conf.mySettings: if set[0] == key: result = set if result is None: mylog('minimal', [' Error - setting_missing - Setting not found for key: ', key]) mylog('minimal', [' Error - logging the settings into file: ', logPath + '/setting_missing.json']) write_file (logPath + '/setting_missing.json', json.dumps({ 'data' : conf.mySettings})) return result #------------------------------------------------------------------------------- # Return setting value def get_setting_value(key): set = get_setting(key) if get_setting(key) is not None: setVal = set[6] # setting value setTyp = set[3] # setting type return setVal return '' #------------------------------------------------------------------------------- # IP validation methods #------------------------------------------------------------------------------- #------------------------------------------------------------------------------- def checkIPV4(ip): """ Define a function to validate an Ip address """ ipRegex = "^((25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])\.){3}(25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])$" if(re.search(ipRegex, ip)): return True else: return False #------------------------------------------------------------------------------- def check_IP_format (pIP): # Check IP format IPv4SEG = r'(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])' IPv4ADDR = r'(?:(?:' + IPv4SEG + r'\.){3,3}' + IPv4SEG + r')' IP = re.search(IPv4ADDR, pIP) # Return error if not IP if IP is None : return "" # Return IP return IP.group(0) #------------------------------------------------------------------------------- def resolve_device_name_dig (pMAC, pIP): newName = "" try : dig_args = ['dig', '+short', '-x', pIP] # Execute command try: # try runnning a subprocess newName = subprocess.check_output (dig_args, universal_newlines=True) except subprocess.CalledProcessError as e: # An error occured, handle it mylog('none', ['[device_name_dig] ', e.output]) # newName = "Error - check logs" return -1 # Check returns newName = newName.strip() if len(newName) == 0 : return -1 # Cleanup newName = cleanResult(newName) if newName == "" or len(newName) == 0: return -1 # Return newName return newName # not Found except subprocess.CalledProcessError : return -1 #------------------------------------------------------------------------------- # DNS record (Pholus/Name resolution) cleanup methods #------------------------------------------------------------------------------- #------------------------------------------------------------------------------- # Disclaimer - I'm interfacing with a script I didn't write (pholus3.py) so it's possible I'm missing types of answers # it's also possible the pholus3.py script can be adjusted to provide a better output to interface with it # Hit me with a PR if you know how! :) def resolve_device_name_pholus (pMAC, pIP, allRes): pholusMatchesIndexes = [] index = 0 for result in allRes: # limiting entries used for name resolution to the ones containing the current IP (v4 only) if result["MAC"] == pMAC and result["Record_Type"] == "Answer" and result["IP_v4_or_v6"] == pIP and '._googlezone' not in result["Value"]: # found entries with a matching MAC address, let's collect indexes pholusMatchesIndexes.append(index) index += 1 # return if nothing found if len(pholusMatchesIndexes) == 0: return -1 # we have some entries let's try to select the most useful one # airplay matches contain a lot of information # Matches for example: # Brand Tv (50)._airplay._tcp.local. TXT Class:32769 "acl=0 deviceid=66:66:66:66:66:66 features=0x77777,0x38BCB46 rsf=0x3 fv=p20.T-FFFFFF-03.1 flags=0x204 model=XXXX manufacturer=Brand serialNumber=XXXXXXXXXXX protovers=1.1 srcvers=777.77.77 pi=FF:FF:FF:FF:FF:FF psi=00000000-0000-0000-0000-FFFFFFFFFF gid=00000000-0000-0000-0000-FFFFFFFFFF gcgl=0 pk=AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA" for i in pholusMatchesIndexes: if checkIPV4(allRes[i]['IP_v4_or_v6']) and '._airplay._tcp.local. TXT Class:32769' in str(allRes[i]["Value"]) : return allRes[i]["Value"].split('._airplay._tcp.local. TXT Class:32769')[0] # second best - contains airplay # Matches for example: # _airplay._tcp.local. PTR Class:IN "Brand Tv (50)._airplay._tcp.local." for i in pholusMatchesIndexes: if checkIPV4(allRes[i]['IP_v4_or_v6']) and '_airplay._tcp.local. PTR Class:IN' in allRes[i]["Value"] and ('._googlecast') not in allRes[i]["Value"]: return cleanResult(allRes[i]["Value"].split('"')[1]) # Contains PTR Class:32769 # Matches for example: # 3.1.168.192.in-addr.arpa. PTR Class:32769 "MyPc.local." for i in pholusMatchesIndexes: if checkIPV4(allRes[i]['IP_v4_or_v6']) and 'PTR Class:32769' in allRes[i]["Value"]: return cleanResult(allRes[i]["Value"].split('"')[1]) # Contains AAAA Class:IN # Matches for example: # DESKTOP-SOMEID.local. AAAA Class:IN "fe80::fe80:fe80:fe80:fe80" for i in pholusMatchesIndexes: if checkIPV4(allRes[i]['IP_v4_or_v6']) and 'AAAA Class:IN' in allRes[i]["Value"]: return cleanResult(allRes[i]["Value"].split('.local.')[0]) # Contains _googlecast._tcp.local. PTR Class:IN # Matches for example: # _googlecast._tcp.local. PTR Class:IN "Nest-Audio-ff77ff77ff77ff77ff77ff77ff77ff77._googlecast._tcp.local." for i in pholusMatchesIndexes: if checkIPV4(allRes[i]['IP_v4_or_v6']) and '_googlecast._tcp.local. PTR Class:IN' in allRes[i]["Value"] and ('Google-Cast-Group') not in allRes[i]["Value"]: return cleanResult(allRes[i]["Value"].split('"')[1]) # Contains A Class:32769 # Matches for example: # Android.local. A Class:32769 "192.168.1.6" for i in pholusMatchesIndexes: if checkIPV4(allRes[i]['IP_v4_or_v6']) and ' A Class:32769' in allRes[i]["Value"]: return cleanResult(allRes[i]["Value"].split(' A Class:32769')[0]) # # Contains PTR Class:IN # Matches for example: # _esphomelib._tcp.local. PTR Class:IN "ceiling-light-1._esphomelib._tcp.local." for i in pholusMatchesIndexes: if checkIPV4(allRes[i]['IP_v4_or_v6']) and 'PTR Class:IN' in allRes[i]["Value"]: if allRes[i]["Value"] and len(allRes[i]["Value"].split('"')) > 1: return cleanResult(allRes[i]["Value"].split('"')[1]) return -1 #------------------------------------------------------------------------------- def cleanResult(str): # alternative str.split('.')[0] str = str.replace("._airplay", "") str = str.replace("._tcp", "") str = str.replace(".local", "") str = str.replace("._esphomelib", "") str = str.replace("._googlecast", "") str = str.replace(".lan", "") str = str.replace(".home", "") str = re.sub(r'-[a-fA-F0-9]{32}', '', str) # removing last part of e.g. Nest-Audio-ff77ff77ff77ff77ff77ff77ff77ff77 str = re.sub(r'#.*', '', str) # Remove everything after '#' including the '#' # remove trailing dots if str.endswith('.'): str = str[:-1] return str #------------------------------------------------------------------------------- # String manipulation methods #------------------------------------------------------------------------------- #------------------------------------------------------------------------------- def bytes_to_string(value): # if value is of type bytes, convert to string if isinstance(value, bytes): value = value.decode('utf-8') return value #------------------------------------------------------------------------------- def if_byte_then_to_str(input): if isinstance(input, bytes): input = input.decode('utf-8') input = bytes_to_string(re.sub('[^a-zA-Z0-9-_\s]', '', str(input))) return input #------------------------------------------------------------------------------- def hide_email(email): m = email.split('@') if len(m) == 2: return f'{m[0][0]}{"*"*(len(m[0])-2)}{m[0][-1] if len(m[0]) > 1 else ""}@{m[1]}' return email #------------------------------------------------------------------------------- def removeDuplicateNewLines(text): if "\n\n\n" in text: return removeDuplicateNewLines(text.replace("\n\n\n", "\n\n")) else: return text #------------------------------------------------------------------------------- def sanitize_string(input): if isinstance(input, bytes): input = input.decode('utf-8') value = bytes_to_string(re.sub('[^a-zA-Z0-9-_\s]', '', str(input))) return value #------------------------------------------------------------------------------- def generate_mac_links (html, deviceUrl): p = re.compile(r'(?:[0-9a-fA-F]:?){12}') MACs = re.findall(p, html) for mac in MACs: html = html.replace('