#!/usr/bin/env python import os import time import pathlib import argparse import sys import hashlib import csv import subprocess import re import base64 import sqlite3 from io import StringIO from datetime import datetime # Register NetAlertX directories INSTALL_PATH="/app" sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"]) from plugin_helper import Plugin_Object, Plugin_Objects, decodeBase64 from logger import mylog, Logger, append_line_to_file from helper import timeNowTZ, check_IP_format, get_setting_value from const import logPath, applicationPath, fullDbPath import conf from pytz import timezone # Make sure the TIMEZONE for logging is correct conf.tz = timezone(get_setting_value('TIMEZONE')) # Make sure log level is initialized correctly Logger(get_setting_value('LOG_LEVEL')) pluginName = 'INTRNT' LOG_PATH = logPath + '/plugins' LOG_FILE = os.path.join(LOG_PATH, f'script.{pluginName}.log') RESULT_FILE = os.path.join(LOG_PATH, f'last_result.{pluginName}.log') no_internet_ip = '0.0.0.0' def main(): mylog('verbose', [f'[{pluginName}] In script']) parser = argparse.ArgumentParser(description='Check internet connectivity and IP') parser.add_argument('prev_ip', action="store", help="Previous IP address to compare against the current IP") parser.add_argument('DIG_GET_IP_ARG', action="store", help="Arguments for the 'dig' command to retrieve the IP address") # unused values = parser.parse_args() PREV_IP = values.prev_ip.split('=')[1] DIG_GET_IP_ARG = get_setting_value("INTRNT_DIG_GET_IP_ARG") new_internet_IP = no_internet_ip mylog('verbose', [f'[{pluginName}] INTRNT_DIG_GET_IP_ARG: ', DIG_GET_IP_ARG]) # METHOD 1: dig # perform the new IP lookup N times specified by the INTRNT_TRIES setting INTRNT_RETRIES = get_setting_value("INTRNT_RETRIES") retries_needed = 0 for i in range(INTRNT_RETRIES + 1): new_internet_IP, cmd_output = check_internet_IP( PREV_IP, DIG_GET_IP_ARG) if new_internet_IP == no_internet_ip: time.sleep(1*i) # Exponential backoff strategy else: retries_needed = i break # METHOD 2: curl if new_internet_IP == no_internet_ip: new_internet_IP, cmd_output = fallback_check_ip() mylog('verbose', [f'[{pluginName}] Curl Fallback (new_internet_IP|cmd_output): {new_internet_IP} | {cmd_output}']) # logging append_line_to_file (logPath + '/IP_changes.log', '['+str(timeNowTZ()) +']\t'+ new_internet_IP +'\n') plugin_objects = Plugin_Objects(RESULT_FILE) plugin_objects.add_object( primaryId = 'Internet', # MAC (Device Name) secondaryId = new_internet_IP, # IP Address watched1 = f'Previous IP: {PREV_IP}', watched2 = cmd_output.replace('\n',''), watched3 = retries_needed, watched4 = 'Gateway', extra = f'Previous IP: {PREV_IP}', foreignKey = 'Internet') plugin_objects.write_result_file() mylog('verbose', [f'[{pluginName}] Finished ']) return 0 #=============================================================================== # INTERNET IP CHANGE #=============================================================================== def check_internet_IP ( PREV_IP, DIG_GET_IP_ARG ): # Get Internet IP mylog('verbose', [f'[{pluginName}] - Retrieving Internet IP']) internet_IP, cmd_output = get_internet_IP(DIG_GET_IP_ARG) mylog('verbose', [f'[{pluginName}] Current internet_IP : {internet_IP}']) # Check previously stored IP previous_IP = no_internet_ip if PREV_IP is not None and len(PREV_IP) > 0 : previous_IP = PREV_IP mylog('verbose', [f'[{pluginName}] previous_IP : {previous_IP}']) return internet_IP, cmd_output #------------------------------------------------------------------------------- def get_internet_IP (DIG_GET_IP_ARG): cmd_output = '' # Using 'dig' dig_args = ['dig', '+short'] + DIG_GET_IP_ARG.strip().split() try: cmd_output = subprocess.check_output (dig_args, universal_newlines=True) mylog('verbose', [f'[{pluginName}] DIG result : {cmd_output}']) except subprocess.CalledProcessError as e: mylog('verbose', [e.output]) cmd_output = '' # no internet # Check result is an IP IP = check_IP_format (cmd_output) # Handle invalid response if IP == '': IP = no_internet_ip return IP, cmd_output #------------------------------------------------------------------------------- def fallback_check_ip(): """Fallback mechanism using `curl ifconfig.me/ip`.""" try: cmd_output = subprocess.check_output(['curl', '-s', 'ifconfig.me/ip'], text=True).strip() if cmd_output and re.match(r"^\d{1,3}(\.\d{1,3}){3}$", cmd_output): mylog('verbose', [f'[{pluginName}] Fallback IP retrieved via curl: {cmd_output}']) return cmd_output, f'Fallback via curl: "{cmd_output}"' else: mylog('verbose', [f'[{pluginName}] Invalid IP received from fallback']) return no_internet_ip, f'Fallback via curl failed: "{cmd_output}"' except Exception as e: mylog('none', [f'[{pluginName}] Fallback curl exception: {e}']) return no_internet_ip, f'Fallback via curl exception: "{e}"' #=============================================================================== # BEGIN #=============================================================================== if __name__ == '__main__': main()