""" Colection of generic functions to support NetAlertX """ import io import sys import datetime # from datetime import strptime import os import re import subprocess import pytz from pytz import timezone import json import time from pathlib import Path import requests from cryptography.fernet import Fernet import base64 import hashlib import conf from const import * from logger import mylog, logResult # Register NetAlertX directories INSTALL_PATH="/app" #------------------------------------------------------------------------------- # DateTime #------------------------------------------------------------------------------- # Get the current time in the current TimeZone def timeNowTZ(): if conf.tz: return datetime.datetime.now(conf.tz).replace(microsecond=0) else: return datetime.datetime.now().replace(microsecond=0) # if isinstance(conf.TIMEZONE, str): # tz = pytz.timezone(conf.TIMEZONE) # else: # tz = conf.TIMEZONE # return datetime.datetime.now(tz).replace(microsecond=0) def timeNow(): return datetime.datetime.now().replace(microsecond=0) def get_timezone_offset(): now = datetime.datetime.now(conf.tz) offset_hours = now.utcoffset().total_seconds() / 3600 offset_formatted = "{:+03d}:{:02d}".format(int(offset_hours), int((offset_hours % 1) * 60)) return offset_formatted #------------------------------------------------------------------------------- # App state #------------------------------------------------------------------------------- # A class to manage the application state and to provide a frontend accessible API point class app_state_class: def __init__(self, currentState, settingsSaved=None, settingsImported=None, showSpinner=False): # json file containing the state to communicate with the frontend stateFile = apiPath + '/app_state.json' # if currentState == 'Initializing': # checkNewVersion(False) # Update self self.currentState = currentState self.lastUpdated = str(timeNowTZ()) # Check if the file exists and init values if os.path.exists(stateFile): 
            with open(stateFile, 'r') as json_file:
                previousState = json.load(json_file)

            self.settingsSaved = previousState.get("settingsSaved", 0)
            self.settingsImported = previousState.get("settingsImported", 0)
            self.showSpinner = previousState.get("showSpinner", False)
            self.isNewVersion = previousState.get("isNewVersion", False)
            self.isNewVersionChecked = previousState.get("isNewVersionChecked", 0)
        else:
            self.settingsSaved = 0
            self.settingsImported = 0
            self.showSpinner = False
            self.isNewVersion = checkNewVersion()
            self.isNewVersionChecked = int(timeNow().timestamp())

        # Overwrite with provided parameters if supplied
        if settingsSaved is not None:
            self.settingsSaved = settingsSaved
        if settingsImported is not None:
            self.settingsImported = settingsImported
        if showSpinner is not None:
            self.showSpinner = showSpinner

        # check for new version every hour and if currently not running new version
        if self.isNewVersion is False and self.isNewVersionChecked + 3600 < int(timeNow().timestamp()):
            self.isNewVersion = checkNewVersion()
            self.isNewVersionChecked = int(timeNow().timestamp())

        # Update .json file
        with open(stateFile, 'w') as json_file:
            json.dump(self, json_file, cls=AppStateEncoder, indent=4)

    def isSet(self):
        return self.currentState != ""


#-------------------------------------------------------------------------------
# method to update the state
def updateState(newState, settingsSaved = None, settingsImported = None, showSpinner = False):

    state = app_state_class(newState, settingsSaved, settingsImported, showSpinner)


#-------------------------------------------------------------------------------
def updateSubnets(scan_subnets):
    subnets = []

    # multiple interfaces
    if isinstance(scan_subnets, list):
        for interface in scan_subnets:
            subnets.append(interface)
    # one interface only
    else:
        subnets.append(scan_subnets)

    return subnets


#-------------------------------------------------------------------------------
# File system permission handling
#-------------------------------------------------------------------------------
# check RW access of DB and config file
def checkPermissionsOK():
    #global confR_access, confW_access, dbR_access, dbW_access

    confR_access = (os.access(fullConfPath, os.R_OK))
    confW_access = (os.access(fullConfPath, os.W_OK))
    dbR_access = (os.access(fullDbPath, os.R_OK))
    dbW_access = (os.access(fullDbPath, os.W_OK))

    mylog('none', ['\n'])
    mylog('none', ['The container restarted (started). If this is unexpected check https://bit.ly/NetAlertX_debug for troubleshooting tips.'])
    mylog('none', ['\n'])
    mylog('none', ['Permissions check (All should be True)'])
    mylog('none', ['------------------------------------------------'])
    mylog('none', [ " " , confPath , " | " , " READ | " , confR_access])
    mylog('none', [ " " , confPath , " | " , " WRITE | " , confW_access])
    mylog('none', [ " " , dbPath , " | " , " READ | " , dbR_access])
    mylog('none', [ " " , dbPath , " | " , " WRITE | " , dbW_access])
    mylog('none', ['------------------------------------------------'])

    #return dbR_access and dbW_access and confR_access and confW_access
    return (confR_access, dbR_access)

#-------------------------------------------------------------------------------
def fixPermissions():
    # Try fixing access rights if needed
    chmodCommands = []

    chmodCommands.append(['sudo', 'chmod', 'a+rw', '-R', fullDbPath])
    chmodCommands.append(['sudo', 'chmod', 'a+rw', '-R', fullConfPath])

    for com in chmodCommands:
        # Execute command
        mylog('none', ["[Setup] Attempting to fix permissions."])
        try:
            # try running a subprocess
            result = subprocess.check_output (com, universal_newlines=True)
        except subprocess.CalledProcessError as e:
            # An error occurred, handle it
            mylog('none', ["[Setup] Fix Failed. Execute this command manually inside of the container: ", ' '.join(com)])
            mylog('none', [e.output])


#-------------------------------------------------------------------------------
def initialiseFile(pathToCheck, defaultFile):
    # if file not readable (missing?) try to copy over the backed-up (default) one
    if not os.access(pathToCheck, os.R_OK):
        mylog('none', ["[Setup] ("+ pathToCheck +") file is not readable or missing. Trying to copy over the default one."])
        try:
            # try running a subprocess
            p = subprocess.Popen(["cp", defaultFile , pathToCheck], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
            stdout, stderr = p.communicate()

            if not os.access(pathToCheck, os.R_OK):
                mylog('none', ["[Setup] ⚠ ERROR copying ("+defaultFile+") to ("+pathToCheck+"). Make sure the app has Read & Write access to the parent directory."])
            else:
                mylog('none', ["[Setup] ("+defaultFile+") copied over successfully to ("+pathToCheck+")."])

            # write stdout and stderr into .log files for debugging if needed
            logResult (stdout, stderr)  # TO-DO should be changed to mylog

        except subprocess.CalledProcessError as e:
            # An error occurred, handle it
            mylog('none', ["[Setup] ⚠ ERROR copying ("+defaultFile+"). Make sure the app has Read & Write access to " + pathToCheck])
            mylog('none', [e.output])

#-------------------------------------------------------------------------------
def filePermissions():
    # check and initialize .conf
    (confR_access, dbR_access) = checkPermissionsOK() # Initial check

    if not confR_access:
        initialiseFile(fullConfPath, f"{INSTALL_PATH}/back/app.conf" )

    # check and initialize .db
    if not dbR_access:
        initialiseFile(fullDbPath, f"{INSTALL_PATH}/back/app.db")

    # last attempt
    fixPermissions()


#-------------------------------------------------------------------------------
# File manipulation methods
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
def get_file_content(path):

    with open(path, 'r') as f:
        content = f.read()

    return content

#-------------------------------------------------------------------------------
def write_file(pPath, pText):
    # Convert pText to a string if it's a dictionary
    if isinstance(pText, dict):
        pText = json.dumps(pText)

    # If pText is a list, write each item in turn
    # (every write truncates the file, so only the last item remains)
    if isinstance(pText, list):
        for item in pText:
            write_file(pPath, item)

    else:
        # Write the text using the correct Python version
        if sys.version_info < (3, 0):
            with io.open(pPath, mode='w', encoding='utf-8') as file:
                file.write(pText.decode('unicode_escape'))
        else:
            if pText is None:
                pText = ""
            with open(pPath, 'w', encoding='utf-8') as file:
                file.write(pText)

#-------------------------------------------------------------------------------
# Setting methods
#-------------------------------------------------------------------------------

#-------------------------------------------------------------------------------
# Return whole setting tuple
def get_setting(key):

    settingsFile = apiPath + 'table_settings.json'

    try:
        with open(settingsFile, 'r') as json_file:

            data = json.load(json_file)

            for item in data.get("data",[]):
                if item.get("Code_Name") == key:
                    return item

        mylog('debug', [f'[Settings] ⚠ ERROR - setting_missing - Setting not found for key: {key} in file {settingsFile}'])

        return None

    except (FileNotFoundError, json.JSONDecodeError, ValueError) as e:
        # Handle the case when the file is not found, JSON decoding fails, or data is not in the expected format
        mylog('none', [f'[Settings] ⚠ ERROR - JSONDecodeError or FileNotFoundError for file {settingsFile}'])

        return None


#-------------------------------------------------------------------------------
# Settings
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
# Return setting value
def get_setting_value(key):

    # Returns empty string if not set
    value = ''

    setting = get_setting(key)

    if setting is not None:

        # mylog('none', [f'[SETTINGS] setting json:{json.dumps(setting)}'])

        set_type = 'Error: Not handled'
        set_value = 'Error: Not handled'

        set_value = setting["Value"]  # Setting value (Value (upper case) = user overridden default_value)
        set_type = setting["Type"]    # Setting type  # lower case "type" - default json value vs upper-case "Type" (= from user defined settings)

        value = setting_value_to_python_type(set_type, set_value)

    return value


#-------------------------------------------------------------------------------
# Convert the setting value to the corresponding python type
def setting_value_to_python_type(set_type, set_value):

    value = ''

    # Handle different types of settings
    if set_type in ['text', 'string', 'password', 'password.SHA256', 'readonly', 'text.select']:
        value = str(set_value)

    elif set_type in ['boolean', 'integer.checkbox']:

        value = False

        if isinstance(set_value, str) and set_value.lower() in ['true', '1']:
            value = True
        elif isinstance(set_value, int) and set_value == 1:
            value = True
        elif isinstance(set_value, bool):
            value = set_value

    elif set_type in ['integer.select', 'integer']:
        value = int(set_value)

    elif set_type in ['text.multiselect', 'list', 'subnets', 'list.select']:
        # Handle string
        mylog('debug', [f'[SETTINGS] Handling set_type: "{set_type}", set_value: "{set_value}"'])

        if isinstance(set_value, str):
            value = json.loads(set_value.replace("'", "\""))
        # Assuming set_value is a list in this case
        elif isinstance(set_value, list):
            value = set_value

    elif set_type == '.template':
        # Assuming set_value is a JSON object in this case
        value = json.loads(set_value)

    else:
        mylog('none', [f'[SETTINGS] ⚠ ERROR - set_type not handled:{set_type}'])
        mylog('none', [f'[SETTINGS] ⚠ ERROR - set_value:{set_value}'])

    return value

#-------------------------------------------------------------------------------
# Generate a WHERE condition for SQLite based on a list of values.
def list_to_where(logical_operator, column_name, condition_operator, values_list):
    """
    Generate a WHERE condition for SQLite based on a list of values.

    Parameters:
    - logical_operator: The logical operator ('AND' or 'OR') to combine conditions.
    - column_name: The name of the column to filter on.
    - condition_operator: The condition operator ('LIKE', 'NOT LIKE', '=', '!=', etc.).
    - values_list: A list of values to be included in the condition.

    Returns:
    - A string representing the WHERE condition.
    """

    if not values_list:
        return ""  # Return an empty string if the list is empty to avoid breaking the SQL condition.

    # Replace {s-quote} with single quote in values_list
    values_list = [value.replace("{s-quote}", "'") for value in values_list]

    # Build the WHERE condition
    condition = f"{column_name} {condition_operator} '{values_list[0]}'"

    for value in values_list[1:]:
        condition += f" {logical_operator} {column_name} {condition_operator} '{value}'"

    return f' AND ({condition}) '

#-------------------------------------------------------------------------------
# IP validation methods
#-------------------------------------------------------------------------------

#-------------------------------------------------------------------------------
def checkIPV4(ip):
    """ Validate an IPv4 address """
    ipRegex = r"^((25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])\.){3}(25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])$"

    return bool(re.search(ipRegex, ip))


#-------------------------------------------------------------------------------
def check_IP_format (pIP):
    # check if TCP communication error occurred
    if 'communications error to' in pIP:
        return ''

    # Check IP format
    IPv4SEG = r'(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])'
    IPv4ADDR = r'(?:(?:' + IPv4SEG + r'\.){3,3}' + IPv4SEG + r')'
    IP = re.search(IPv4ADDR, pIP)

    # Return empty if not IP
    if IP is None:
        return ""

    # Return IP
    return IP.group(0)


#-------------------------------------------------------------------------------
def get_device_name_nslookup(db, pMAC, pIP):

    nameNotFound = "(name not found)"

    sql = db.sql

    name = nameNotFound

    # get names from the NSLOOKUP plugin entries based on MAC
    # (values are bound as parameters rather than interpolated into the SQL)
    sql.execute(
        """
            SELECT Watched_Value2 FROM Plugins_Objects
            WHERE
                Plugin = 'NSLOOKUP' AND
                Object_PrimaryID = ?
        """,
        (pMAC,)
    )
    nslookupEntry = sql.fetchall()
    db.commitDB()

    if len(nslookupEntry) != 0:
        name = cleanDeviceName(nslookupEntry[0][0], False)

        return name

    # get names from the NSLOOKUP plugin entries based on IP
    sql.execute(
        """
            SELECT Watched_Value2 FROM Plugins_Objects
            WHERE
                Plugin = 'NSLOOKUP' AND
                Object_SecondaryID = ?
        """,
        (pIP,)
    )
    nslookupEntry = sql.fetchall()
    db.commitDB()

    if len(nslookupEntry) != 0:
        name = cleanDeviceName(nslookupEntry[0][0], True)

        return name

    return name

#-------------------------------------------------------------------------------
def resolve_device_name_dig (pMAC, pIP):

    nameNotFound = "(name not found)"

    dig_args = ['dig', '+short', '-x', pIP]

    # Execute command
    try:
        # try running a subprocess
        newName = subprocess.check_output (dig_args, universal_newlines=True)

        # Check returns
        newName = newName.strip()

        if len(newName) == 0:
            return nameNotFound

        # Cleanup
        newName = cleanDeviceName(newName, True)

        if newName == "" or len(newName) == 0 or newName == '-1' or newName == -1 or "communications error" in newName or 'malformed message packet' in newName:
            return nameNotFound

        # all checks passed
        mylog('debug', [f'[resolve_device_name_dig] Found a new name: "{newName}"'])

        return newName

    except subprocess.CalledProcessError as e:
        # An error occurred, handle it
        mylog('none', ['[resolve_device_name_dig] ⚠ ERROR: ', e.output])
        # newName = "Error - check logs"
        return nameNotFound

#-------------------------------------------------------------------------------
# DNS record (Pholus/Name resolution) cleanup methods
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
# Disclaimer - I'm interfacing with a script I didn't write (pholus3.py) so it's possible I'm missing types of answers
# it's also possible the pholus3.py script can be adjusted to provide a better output to interface with it
# Hit me with a PR if you know how! :)
def resolve_device_name_pholus (pMAC, pIP, allRes, nameNotFound, match_IP = False):

    pholusMatchesIndexes = []

    # Collect all Pholus entries with matching MAC and of type Answer
    for index, entry in enumerate(allRes):
        # limiting entries used for name resolution to the ones containing the current IP (v4 only)
        if ((match_IP and entry["IP_v4_or_v6"] == pIP ) or ( entry["MAC"] == pMAC )) and entry["Record_Type"] == "Answer" and '._googlezone' not in entry["Value"]:
            # found entries with a matching MAC address, let's collect indexes
            pholusMatchesIndexes.append(index)

    # return if nothing found
    if len(pholusMatchesIndexes) == 0:
        return nameNotFound

    # we have some entries let's try to select the most useful one

    # Do I need to pre-order allRes to have the most valuable ones on the top?
    for i in pholusMatchesIndexes:
        if not checkIPV4(allRes[i]['IP_v4_or_v6']):
            continue

        value = allRes[i]["Value"]

        # airplay matches contain a lot of information
        # Matches for example:
        # Brand Tv (50)._airplay._tcp.local. TXT Class:32769 "acl=0 deviceid=66:66:66:66:66:66 features=0x77777,0x38BCB46 rsf=0x3 fv=p20.T-FFFFFF-03.1 flags=0x204 model=XXXX manufacturer=Brand serialNumber=XXXXXXXXXXX protovers=1.1 srcvers=777.77.77 pi=FF:FF:FF:FF:FF:FF psi=00000000-0000-0000-0000-FFFFFFFFFF gid=00000000-0000-0000-0000-FFFFFFFFFF gcgl=0 pk=AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
        if '._airplay._tcp.local. TXT Class:32769' in value:
            return cleanDeviceName(value.split('._airplay._tcp.local. TXT Class:32769')[0], match_IP)

        # second best - contains airplay
        # Matches for example:
        # _airplay._tcp.local. PTR Class:IN "Brand Tv (50)._airplay._tcp.local."
        if '_airplay._tcp.local. PTR Class:IN' in value and ('._googlecast') not in value:
            return cleanDeviceName(value.split('"')[1], match_IP)

        # Contains PTR Class:32769
        # Matches for example:
        # 3.1.168.192.in-addr.arpa. PTR Class:32769 "MyPc.local."
        if 'PTR Class:32769' in value:
            return cleanDeviceName(value.split('"')[1], match_IP)

        # Contains AAAA Class:IN
        # Matches for example:
        # DESKTOP-SOMEID.local. AAAA Class:IN "fe80::fe80:fe80:fe80:fe80"
        if 'AAAA Class:IN' in value:
            return cleanDeviceName(value.split('.local.')[0], match_IP)

        # Contains _googlecast._tcp.local. PTR Class:IN
        # Matches for example:
        # _googlecast._tcp.local. PTR Class:IN "Nest-Audio-ff77ff77ff77ff77ff77ff77ff77ff77._googlecast._tcp.local."
        if '_googlecast._tcp.local. PTR Class:IN' in value and ('Google-Cast-Group') not in value:
            return cleanDeviceName(value.split('"')[1], match_IP)

        # Contains A Class:32769
        # Matches for example:
        # Android.local. A Class:32769 "192.168.1.6"
        if ' A Class:32769' in value:
            return cleanDeviceName(value.split(' A Class:32769')[0], match_IP)

        # Contains PTR Class:IN
        # Matches for example:
        # _esphomelib._tcp.local. PTR Class:IN "ceiling-light-1._esphomelib._tcp.local."
        if 'PTR Class:IN' in value and len(value.split('"')) > 1:
            return cleanDeviceName(value.split('"')[1], match_IP)

    return nameNotFound

#-------------------------------------------------------------------------------
import dns.resolver

def cleanDeviceName(str, match_IP):

    if get_setting_value('NEWDEV_LESS_NAME_CLEANUP'):

        mylog('debug', ["[Name cleanup] Using new cleanDeviceName(" + str + ")"])

        # replace all labels starting with underscore
        str = re.sub(r'^_[^\.]*\.', '', str)    # leading label
        str = re.sub(r'\._[^\.]*\.', '.', str)  # nested label

        # get a stub resolver for access to resolv.conf configuration
        resolv = dns.resolver.Resolver()

        # replace the local domain name
        str = re.sub(r'\.' + resolv.domain.to_text() + r'$', '', str)

        # replace dns search list
        for name in resolv.search:
            str = re.sub(r'\.' + name.to_text() + r'$', '', str)

        # removing last part of e.g. Nest-Audio-ff77ff77ff77ff77ff77ff77ff77ff77
        str = re.sub(r'-[a-fA-F0-9]{32}', '', str)

        # Remove everything after '#' including the '#'
        str = re.sub(r'#.*', '', str)

        # remove trailing dot
        if str.endswith('.'):
            str = str[:-1]

        # add matching info
        if match_IP:
            str = str + " (IP match)"

        # done
        mylog('debug', ["[Name cleanup] cleanDeviceName = " + str])

        return str

    ################################
    #
    # OLD cleanDeviceName
    mylog('debug', ["[Name cleanup] Using old cleanDeviceName(" + str + ")"])

    # alternative str.split('.')[0]
    str = str.replace("._airplay", "")
    str = str.replace("._tcp", "")
    str = str.replace(".localdomain", "")
    str = str.replace(".local", "")
    str = str.replace("._esphomelib", "")
    str = str.replace("._googlecast", "")
    str = str.replace(".lan", "")
    str = str.replace(".home", "")
    str = re.sub(r'-[a-fA-F0-9]{32}', '', str)  # removing last part of e.g. Nest-Audio-ff77ff77ff77ff77ff77ff77ff77ff77
    str = re.sub(r'#.*', '', str)               # Remove everything after '#' including the '#'

    # remove trailing dots
    if str.endswith('.'):
        str = str[:-1]

    if match_IP:
        str = str + " (IP match)"

    mylog('debug', ["cleanDeviceName = " + str])

    return str

#-------------------------------------------------------------------------------
# String manipulation methods
#-------------------------------------------------------------------------------

#-------------------------------------------------------------------------------
def bytes_to_string(value):
    # if value is of type bytes, convert to string
    if isinstance(value, bytes):
        value = value.decode('utf-8')
    return value

#-------------------------------------------------------------------------------
def if_byte_then_to_str(input):
    if isinstance(input, bytes):
        input = input.decode('utf-8')
        input = bytes_to_string(re.sub(r'[^a-zA-Z0-9-_\s]', '', str(input)))
    return input

#-------------------------------------------------------------------------------
def hide_email(email):
    m = email.split('@')

    if len(m) == 2:
        return f'{m[0][0]}{"*"*(len(m[0])-2)}{m[0][-1] if len(m[0]) > 1 else ""}@{m[1]}'

    return email

#-------------------------------------------------------------------------------
def hide_string(input_string):
    if len(input_string) < 3:
        return input_string  # Strings with 2 or fewer characters remain unchanged
    else:
        return input_string[0] + "*" * (len(input_string) - 2) + input_string[-1]


#-------------------------------------------------------------------------------
def removeDuplicateNewLines(text):
    if "\n\n\n" in text:
        return removeDuplicateNewLines(text.replace("\n\n\n", "\n\n"))
    else:
        return text

#-------------------------------------------------------------------------------
def sanitize_string(input):
    if isinstance(input, bytes):
        input = input.decode('utf-8')
    value = bytes_to_string(re.sub(r'[^a-zA-Z0-9-_\s]', '', str(input)))
    return value


#-------------------------------------------------------------------------------
def generate_mac_links (html, deviceUrl):

    p = re.compile(r'(?:[0-9a-fA-F]:?){12}')

    MACs = re.findall(p, html)

    for mac in MACs:
        html = html.replace('' + mac + '','' + mac + '')

    return html

#-------------------------------------------------------------------------------
def extract_between_strings(text, start, end):
    start_index = text.find(start)
    end_index = text.find(end, start_index + len(start))
    if start_index != -1 and end_index != -1:
        return text[start_index + len(start):end_index]
    else:
        return ""

#-------------------------------------------------------------------------------
def extract_mac_addresses(text):
    mac_pattern = r"([0-9A-Fa-f]{2}(?:[:-][0-9A-Fa-f]{2}){5})"
    mac_addresses = re.findall(mac_pattern, text)
    return mac_addresses

#-------------------------------------------------------------------------------
def extract_ip_addresses(text):
    ip_pattern = r"\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b"
    ip_addresses = re.findall(ip_pattern, text)
    return ip_addresses


#-------------------------------------------------------------------------------
# JSON methods
#-------------------------------------------------------------------------------

#-------------------------------------------------------------------------------
def isJsonObject(value):
    return isinstance(value, dict)

#-------------------------------------------------------------------------------
def add_json_list (row, list):
    new_row = []
    for column in row:
        column = bytes_to_string(column)

        new_row.append(column)

    list.append(new_row)

    return list


#-------------------------------------------------------------------------------
# Checks if the object has a __dict__ attribute. If it does, it assumes that it's an instance of a class and serializes its attributes dynamically.
class AppStateEncoder(json.JSONEncoder):
    def default(self, obj):
        if hasattr(obj, '__dict__'):
            # If the object has a '__dict__', assume it's an instance of a class
            return obj.__dict__
        return super().default(obj)

#-------------------------------------------------------------------------------
# Checks if the object has a __dict__ attribute. If it does, it assumes that it's an instance of a class and serializes its attributes dynamically.
class NotiStrucEncoder(json.JSONEncoder):
    def default(self, obj):
        if hasattr(obj, '__dict__'):
            # If the object has a '__dict__', assume it's an instance of a class
            return obj.__dict__
        return super().default(obj)

#-------------------------------------------------------------------------------
# Creates a JSON object from a DB row
def row_to_json(names, row):

    rowEntry = {}

    for name in names:
        rowEntry[name] = if_byte_then_to_str(row[name])

    return rowEntry

#-------------------------------------------------------------------------------
# Get language strings from plugin JSON
def collect_lang_strings(json, pref, stringSqlParams):

    for prop in json["localized"]:
        for language_string in json[prop]:

            stringSqlParams.append((str(language_string["language_code"]), str(pref + "_" + prop), str(language_string["string"]), ""))

    return stringSqlParams

#-------------------------------------------------------------------------------
# Cryptography
#-------------------------------------------------------------------------------

def prepare_key(encryption_key):
    # Repeat short keys until they are at least 32 characters long
    if(len(encryption_key) < 32):
        encryption_key = (int((32 / len(encryption_key)))+1 )*encryption_key

    key_bytearray = bytearray(encryption_key[:32], 'ASCII')

    return base64.urlsafe_b64encode(key_bytearray)


def encrypt_data(data, encryption_key):

    fernet = Fernet(prepare_key(encryption_key))

    # then use the Fernet class instance
    # to encrypt the string string must
    # be encoded to byte string before encryption
    encrypted_data = fernet.encrypt(data.encode())

    return encrypted_data


def decrypt_data(data, encryption_key):

    fernet = Fernet(prepare_key(encryption_key))

    # decrypt the encrypted string with the
    # Fernet instance of the key,
    # that was used for encrypting the string
    # encoded byte string is returned by decrypt method,
    # so decode it to string with decode methods
    decrypted_data = fernet.decrypt(data).decode()

    return decrypted_data


#-------------------------------------------------------------------------------
# Misc
#-------------------------------------------------------------------------------

#-------------------------------------------------------------------------------
def checkNewVersion():
    mylog('debug', ["[Version check] Checking if new version available"])

    newVersion = False

    with open(applicationPath + '/front/buildtimestamp.txt', 'r') as f:
        buildTimestamp = int(f.read().strip())

    data = ""

    try:
        response = requests.get("https://api.github.com/repos/jokob-sk/NetAlertX/releases")
        data = json.loads(response.text)
    except (requests.exceptions.RequestException, json.JSONDecodeError) as e:
        # Network failure or a response body that is not valid JSON
        mylog('minimal', ["[Version check] ⚠ ERROR: Couldn't check for new release."])
        data = ""

    # make sure we received a valid response and not an API rate limit exceeded message
    if data != "" and len(data) > 0 and isinstance(data, list) and "published_at" in data[0]:

        dateTimeStr = data[0]["published_at"]

        releaseTimestamp = int(datetime.datetime.strptime(dateTimeStr, '%Y-%m-%dT%H:%M:%S%z').timestamp())

        if releaseTimestamp > buildTimestamp + 600:
            mylog('none', ["[Version check] New version of the container available!"])
            newVersion = True
        else:
            mylog('none', ["[Version check] Running the latest version."])

    return newVersion

#-------------------------------------------------------------------------------
def initOrSetParam(db, parID, parValue):
    sql = db.sql

    # Values are bound as parameters rather than concatenated into the SQL
    sql.execute(
        "INSERT INTO Parameters(par_ID, par_Value) VALUES(?, ?) "
        "ON CONFLICT(par_ID) DO UPDATE SET par_Value=? WHERE par_ID=?",
        (str(parID), str(parValue), str(parValue), str(parID))
    )

    db.commitDB()

#-------------------------------------------------------------------------------
class json_obj:
    def __init__(self, jsn, columnNames):
        self.json = jsn
        self.columnNames = columnNames

#-------------------------------------------------------------------------------
class noti_obj:
    def __init__(self, json, text, html):
        self.json = json
        self.text = text
        self.html = html
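
#-------------------------------------------------------------------------------
# Illustrative sketch only, not called anywhere in this module: a variant of
# list_to_where() that emits "?" placeholders plus a separate parameter list,
# so the values are bound by the database driver instead of being quoted by
# hand. The name list_to_where_params is hypothetical, and the sketch assumes
# the caller runs the result on a cursor that uses the "?" paramstyle (as
# sqlite3 does), e.g. sql.execute(query + clause, params).
def list_to_where_params(logical_operator, column_name, condition_operator, values_list):
    if not values_list:
        return "", []  # nothing to filter on

    # Same {s-quote} convention as list_to_where()
    params = [value.replace("{s-quote}", "'") for value in values_list]

    # One "column operator ?" term per value, joined by the logical operator
    term = f"{column_name} {condition_operator} ?"
    condition = f" {logical_operator} ".join([term] * len(params))

    return f' AND ({condition}) ', params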