Files
NetAlertX/front/plugins/arp_scan/script.py
jokob-sk 139447b253 BE: mylog() better code radability
Signed-off-by: jokob-sk <jokob.sk@gmail.com>
2025-11-25 07:54:17 +11:00

192 lines
6.8 KiB
Python
Executable File

#!/usr/bin/env python
import os
import time
import argparse
import sys
import re
import base64
import subprocess
# Register NetAlertX directories
INSTALL_PATH = os.getenv('NETALERTX_APP', '/app')
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
from database import DB # noqa: E402 [flake8 lint suppression]
from plugin_helper import Plugin_Objects, handleEmpty # noqa: E402 [flake8 lint suppression]
from logger import mylog, Logger # noqa: E402 [flake8 lint suppression]
from helper import get_setting_value # noqa: E402 [flake8 lint suppression]
from const import logPath # noqa: E402 [flake8 lint suppression]
import conf # noqa: E402 [flake8 lint suppression]
from pytz import timezone # noqa: E402 [flake8 lint suppression]
# Make sure the TIMEZONE for logging is correct
conf.tz = timezone(get_setting_value("TIMEZONE"))
# Make sure log level is initialized correctly
Logger(get_setting_value("LOG_LEVEL"))
pluginName = "ARPSCAN"
LOG_PATH = logPath + "/plugins"
LOG_FILE = os.path.join(LOG_PATH, f"script.{pluginName}.log")
RESULT_FILE = os.path.join(LOG_PATH, f"last_result.{pluginName}.log")
def main():
parser = argparse.ArgumentParser(description="Import devices from settings")
parser.add_argument("userSubnets", nargs="+", help="list of subnets with options")
values = parser.parse_args()
# Assuming Plugin_Objects is a class or function that reads data from the RESULT_FILE
# and returns a list of objects called 'devices'.
plugin_objects = Plugin_Objects(RESULT_FILE)
# Print a message to indicate that the script is starting.
mylog("verbose", [f"[{pluginName}] In script "])
# holds a list of user-submitted subnets.
# mylog('verbose', ['[ARP Scan] values.userSubnets: ', values.userSubnets])
# Extract the base64-encoded subnet information from the first element of the userSubnets list.
# The format of the element is assumed to be like 'userSubnets=<base64-encoded-data>'.
userSubnetsParamBase64 = values.userSubnets[0].split("userSubnets=")[1]
# Printing the extracted base64-encoded subnet information.
# mylog('verbose', ['[ARP Scan] userSubnetsParamBase64: ', userSubnetsParamBase64])
# Decode the base64-encoded subnet information to get the actual subnet information in ASCII format.
userSubnetsParam = base64.b64decode(userSubnetsParamBase64).decode("ascii")
# Print the decoded subnet information.
mylog("verbose", [f"[{pluginName}] userSubnetsParam: ", userSubnetsParam])
# Check if the decoded subnet information contains multiple subnets separated by commas.
# If it does, split the string into a list of individual subnets.
# Otherwise, create a list with a single element containing the subnet information.
if "," in userSubnetsParam:
subnets_list = userSubnetsParam.split(",")
else:
subnets_list = [userSubnetsParam]
# Create a database connection
db = DB() # instance of class DB
db.open()
# Execute the ARP scanning process on the list of subnets (whether it's one or multiple subnets).
# The function 'execute_arpscan' is assumed to be defined elsewhere in the code.
unique_devices = execute_arpscan(subnets_list)
for device in unique_devices:
plugin_objects.add_object(
primaryId=handleEmpty(device["mac"]), # MAC (Device Name)
secondaryId=handleEmpty(device["ip"]), # IP Address
watched1=handleEmpty(device["ip"]), # Device Name
watched2=handleEmpty(
device.get("hw", "")
), # Vendor (assuming it's in the 'hw' field)
watched3=handleEmpty(device.get("interface", "")), # Add the interface
watched4="",
extra=pluginName,
foreignKey="",
)
plugin_objects.write_result_file()
return 0
def execute_arpscan(userSubnets):
# output of possible multiple interfaces
arpscan_output = ""
devices_list = []
# scan each interface
for interface in userSubnets:
arpscan_output = execute_arpscan_on_interface(interface)
mylog("verbose", [f"[{pluginName}] arpscan_output: ", arpscan_output])
# Search IP + MAC + Vendor as regular expresion
re_ip = (
r"(?P<ip>((2[0-5]|1[0-9]|[0-9])?[0-9]\.){3}((2[0-5]|1[0-9]|[0-9])?[0-9]))"
)
re_mac = r"(?P<mac>([0-9a-fA-F]{2}[:-]){5}([0-9a-fA-F]{2}))"
re_hw = r"(?P<hw>.*)"
re_pattern = re.compile(rf"{re_ip}\s+{re_mac}\s{re_hw}")
devices_list_tmp = [
{**device.groupdict(), "interface": interface}
for device in re.finditer(re_pattern, arpscan_output)
]
devices_list += devices_list_tmp
# mylog('debug', ['[ARP Scan] Found: Devices including duplicates ', len(devices_list) ])
# Delete duplicate MAC
unique_mac = []
unique_devices = []
for device in devices_list:
if device["mac"] not in unique_mac:
unique_mac.append(device["mac"])
unique_devices.append(device)
# return list
mylog("verbose", [f"[{pluginName}] All devices List len:", len(devices_list)])
mylog("verbose", [f"[{pluginName}] Devices List:", devices_list])
mylog("verbose", [f"[{pluginName}] Found: Devices without duplicates ", len(unique_devices)],)
return unique_devices
def execute_arpscan_on_interface(interface):
# Prepare command arguments
arpscan_args = get_setting_value("ARPSCAN_ARGS").split() + interface.split()
# Optional duration in seconds (0 = run once)
try:
scan_duration = int(get_setting_value("ARPSCAN_DURATION"))
except Exception:
scan_duration = 0 # default: single run
# Get timeout from plugin settings (default 30 seconds if not set)
try:
timeout_seconds = int(get_setting_value("ARPSCAN_RUN_TIMEOUT"))
except Exception:
timeout_seconds = 30
results = []
start_time = time.time()
while True:
try:
result = subprocess.check_output(
arpscan_args, universal_newlines=True, timeout=timeout_seconds
)
results.append(result)
except subprocess.CalledProcessError:
result = ""
except subprocess.TimeoutExpired:
mylog("warning", [f"[{pluginName}] arp-scan timed out after {timeout_seconds}s"],)
result = ""
# stop looping if duration not set or expired
if scan_duration == 0 or (time.time() - start_time) > scan_duration:
break
time.sleep(2) # short delay between scans
# concatenate all outputs (for regex parsing)
return "\n".join(results)
# ===============================================================================
# BEGIN
# ===============================================================================
if __name__ == "__main__":
main()