mirror of
https://github.com/jokob-sk/NetAlertX.git
synced 2025-12-07 09:36:05 -08:00
84 lines
2.5 KiB
Python
Executable File
84 lines
2.5 KiB
Python
Executable File
import sys
|
|
import re
|
|
import subprocess
|
|
import socket
|
|
import dns.resolver
|
|
|
|
# Register NetAlertX directories
|
|
INSTALL_PATH = "/app"
|
|
sys.path.extend([f"{INSTALL_PATH}/server"])
|
|
|
|
import conf
|
|
from const import *
|
|
from logger import mylog
|
|
from helper import get_setting_value
|
|
|
|
class ResolvedName:
|
|
def __init__(self, raw: str = "(name not found)", cleaned: str = "(name not found)"):
|
|
self.raw = raw
|
|
self.cleaned = cleaned
|
|
|
|
def __str__(self):
|
|
return self.cleaned
|
|
|
|
class NameResolver:
|
|
def __init__(self, db):
|
|
self.db = db
|
|
|
|
def resolve_from_plugin(self, plugin: str, pMAC: str, pIP: str) -> ResolvedName:
|
|
sql = self.db.sql
|
|
nameNotFound = ResolvedName()
|
|
|
|
# Check by MAC
|
|
sql.execute(f"""
|
|
SELECT Watched_Value2 FROM Plugins_Objects
|
|
WHERE Plugin = '{plugin}' AND Object_PrimaryID = '{pMAC}'
|
|
""")
|
|
result = sql.fetchall()
|
|
self.db.commitDB()
|
|
if result:
|
|
raw = result[0][0]
|
|
return ResolvedName(raw, self.clean_device_name(raw, False))
|
|
|
|
# Check by IP
|
|
sql.execute(f"""
|
|
SELECT Watched_Value2 FROM Plugins_Objects
|
|
WHERE Plugin = '{plugin}' AND Object_SecondaryID = '{pIP}'
|
|
""")
|
|
result = sql.fetchall()
|
|
self.db.commitDB()
|
|
if result:
|
|
raw = result[0][0]
|
|
return ResolvedName(raw, self.clean_device_name(raw, True))
|
|
|
|
return nameNotFound
|
|
|
|
def resolve_mdns(self, pMAC, pIP) -> ResolvedName:
|
|
return self.resolve_from_plugin("AVAHISCAN", pMAC, pIP)
|
|
|
|
def resolve_nslookup(self, pMAC, pIP) -> ResolvedName:
|
|
return self.resolve_from_plugin("NSLOOKUP", pMAC, pIP)
|
|
|
|
def resolve_nbtlookup(self, pMAC, pIP) -> ResolvedName:
|
|
return self.resolve_from_plugin("NBTSCAN", pMAC, pIP)
|
|
|
|
def resolve_dig(self, pMAC, pIP) -> ResolvedName:
|
|
return self.resolve_from_plugin("DIGSCAN", pMAC, pIP)
|
|
|
|
def clean_device_name(self, name: str, match_ip: bool) -> str:
|
|
mylog('debug', [f"[cleanDeviceName] input: {name}"])
|
|
|
|
if match_ip:
|
|
name += " (IP match)"
|
|
|
|
regexes = get_setting_value('NEWDEV_NAME_CLEANUP_REGEX') or []
|
|
for rgx in regexes:
|
|
mylog('trace', [f"[cleanDeviceName] applying regex: {rgx}"])
|
|
name = re.sub(rgx, "", name)
|
|
|
|
name = re.sub(r'\.$', '', name)
|
|
name = name.replace(". (IP match)", " (IP match)")
|
|
|
|
mylog('debug', [f"[cleanDeviceName] output: {name}"])
|
|
return name
|