mirror of
https://github.com/jokob-sk/NetAlertX.git
synced 2026-04-02 16:22:20 -07:00
FIX: lowercase MAC normalization across project v0.1
Signed-off-by: jokob-sk <jokob.sk@gmail.com>
This commit is contained in:
@@ -81,14 +81,14 @@ def ddns_update(DDNS_UPDATE_URL, DDNS_USER, DDNS_PASSWORD, DDNS_DOMAIN, PREV_IP)
|
||||
|
||||
# plugin_objects = Plugin_Objects(RESULT_FILE)
|
||||
# plugin_objects.add_object(
|
||||
# primaryId = 'Internet', # MAC (Device Name)
|
||||
# primaryId = 'internet', # MAC (Device Name)
|
||||
# secondaryId = new_internet_IP, # IP Address
|
||||
# watched1 = f'Previous IP: {PREV_IP}',
|
||||
# watched2 = '',
|
||||
# watched3 = '',
|
||||
# watched4 = '',
|
||||
# extra = f'Previous IP: {PREV_IP}',
|
||||
# foreignKey = 'Internet')
|
||||
# foreignKey = 'internet')
|
||||
|
||||
# plugin_objects.write_result_file()
|
||||
|
||||
|
||||
@@ -79,14 +79,14 @@ def main():
|
||||
plugin_objects = Plugin_Objects(RESULT_FILE)
|
||||
|
||||
plugin_objects.add_object(
|
||||
primaryId = 'Internet', # MAC (Device Name)
|
||||
primaryId = 'internet', # MAC (Device Name)
|
||||
secondaryId = new_internet_IP, # IP Address
|
||||
watched1 = f'Previous IP: {PREV_IP}',
|
||||
watched2 = cmd_output.replace('\n', ''),
|
||||
watched3 = retries_needed,
|
||||
watched4 = 'Gateway',
|
||||
extra = f'Previous IP: {PREV_IP}',
|
||||
foreignKey = 'Internet'
|
||||
foreignKey = 'internet'
|
||||
)
|
||||
|
||||
plugin_objects.write_result_file()
|
||||
@@ -101,8 +101,8 @@ def main():
|
||||
# ===============================================================================
|
||||
def check_internet_IP(PREV_IP, DIG_GET_IP_ARG):
|
||||
|
||||
# Get Internet IP
|
||||
mylog('verbose', [f'[{pluginName}] - Retrieving Internet IP'])
|
||||
# Get internet IP
|
||||
mylog('verbose', [f'[{pluginName}] - Retrieving internet IP'])
|
||||
internet_IP, cmd_output = get_internet_IP(DIG_GET_IP_ARG)
|
||||
|
||||
mylog('verbose', [f'[{pluginName}] Current internet_IP : {internet_IP}'])
|
||||
|
||||
@@ -330,8 +330,8 @@ def main():
|
||||
myssid = device[PORT_SSID] if not device[PORT_SSID].isdigit() else ""
|
||||
ParentNetworkNode = (
|
||||
ieee2ietf_mac_formater(device[SWITCH_AP])
|
||||
if device[SWITCH_AP] != "Internet"
|
||||
else "Internet"
|
||||
if device[SWITCH_AP].lower() != "internet"
|
||||
else "internet"
|
||||
)
|
||||
mymac = ieee2ietf_mac_formater(device[MAC])
|
||||
plugin_objects.add_object(
|
||||
@@ -665,7 +665,7 @@ def get_device_data(omada_clients_output, switches_and_aps, device_handler):
|
||||
device_data_bymac[default_router_mac][TYPE] = "Firewall"
|
||||
# step2 let's find the first switch and set the default router parent to internet
|
||||
first_switch = device_data_bymac[default_router_mac][SWITCH_AP]
|
||||
device_data_bymac[default_router_mac][SWITCH_AP] = "Internet"
|
||||
device_data_bymac[default_router_mac][SWITCH_AP] = "internet"
|
||||
# step3 let's set the switch connected to the default gateway uplink to the default gateway and hardcode port to 1 for now:
|
||||
# device_data_bymac[first_switch][SWITCH_AP]=default_router_mac
|
||||
# device_data_bymac[first_switch][SWITCH_AP][PORT_SSID] = '1'
|
||||
|
||||
@@ -413,11 +413,11 @@ class OmadaData:
|
||||
|
||||
OmadaHelper.verbose(f"Making entry for: {entry['mac_address']}")
|
||||
|
||||
# If the device_type is gateway, set the parent_node to Internet
|
||||
# If the device_type is gateway, set the parent_node to internet
|
||||
device_type = entry["device_type"].lower()
|
||||
parent_node = entry["parent_node_mac_address"]
|
||||
if len(parent_node) == 0 and entry["device_type"] == "gateway" and is_typical_router_ip(entry["ip_address"]):
|
||||
parent_node = "Internet"
|
||||
parent_node = "internet"
|
||||
|
||||
# Some device type naming exceptions
|
||||
if device_type == "iphone":
|
||||
|
||||
@@ -177,27 +177,25 @@ def decode_settings_base64(encoded_str, convert_types=True):
|
||||
# -------------------------------------------------------------------
|
||||
def normalize_mac(mac):
|
||||
"""
|
||||
Normalize a MAC address to the standard format with colon separators.
|
||||
For example, "aa-bb-cc-dd-ee-ff" will be normalized to "AA:BB:CC:DD:EE:FF".
|
||||
Wildcard MAC addresses like "AA:BB:CC:*" will be normalized to "AA:BB:CC:*".
|
||||
normalize a mac address to the standard format with colon separators.
|
||||
for example, "AA-BB-CC-DD-EE-FF" will be normalized to "aa:bb:cc:dd:ee:ff".
|
||||
wildcard mac addresses like "AA:BB:CC:*" will be normalized to "aa:bb:cc:*".
|
||||
|
||||
:param mac: The MAC address to normalize.
|
||||
:return: The normalized MAC address.
|
||||
:param mac: the mac address to normalize.
|
||||
:return: the normalized mac address (lowercase).
|
||||
"""
|
||||
s = str(mac).strip()
|
||||
s = str(mac).strip().lower()
|
||||
|
||||
if s.lower() == "internet":
|
||||
return "Internet"
|
||||
if s == "internet":
|
||||
return "internet"
|
||||
|
||||
s = s.upper()
|
||||
|
||||
# Determine separator if present, prefer colon, then hyphen
|
||||
# determine separator if present, prefer colon, then hyphen
|
||||
if ':' in s:
|
||||
parts = s.split(':')
|
||||
elif '-' in s:
|
||||
parts = s.split('-')
|
||||
else:
|
||||
# No explicit separator; attempt to split every two chars
|
||||
# no explicit separator; attempt to split every two chars
|
||||
parts = [s[i:i + 2] for i in range(0, len(s), 2)]
|
||||
|
||||
normalized_parts = []
|
||||
@@ -206,10 +204,10 @@ def normalize_mac(mac):
|
||||
if part == '*':
|
||||
normalized_parts.append('*')
|
||||
else:
|
||||
# Ensure two hex digits (zfill is fine for alphanumeric input)
|
||||
# ensure two hex digits
|
||||
normalized_parts.append(part.zfill(2))
|
||||
|
||||
# Use colon as canonical separator
|
||||
# use colon as canonical separator
|
||||
return ':'.join(normalized_parts)
|
||||
|
||||
|
||||
|
||||
@@ -74,7 +74,7 @@ def main():
|
||||
watched1 = device['dev_name'], # name
|
||||
watched2 = device['dev_type'], # device_type (AP/Switch etc)
|
||||
watched3 = device['dev_connected'], # connectedAt or empty
|
||||
watched4 = device['dev_parent_mac'], # parent_mac or "Internet"
|
||||
watched4 = device['dev_parent_mac'], # parent_mac or "internet"
|
||||
extra = '',
|
||||
foreignKey = device['dev_mac']
|
||||
)
|
||||
@@ -115,10 +115,10 @@ def get_device_data(site, api):
|
||||
continue
|
||||
device_id_to_mac[dev["id"]] = dev.get("macAddress", "")
|
||||
|
||||
# Helper to resolve uplinkDeviceId to parent MAC, or "Internet" if no uplink
|
||||
# Helper to resolve uplinkDeviceId to parent MAC, or "internet" if no uplink
|
||||
def resolve_parent_mac(uplink_id):
|
||||
if not uplink_id:
|
||||
return "Internet"
|
||||
return "internet"
|
||||
return device_id_to_mac.get(uplink_id, "Unknown")
|
||||
|
||||
# Process Unifi devices
|
||||
|
||||
@@ -173,7 +173,7 @@ def collect_details(device_type, devices, online_macs, processed_macs, plugin_ob
|
||||
|
||||
# override parent MAC if this is a router
|
||||
if parentMac == 'null' and is_typical_router_ip(ipTmp):
|
||||
parentMac = 'Internet'
|
||||
parentMac = 'internet'
|
||||
|
||||
# Add object only if not processed
|
||||
if macTmp not in processed_macs and (status == 1 or force_import is True):
|
||||
|
||||
Reference in New Issue
Block a user