mirror of
https://github.com/jokob-sk/NetAlertX.git
synced 2025-12-07 09:36:05 -08:00
PLG: NMAP make param handling more robust #1288
Signed-off-by: jokob-sk <jokob.sk@gmail.com>
This commit is contained in:
@@ -9,7 +9,7 @@ import subprocess
|
|||||||
INSTALL_PATH = os.getenv('NETALERTX_APP', '/app')
|
INSTALL_PATH = os.getenv('NETALERTX_APP', '/app')
|
||||||
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
|
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
|
||||||
|
|
||||||
from plugin_helper import Plugin_Objects, decodeBase64
|
from plugin_helper import Plugin_Objects
|
||||||
from logger import mylog, Logger, append_line_to_file
|
from logger import mylog, Logger, append_line_to_file
|
||||||
from utils.datetime_utils import timeNowDB
|
from utils.datetime_utils import timeNowDB
|
||||||
from helper import get_setting_value
|
from helper import get_setting_value
|
||||||
@@ -29,33 +29,59 @@ LOG_PATH = logPath + '/plugins'
|
|||||||
LOG_FILE = os.path.join(LOG_PATH, f'script.{pluginName}.log')
|
LOG_FILE = os.path.join(LOG_PATH, f'script.{pluginName}.log')
|
||||||
RESULT_FILE = os.path.join(LOG_PATH, f'last_result.{pluginName}.log')
|
RESULT_FILE = os.path.join(LOG_PATH, f'last_result.{pluginName}.log')
|
||||||
|
|
||||||
|
# Initialize the Plugin obj output file
|
||||||
|
plugin_objects = Plugin_Objects(RESULT_FILE)
|
||||||
|
|
||||||
#-------------------------------------------------------------------------------
|
#-------------------------------------------------------------------------------
|
||||||
def main():
|
def main():
|
||||||
parser = argparse.ArgumentParser(description='Scan ports of devices specified by IP addresses')
|
parser = argparse.ArgumentParser(
|
||||||
parser.add_argument('ips', nargs='+', help="list of IPs to scan")
|
description='Scan ports of devices specified by IP addresses'
|
||||||
parser.add_argument('macs', nargs='+', help="list of MACs related to the supplied IPs in the same order")
|
)
|
||||||
parser.add_argument('timeout', nargs='+', help="timeout")
|
|
||||||
parser.add_argument('args', nargs='+', help="args")
|
|
||||||
values = parser.parse_args()
|
|
||||||
|
|
||||||
# Plugin_Objects is a class that reads data from the RESULT_FILE
|
# Accept ANY key=value pairs
|
||||||
# and returns a list of results.
|
parser.add_argument('params', nargs='+', help="key=value style params")
|
||||||
plugin_objects = Plugin_Objects(RESULT_FILE)
|
|
||||||
|
|
||||||
# Print a message to indicate that the script is starting.
|
raw = parser.parse_args()
|
||||||
mylog('debug', [f'[{pluginName}] In script '])
|
|
||||||
|
|
||||||
# Printing the params list to check its content.
|
try:
|
||||||
mylog('debug', [f'[{pluginName}] values.ips: ', values.ips])
|
args = parse_kv_args(raw.params)
|
||||||
mylog('debug', [f'[{pluginName}] values.macs: ', values.macs])
|
except ValueError as e:
|
||||||
mylog('debug', [f'[{pluginName}] values.timeout: ', values.timeout])
|
mylog('error', [f"[{pluginName}] Argument error: {e}"])
|
||||||
mylog('debug', [f'[{pluginName}] values.args: ', values.args])
|
sys.exit(1)
|
||||||
|
|
||||||
argsDecoded = decodeBase64(values.args[0].split('=b')[1])
|
# Required keys
|
||||||
|
required = ['ips', 'macs']
|
||||||
|
for key in required:
|
||||||
|
if key not in args:
|
||||||
|
mylog('error', [f"[{pluginName}] Missing required parameter: {key}"])
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
mylog('debug', [f'[{pluginName}] argsDecoded: ', argsDecoded])
|
# Parse lists
|
||||||
|
ip_list = safe_split_list(args['ips'], "ips")
|
||||||
|
mac_list = safe_split_list(args['macs'], "macs")
|
||||||
|
|
||||||
entries = performNmapScan(values.ips[0].split('=')[1].split(','), values.macs[0].split('=')[1].split(',') , values.timeout[0].split('=')[1], argsDecoded)
|
if len(ip_list) != len(mac_list):
|
||||||
|
mylog('error', [
|
||||||
|
f"[{pluginName}] Mismatch: {len(ip_list)} IPs but {len(mac_list)} MACs"
|
||||||
|
])
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
# Optional
|
||||||
|
timeout = int(args.get("timeout", get_setting_value("NMAP_RUN_TIMEOUT")))
|
||||||
|
|
||||||
|
NMAP_ARGS = get_setting_value("NMAP_ARGS")
|
||||||
|
|
||||||
|
mylog('debug', [f'[{pluginName}] Parsed IPs: {ip_list}'])
|
||||||
|
mylog('debug', [f'[{pluginName}] Parsed MACs: {mac_list}'])
|
||||||
|
mylog('debug', [f'[{pluginName}] Timeout: {timeout}'])
|
||||||
|
mylog('debug', [f'[{pluginName}] NMAP_ARGS: {NMAP_ARGS}'])
|
||||||
|
|
||||||
|
entries = performNmapScan(
|
||||||
|
ip_list,
|
||||||
|
mac_list,
|
||||||
|
timeout,
|
||||||
|
NMAP_ARGS
|
||||||
|
)
|
||||||
|
|
||||||
mylog('verbose', [f'[{pluginName}] Total number of ports found by NMAP: ', len(entries)])
|
mylog('verbose', [f'[{pluginName}] Total number of ports found by NMAP: ', len(entries)])
|
||||||
|
|
||||||
@@ -89,6 +115,35 @@ class nmap_entry:
|
|||||||
self.hash = str(mac) + str(port)+ str(state)+ str(service)
|
self.hash = str(mac) + str(port)+ str(state)+ str(service)
|
||||||
|
|
||||||
|
|
||||||
|
#-------------------------------------------------------------------------------
|
||||||
|
def parse_kv_args(raw_args):
|
||||||
|
"""
|
||||||
|
Converts ['ips=a,b,c', 'macs=x,y,z', 'timeout=5'] to a dict.
|
||||||
|
Ignores unknown keys.
|
||||||
|
"""
|
||||||
|
parsed = {}
|
||||||
|
|
||||||
|
for item in raw_args:
|
||||||
|
if '=' not in item:
|
||||||
|
mylog('none', [f"[{pluginName}] Scan: Invalid parameter (missing '='): {item}"])
|
||||||
|
|
||||||
|
key, value = item.split('=', 1)
|
||||||
|
|
||||||
|
if key in parsed:
|
||||||
|
mylog('none', [f"[{pluginName}] Scan: Duplicate parameter supplied: {key}"])
|
||||||
|
|
||||||
|
parsed[key] = value
|
||||||
|
|
||||||
|
return parsed
|
||||||
|
|
||||||
|
#-------------------------------------------------------------------------------
|
||||||
|
def safe_split_list(value, keyname):
|
||||||
|
"""Split comma list safely and ensure no empty items."""
|
||||||
|
items = [x.strip() for x in value.split(',') if x.strip()]
|
||||||
|
if not items:
|
||||||
|
mylog('none', [f"[{pluginName}] Scan: {keyname} list is empty or invalid"])
|
||||||
|
return items
|
||||||
|
|
||||||
#-------------------------------------------------------------------------------
|
#-------------------------------------------------------------------------------
|
||||||
def performNmapScan(deviceIPs, deviceMACs, timeoutSec, args):
|
def performNmapScan(deviceIPs, deviceMACs, timeoutSec, args):
|
||||||
"""
|
"""
|
||||||
|
|||||||
Reference in New Issue
Block a user