Files
NetAlertX/server/scan/name_resolution.py
Jokob @NetAlertX c7399215ec Refactor event and session column names to camelCase
- Updated test cases to reflect new column names (eve_MAC -> eveMac, eve_DateTime -> eveDateTime, etc.) across various test files.
- Modified SQL table definitions in the database cleanup and migration tests to use camelCase naming conventions.
- Implemented migration tests to ensure legacy column names are correctly renamed to camelCase equivalents.
- Ensured that existing data is preserved during the migration process and that views referencing old column names are dropped before renaming.
- Verified that the migration function is idempotent, allowing for safe re-execution without data loss.
2026-03-16 10:11:22 +00:00

80 lines
2.6 KiB
Python
Executable File

import re
from logger import mylog
from helper import get_setting_value
class ResolvedName:
def __init__(
self, raw: str = "(name not found)", cleaned: str = "(name not found)"
):
self.raw = raw
self.cleaned = cleaned
def __str__(self):
return self.cleaned
class NameResolver:
def __init__(self, db):
self.db = db
def resolve_from_plugin(self, plugin: str, pMAC: str, pIP: str) -> ResolvedName:
sql = self.db.sql
nameNotFound = ResolvedName()
# Check by MAC
sql.execute(f"""
SELECT watchedValue2 FROM Plugins_Objects
WHERE plugin = '{plugin}' AND objectPrimaryId = '{pMAC}'
""")
result = sql.fetchall()
# self.db.commitDB() # Issue #1251: Optimize name resolution lookup
if result:
raw = result[0][0]
return ResolvedName(raw, self.clean_device_name(raw, False))
# Check name by IP if enabled
if get_setting_value('NEWDEV_IP_MATCH_NAME'):
sql.execute(f"""
SELECT watchedValue2 FROM Plugins_Objects
WHERE plugin = '{plugin}' AND objectSecondaryId = '{pIP}'
""")
result = sql.fetchall()
# self.db.commitDB() # Issue #1251: Optimize name resolution lookup
if result:
raw = result[0][0]
return ResolvedName(raw, self.clean_device_name(raw, True))
return nameNotFound
def resolve_mdns(self, pMAC, pIP) -> ResolvedName:
return self.resolve_from_plugin("AVAHISCAN", pMAC, pIP)
def resolve_nslookup(self, pMAC, pIP) -> ResolvedName:
return self.resolve_from_plugin("NSLOOKUP", pMAC, pIP)
def resolve_nbtlookup(self, pMAC, pIP) -> ResolvedName:
return self.resolve_from_plugin("NBTSCAN", pMAC, pIP)
def resolve_dig(self, pMAC, pIP) -> ResolvedName:
return self.resolve_from_plugin("DIGSCAN", pMAC, pIP)
def clean_device_name(self, name: str, match_ip: bool) -> str:
mylog("debug", [f"[cleanDeviceName] input: {name}"])
if match_ip:
name += " (IP match)"
regexes = get_setting_value('NEWDEV_NAME_CLEANUP_REGEX') or []
mylog('trace', [f"[cleanDeviceName] applying regexes: {regexes}"])
for rgx in regexes:
mylog("trace", [f"[cleanDeviceName] applying regex: {rgx}"])
name = re.sub(rgx, "", name)
name = re.sub(r"\.$", "", name)
name = name.replace(". (IP match)", " (IP match)")
mylog("debug", [f"[cleanDeviceName] output: {name}"])
return name