PLG: NMAP make param handling more robust #1288
Some checks failed
Code checks / check-url-paths (push) Has been cancelled
docker / docker_dev (push) Has been cancelled
Deploy MkDocs / deploy (push) Has been cancelled

Signed-off-by: jokob-sk <jokob.sk@gmail.com>
This commit is contained in:
jokob-sk
2025-11-16 10:16:23 +11:00
parent 093d595fc5
commit dbd1bdabc2

View File

@@ -9,7 +9,7 @@ import subprocess
INSTALL_PATH = os.getenv('NETALERTX_APP', '/app')
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
from plugin_helper import Plugin_Objects, decodeBase64
from plugin_helper import Plugin_Objects
from logger import mylog, Logger, append_line_to_file
from utils.datetime_utils import timeNowDB
from helper import get_setting_value
@@ -29,33 +29,59 @@ LOG_PATH = logPath + '/plugins'
LOG_FILE = os.path.join(LOG_PATH, f'script.{pluginName}.log')
RESULT_FILE = os.path.join(LOG_PATH, f'last_result.{pluginName}.log')
# Initialize the Plugin obj output file
plugin_objects = Plugin_Objects(RESULT_FILE)
#-------------------------------------------------------------------------------
def main():
parser = argparse.ArgumentParser(description='Scan ports of devices specified by IP addresses')
parser.add_argument('ips', nargs='+', help="list of IPs to scan")
parser.add_argument('macs', nargs='+', help="list of MACs related to the supplied IPs in the same order")
parser.add_argument('timeout', nargs='+', help="timeout")
parser.add_argument('args', nargs='+', help="args")
values = parser.parse_args()
parser = argparse.ArgumentParser(
description='Scan ports of devices specified by IP addresses'
)
# Plugin_Objects is a class that reads data from the RESULT_FILE
# and returns a list of results.
plugin_objects = Plugin_Objects(RESULT_FILE)
# Accept ANY key=value pairs
parser.add_argument('params', nargs='+', help="key=value style params")
# Print a message to indicate that the script is starting.
mylog('debug', [f'[{pluginName}] In script '])
raw = parser.parse_args()
# Printing the params list to check its content.
mylog('debug', [f'[{pluginName}] values.ips: ', values.ips])
mylog('debug', [f'[{pluginName}] values.macs: ', values.macs])
mylog('debug', [f'[{pluginName}] values.timeout: ', values.timeout])
mylog('debug', [f'[{pluginName}] values.args: ', values.args])
try:
args = parse_kv_args(raw.params)
except ValueError as e:
mylog('error', [f"[{pluginName}] Argument error: {e}"])
sys.exit(1)
argsDecoded = decodeBase64(values.args[0].split('=b')[1])
# Required keys
required = ['ips', 'macs']
for key in required:
if key not in args:
mylog('error', [f"[{pluginName}] Missing required parameter: {key}"])
sys.exit(1)
mylog('debug', [f'[{pluginName}] argsDecoded: ', argsDecoded])
# Parse lists
ip_list = safe_split_list(args['ips'], "ips")
mac_list = safe_split_list(args['macs'], "macs")
entries = performNmapScan(values.ips[0].split('=')[1].split(','), values.macs[0].split('=')[1].split(',') , values.timeout[0].split('=')[1], argsDecoded)
if len(ip_list) != len(mac_list):
mylog('error', [
f"[{pluginName}] Mismatch: {len(ip_list)} IPs but {len(mac_list)} MACs"
])
sys.exit(1)
# Optional
timeout = int(args.get("timeout", get_setting_value("NMAP_RUN_TIMEOUT")))
NMAP_ARGS = get_setting_value("NMAP_ARGS")
mylog('debug', [f'[{pluginName}] Parsed IPs: {ip_list}'])
mylog('debug', [f'[{pluginName}] Parsed MACs: {mac_list}'])
mylog('debug', [f'[{pluginName}] Timeout: {timeout}'])
mylog('debug', [f'[{pluginName}] NMAP_ARGS: {NMAP_ARGS}'])
entries = performNmapScan(
ip_list,
mac_list,
timeout,
NMAP_ARGS
)
mylog('verbose', [f'[{pluginName}] Total number of ports found by NMAP: ', len(entries)])
@@ -89,6 +115,35 @@ class nmap_entry:
self.hash = str(mac) + str(port)+ str(state)+ str(service)
#-------------------------------------------------------------------------------
def parse_kv_args(raw_args):
"""
Converts ['ips=a,b,c', 'macs=x,y,z', 'timeout=5'] to a dict.
Ignores unknown keys.
"""
parsed = {}
for item in raw_args:
if '=' not in item:
mylog('none', [f"[{pluginName}] Scan: Invalid parameter (missing '='): {item}"])
key, value = item.split('=', 1)
if key in parsed:
mylog('none', [f"[{pluginName}] Scan: Duplicate parameter supplied: {key}"])
parsed[key] = value
return parsed
#-------------------------------------------------------------------------------
def safe_split_list(value, keyname):
"""Split comma list safely and ensure no empty items."""
items = [x.strip() for x in value.split(',') if x.strip()]
if not items:
mylog('none', [f"[{pluginName}] Scan: {keyname} list is empty or invalid"])
return items
#-------------------------------------------------------------------------------
def performNmapScan(deviceIPs, deviceMACs, timeoutSec, args):
"""