mirror of
https://github.com/jokob-sk/NetAlertX.git
synced 2026-03-31 07:12:23 -07:00
@@ -75,6 +75,34 @@
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"function": "MODE",
|
||||
"events": ["run"],
|
||||
"type": {
|
||||
"dataType": "string",
|
||||
"elements": [
|
||||
{ "elementType": "select", "elementOptions": [], "transformers": [] }
|
||||
]
|
||||
},
|
||||
"default_value": "ping",
|
||||
"options": [
|
||||
"ping",
|
||||
"fping"
|
||||
],
|
||||
"localized": ["name", "description"],
|
||||
"name": [
|
||||
{
|
||||
"language_code": "en_us",
|
||||
"string": "Mode"
|
||||
}
|
||||
],
|
||||
"description": [
|
||||
{
|
||||
"language_code": "en_us",
|
||||
"string": "Selects the ICMP engine to use. <code>ping</code> checks devices individually and works even when the ARP / neighbor cache is empty, but is slower on larger networks. <code>fping</code> scans IP ranges in parallel and is significantly faster, but relies on the system neighbor cache to resolve IP addresses to MAC addresses. For most networks, <code>fping</code> is recommended. The default command arguments <code>ICMP_ARGS</code> are compatible with both modes."
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"function": "CMD",
|
||||
"type": {
|
||||
@@ -115,7 +143,7 @@
|
||||
}
|
||||
]
|
||||
},
|
||||
"default_value": "-i 0.5 -c 3 -W 4 -w 5",
|
||||
"default_value": "-i 0.5 -c 3 -w 5",
|
||||
"options": [],
|
||||
"localized": ["name", "description"],
|
||||
"name": [
|
||||
@@ -127,7 +155,7 @@
|
||||
"description": [
|
||||
{
|
||||
"language_code": "en_us",
|
||||
"string": "Arguments passed to the <code>ping</code> command. Please be careful modifying these."
|
||||
"string": "Arguments passed to the underlying <code>ping</code> or <code>fping</code> command. The default values are compatible with both modes and work well in most environments. Modify with care, and consult the relevant manual pages if advanced tuning is required."
|
||||
}
|
||||
]
|
||||
},
|
||||
@@ -159,6 +187,41 @@
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"function": "FAKE_MAC",
|
||||
"type": {
|
||||
"dataType": "boolean",
|
||||
"elements": [
|
||||
{
|
||||
"elementType": "input",
|
||||
"elementOptions": [
|
||||
{
|
||||
"type": "checkbox"
|
||||
}
|
||||
],
|
||||
"transformers": []
|
||||
}
|
||||
]
|
||||
},
|
||||
"default_value": false,
|
||||
"options": [],
|
||||
"localized": [
|
||||
"name",
|
||||
"description"
|
||||
],
|
||||
"name": [
|
||||
{
|
||||
"language_code": "en_us",
|
||||
"string": "Fake MAC"
|
||||
}
|
||||
],
|
||||
"description": [
|
||||
{
|
||||
"language_code": "en_us",
|
||||
"string": "If enabled and the mode is set to <code>fping</code>, the plugin will also discover new devices not already in the database. Enabling this setting generates a fake MAC address from the IP address to track devices. This may cause inconsistencies if IPs change or devices are re-discovered with a different MAC. Static IPs are recommended. Device type and icon might not be detected correctly, and some plugins may fail if they rely on a valid MAC address. When unchecked, devices without a MAC address are skipped."
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"function": "RUN_SCHD",
|
||||
"type": {
|
||||
|
||||
@@ -16,6 +16,7 @@ from logger import mylog, Logger # noqa: E402 [flake8 lint suppression]
|
||||
from helper import get_setting_value # noqa: E402 [flake8 lint suppression]
|
||||
from const import logPath # noqa: E402 [flake8 lint suppression]
|
||||
from models.device_instance import DeviceInstance # noqa: E402 [flake8 lint suppression]
|
||||
from utils.crypto_utils import string_to_mac_hash # noqa: E402 [flake8 lint suppression]
|
||||
import conf # noqa: E402 [flake8 lint suppression]
|
||||
from pytz import timezone # noqa: E402 [flake8 lint suppression]
|
||||
|
||||
@@ -32,13 +33,39 @@ LOG_FILE = os.path.join(LOG_PATH, f'script.{pluginName}.log')
|
||||
RESULT_FILE = os.path.join(LOG_PATH, f'last_result.{pluginName}.log')
|
||||
|
||||
|
||||
def parse_scan_subnets(subnets):
|
||||
"""Extract subnet and interface from SCAN_SUBNETS"""
|
||||
ranges = []
|
||||
interfaces = []
|
||||
for entry in subnets:
|
||||
parts = entry.split("--interface=")
|
||||
ranges.append(parts[0].strip())
|
||||
if len(parts) > 1:
|
||||
interfaces.append(parts[1].strip())
|
||||
return ranges, interfaces
|
||||
|
||||
|
||||
def get_device_by_ip(ip, all_devices):
|
||||
"""Get existing device based on IP"""
|
||||
for device in all_devices:
|
||||
if device["devLastIP"] == ip:
|
||||
return device
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def main():
|
||||
|
||||
mylog('verbose', [f'[{pluginName}] In script'])
|
||||
|
||||
timeout = get_setting_value('ICMP_RUN_TIMEOUT')
|
||||
args = get_setting_value('ICMP_ARGS')
|
||||
in_regex = get_setting_value('ICMP_IN_REGEX')
|
||||
regex = get_setting_value('ICMP_IN_REGEX')
|
||||
mode = get_setting_value('ICMP_MODE')
|
||||
fakeMac = get_setting_value('ICMP_FAKE_MAC')
|
||||
scan_subnets = get_setting_value("SCAN_SUBNETS")
|
||||
|
||||
subnets, interfaces = parse_scan_subnets(scan_subnets)
|
||||
|
||||
# Initialize the Plugin obj output file
|
||||
plugin_objects = Plugin_Objects(RESULT_FILE)
|
||||
@@ -50,33 +77,13 @@ def main():
|
||||
all_devices = device_handler.getAll()
|
||||
|
||||
# Compile the regex for efficiency if it will be used multiple times
|
||||
regex_pattern = re.compile(in_regex)
|
||||
regex_pattern = re.compile(regex)
|
||||
|
||||
# Filter devices based on the regex match
|
||||
filtered_devices = [
|
||||
device for device in all_devices
|
||||
if regex_pattern.match(device['devLastIP'])
|
||||
]
|
||||
if mode == "ping":
|
||||
plugin_objects = execute_ping(timeout, args, all_devices, regex_pattern, plugin_objects)
|
||||
|
||||
mylog('verbose', [f'[{pluginName}] Devices to PING: {len(filtered_devices)}'])
|
||||
|
||||
for device in filtered_devices:
|
||||
is_online, output = execute_scan(device['devLastIP'], timeout, args)
|
||||
|
||||
mylog('verbose', [f"[{pluginName}] ip: {device['devLastIP']} is_online: {is_online}"])
|
||||
|
||||
if is_online:
|
||||
plugin_objects.add_object(
|
||||
# "MAC", "IP", "Name", "Output"
|
||||
primaryId = device['devMac'],
|
||||
secondaryId = device['devLastIP'],
|
||||
watched1 = device['devName'],
|
||||
watched2 = output.replace('\n', ''),
|
||||
watched3 = '',
|
||||
watched4 = '',
|
||||
extra = '',
|
||||
foreignKey = device['devMac']
|
||||
)
|
||||
elif mode == "fping":
|
||||
plugin_objects = execute_fping(timeout, args, all_devices, plugin_objects, subnets, interfaces, fakeMac)
|
||||
|
||||
plugin_objects.write_result_file()
|
||||
|
||||
@@ -88,27 +95,24 @@ def main():
|
||||
# ===============================================================================
|
||||
# Execute scan
|
||||
# ===============================================================================
|
||||
def execute_scan(ip, timeout, args):
|
||||
def execute_ping(timeout, args, all_devices, regex_pattern, plugin_objects):
|
||||
"""
|
||||
Execute the ICMP command on IP.
|
||||
Execute ICMP command on filtered devices.
|
||||
"""
|
||||
|
||||
icmp_args = ['ping'] + args.split() + [ip]
|
||||
# Filter devices based on the regex match
|
||||
filtered_devices = [
|
||||
device for device in all_devices
|
||||
if regex_pattern.match(device['devLastIP'])
|
||||
]
|
||||
|
||||
# Execute command
|
||||
output = ""
|
||||
mylog('verbose', [f'[{pluginName}] Devices to PING: {len(filtered_devices)}'])
|
||||
|
||||
try:
|
||||
# try runnning a subprocess with a forced (timeout) in case the subprocess hangs
|
||||
output = subprocess.check_output(
|
||||
icmp_args,
|
||||
universal_newlines=True,
|
||||
stderr=subprocess.STDOUT,
|
||||
timeout=(timeout),
|
||||
text=True
|
||||
)
|
||||
for device in filtered_devices:
|
||||
|
||||
mylog('verbose', [f'[{pluginName}] DEBUG OUTPUT : {output}'])
|
||||
cmd = ["ping"] + args.split() + [device['devLastIP']]
|
||||
|
||||
output = ""
|
||||
|
||||
# Parse output using case-insensitive regular expressions
|
||||
# Synology-NAS:/# ping -i 0.5 -c 3 -W 8 -w 9 192.168.1.82
|
||||
@@ -128,31 +132,115 @@ def execute_scan(ip, timeout, args):
|
||||
# --- 192.168.1.92 ping statistics ---
|
||||
# 3 packets transmitted, 0 packets received, 100% packet loss
|
||||
|
||||
# TODO: parse output and return True if online, False if Offline (100% packet loss, bad address)
|
||||
is_online = True
|
||||
try:
|
||||
output = subprocess.check_output(
|
||||
cmd, universal_newlines=True, stderr=subprocess.STDOUT, timeout=timeout, text=True
|
||||
)
|
||||
|
||||
mylog("verbose", [f"[{pluginName}] DEBUG OUTPUT : {output}"])
|
||||
|
||||
# Check for 0% packet loss in the output
|
||||
if re.search(r"0% packet loss", output, re.IGNORECASE):
|
||||
is_online = True
|
||||
elif re.search(r"bad address", output, re.IGNORECASE):
|
||||
is_online = False
|
||||
elif re.search(r"100% packet loss", output, re.IGNORECASE):
|
||||
is_online = False
|
||||
if re.search(r"0% packet loss", output, re.IGNORECASE):
|
||||
is_online = True
|
||||
elif re.search(r"bad address", output, re.IGNORECASE):
|
||||
is_online = False
|
||||
elif re.search(r"100% packet loss", output, re.IGNORECASE):
|
||||
is_online = False
|
||||
|
||||
return is_online, output
|
||||
if is_online:
|
||||
|
||||
except subprocess.CalledProcessError as e:
|
||||
# An error occurred, handle it
|
||||
mylog('verbose', [f'[{pluginName}] ⚠ ERROR - check logs'])
|
||||
mylog('verbose', [f'[{pluginName}]', e.output])
|
||||
plugin_objects.add_object(
|
||||
# "MAC", "IP", "Name", "Output"
|
||||
primaryId = device['devMac'],
|
||||
secondaryId = device['devLastIP'],
|
||||
watched1 = device['devName'],
|
||||
watched2 = output.replace('\n', ''),
|
||||
watched3 = '',
|
||||
watched4 = '',
|
||||
extra = '',
|
||||
foreignKey = device['devMac']
|
||||
)
|
||||
|
||||
return False, output
|
||||
mylog('verbose', [f"[{pluginName}] ip: {device['devLastIP']} is_online: {is_online}"])
|
||||
|
||||
except subprocess.CalledProcessError as e:
|
||||
mylog("verbose", [f"[{pluginName}] ⚠ ERROR - check logs"])
|
||||
mylog("verbose", [f"[{pluginName}]", e.output])
|
||||
except subprocess.TimeoutExpired:
|
||||
mylog("verbose", [f"[{pluginName}] TIMEOUT - process terminated"])
|
||||
|
||||
return plugin_objects
|
||||
|
||||
|
||||
def execute_fping(timeout, args, all_devices, plugin_objects, subnets, interfaces, fakeMac):
|
||||
"""
|
||||
Run fping command and return alive IPs
|
||||
"""
|
||||
cmd = ["fping", "-a"]
|
||||
|
||||
if interfaces:
|
||||
cmd += ["-I", ",".join(interfaces)]
|
||||
|
||||
# Build a lookup dict once
|
||||
device_map = {d["devLastIP"]: d for d in all_devices if d.get("devLastIP")}
|
||||
|
||||
known_ips = list(device_map.keys())
|
||||
online_ips = []
|
||||
|
||||
cmd += args.split()
|
||||
cmd += subnets
|
||||
cmd += known_ips
|
||||
|
||||
mylog("verbose", [f"[{pluginName}] fping cmd: {' '.join(cmd)}"])
|
||||
|
||||
try:
|
||||
output = subprocess.check_output(
|
||||
cmd,
|
||||
stderr=subprocess.DEVNULL,
|
||||
timeout=timeout,
|
||||
text=True
|
||||
)
|
||||
online_ips = [line.strip() for line in output.splitlines() if line.strip()]
|
||||
|
||||
except subprocess.CalledProcessError:
|
||||
online_ips = []
|
||||
|
||||
except subprocess.TimeoutExpired:
|
||||
mylog('verbose', [f'[{pluginName}] TIMEOUT - the process forcefully terminated as timeout reached'])
|
||||
return False, output
|
||||
mylog("verbose", [f"[{pluginName}] fping timeout"])
|
||||
online_ips = []
|
||||
|
||||
return False, output
|
||||
# process all online IPs
|
||||
for onlineIp in online_ips:
|
||||
if onlineIp in known_ips:
|
||||
# use lookup dict instead of looping
|
||||
device = device_map.get(onlineIp)
|
||||
if device:
|
||||
plugin_objects.add_object(
|
||||
primaryId = device['devMac'],
|
||||
secondaryId = device['devLastIP'],
|
||||
watched1 = device['devName'],
|
||||
watched2 = 'mode:fping',
|
||||
watched3 = '',
|
||||
watched4 = '',
|
||||
extra = '',
|
||||
foreignKey = device['devMac']
|
||||
)
|
||||
else:
|
||||
mylog("none", [f"[{pluginName}] ERROR reverse device lookup failed unexpectedly for {onlineIp}"])
|
||||
elif fakeMac:
|
||||
fakeMacFromIp = string_to_mac_hash(onlineIp)
|
||||
plugin_objects.add_object(
|
||||
primaryId = fakeMacFromIp,
|
||||
secondaryId = onlineIp,
|
||||
watched1 = "(unknown)",
|
||||
watched2 = 'mode:fping',
|
||||
watched3 = '',
|
||||
watched4 = '',
|
||||
extra = '',
|
||||
foreignKey = fakeMacFromIp
|
||||
)
|
||||
else:
|
||||
mylog('verbose', [f"[{pluginName}] Skipping: {onlineIp}, as new IP and ICMP_FAKE_MAC setting not enabled"])
|
||||
|
||||
|
||||
# ===============================================================================
|
||||
|
||||
Reference in New Issue
Block a user