Merge netalertx/main into openapi-mcp-improvements

This commit is contained in:
Adam Outler
2026-01-31 02:09:57 +01:00
44 changed files with 918 additions and 264 deletions

View File

@@ -339,6 +339,9 @@ class Query(ObjectType):
"devFQDN",
"devParentRelType",
"devParentMAC",
"devVlan",
"devPrimaryIPv4",
"devPrimaryIPv6"
]
search_term = options.search.lower()

View File

@@ -176,6 +176,12 @@ sql_devices_filters = """
SELECT DISTINCT 'devSyncHubNode' AS columnName, devSyncHubNode AS columnValue
FROM Devices WHERE devSyncHubNode NOT IN ('', 'null') AND devSyncHubNode IS NOT NULL
UNION
SELECT DISTINCT 'devVlan' AS columnName, devVlan AS columnValue
FROM Devices WHERE devVlan NOT IN ('', 'null') AND devVlan IS NOT NULL
UNION
SELECT DISTINCT 'devParentRelType' AS columnName, devParentRelType AS columnValue
FROM Devices WHERE devParentRelType NOT IN ('', 'null') AND devParentRelType IS NOT NULL
UNION
SELECT DISTINCT 'devSSID' AS columnName, devSSID AS columnValue
FROM Devices WHERE devSSID NOT IN ('', 'null') AND devSSID IS NOT NULL
ORDER BY columnName;

View File

@@ -448,4 +448,3 @@ def unlock_fields(conn, mac=None, fields=None, clear_all=False):
}
finally:
conn.close()

View File

@@ -590,26 +590,6 @@ def normalize_string(text):
# MAC and IP helper methods
# -------------------------------------------------------------------------------
# # -------------------------------------------------------------------------------------------
# def is_random_mac(mac: str) -> bool:
# """Determine if a MAC address is random, respecting user-defined prefixes not to mark as random."""
# is_random = mac[1].upper() in ["2", "6", "A", "E"]
# # Get prefixes from settings
# prefixes = get_setting_value("UI_NOT_RANDOM_MAC")
# # If detected as random, make sure it doesn't start with a prefix the user wants to exclude
# if is_random:
# for prefix in prefixes:
# if mac.upper().startswith(prefix.upper()):
# is_random = False
# break
# return is_random
# -------------------------------------------------------------------------------------------
def generate_mac_links(html, deviceUrl):
p = re.compile(r"(?:[0-9a-fA-F]:?){12}")

View File

@@ -3,6 +3,7 @@ import sys
import json
import uuid
import time
import fcntl
from flask import jsonify
@@ -19,6 +20,35 @@ from api_server.sse_broadcast import broadcast_unread_notifications_count # noq
NOTIFICATION_API_FILE = apiPath + 'user_notifications.json'
def locked_notifications_file(callback):
# Ensure file exists
if not os.path.exists(NOTIFICATION_API_FILE):
with open(NOTIFICATION_API_FILE, "w") as f:
f.write("[]")
with open(NOTIFICATION_API_FILE, "r+") as f:
fcntl.flock(f, fcntl.LOCK_EX)
try:
raw = f.read().strip() or "[]"
try:
data = json.loads(raw)
except json.JSONDecodeError:
mylog("none", "[Notification] Corrupted JSON detected, resetting.")
data = []
# Let caller modify data
result = callback(data)
# Write back atomically
f.seek(0)
f.truncate()
json.dump(data, f, indent=4)
return result
finally:
fcntl.flock(f, fcntl.LOCK_UN)
# Show Frontend User Notification
def write_notification(content, level="alert", timestamp=None):
"""
@@ -36,50 +66,21 @@ def write_notification(content, level="alert", timestamp=None):
if timestamp is None:
timestamp = timeNowDB()
# Generate GUID
guid = str(uuid.uuid4())
# Prepare notification dictionary
notification = {
"timestamp": str(timestamp),
"guid": guid,
"guid": str(uuid.uuid4()),
"read": 0,
"level": level,
"content": content,
}
# If file exists, load existing data, otherwise initialize as empty list
try:
if os.path.exists(NOTIFICATION_API_FILE):
with open(NOTIFICATION_API_FILE, "r") as file:
file_contents = file.read().strip()
if file_contents:
notifications = json.loads(file_contents)
if not isinstance(notifications, list):
mylog("error", "[Notification] Invalid format: not a list, resetting")
notifications = []
else:
notifications = []
else:
notifications = []
except Exception as e:
mylog("error", [f"[Notification] Error reading notifications file: {e}"])
notifications = []
def update(notifications):
notifications.append(notification)
# Append new notification
notifications.append(notification)
locked_notifications_file(update)
# Write updated data back to file
try:
with open(NOTIFICATION_API_FILE, "w") as file:
json.dump(notifications, file, indent=4)
except Exception as e:
mylog("error", [f"[Notification] Error writing to notifications file: {e}"])
# Don't re-raise, just log. This prevents the API from crashing 500.
# Broadcast unread count update
try:
unread_count = sum(1 for n in notifications if n.get("read", 0) == 0)
unread_count = sum(1 for n in locked_notifications_file(lambda n: n) if n.get("read", 0) == 0)
broadcast_unread_notifications_count(unread_count)
except Exception as e:
mylog("none", [f"[Notification] Failed to broadcast unread count: {e}"])
@@ -147,24 +148,42 @@ def mark_all_notifications_read():
"error": str (optional)
}
"""
# If notifications file does not exist, nothing to mark
if not os.path.exists(NOTIFICATION_API_FILE):
return {"success": True}
try:
with open(NOTIFICATION_API_FILE, "r") as f:
notifications = json.load(f)
except Exception as e:
mylog("none", f"[Notification] Failed to read notifications: {e}")
return {"success": False, "error": str(e)}
# Open file in read/write mode and acquire exclusive lock
with open(NOTIFICATION_API_FILE, "r+") as f:
fcntl.flock(f, fcntl.LOCK_EX)
for n in notifications:
n["read"] = 1
try:
# Read file contents
file_contents = f.read().strip()
if file_contents == "":
notifications = []
else:
try:
notifications = json.loads(file_contents)
except json.JSONDecodeError as e:
mylog("none", f"[Notification] Corrupted notifications JSON: {e}")
notifications = []
# Mark all notifications as read
for n in notifications:
n["read"] = 1
# Rewrite file safely
f.seek(0)
f.truncate()
json.dump(notifications, f, indent=4)
finally:
# Always release file lock
fcntl.flock(f, fcntl.LOCK_UN)
try:
with open(NOTIFICATION_API_FILE, "w") as f:
json.dump(notifications, f, indent=4)
except Exception as e:
mylog("none", f"[Notification] Failed to write notifications: {e}")
mylog("none", f"[Notification] Failed to read/write notifications: {e}")
return {"success": False, "error": str(e)}
mylog("debug", "[Notification] All notifications marked as read.")

View File

@@ -110,6 +110,7 @@ FIELD_SPECS = {
"source_col": "devNameSource",
"empty_values": ["", "null", "(unknown)", "(name not found)"],
"priority": ["NSLOOKUP", "AVAHISCAN", "NBTSCAN", "DIGSCAN", "ARPSCAN", "DHCPLSS", "NEWDEV", "N/A"],
"allow_override_if_changed": True,
},
# ==========================================================

View File

@@ -5,6 +5,7 @@ import base64
from pathlib import Path
from typing import Optional, Tuple
from logger import mylog
from helper import is_random_mac
# Register NetAlertX directories
INSTALL_PATH = os.getenv("NETALERTX_APP", "/app")
@@ -183,17 +184,21 @@ def guess_device_attributes(
type_ = None
icon = None
# --- Strict MAC + vendor rule matching from external file ---
# 1. Try strict MAC match first
type_, icon = match_mac_and_vendor(mac_clean, vendor, default_type, default_icon)
# 2. If no strict match, try Name match BEFORE checking for random MAC
if not type_ or type_ == default_type:
type_, icon = match_name(name, default_type, default_icon)
# 3. Only if it's STILL not found, apply the Random MAC block
if type_ == default_type and is_random_mac(mac):
return default_icon, default_type
# --- Loose Vendor-based fallback ---
if not type_ or type_ == default_type:
type_, icon = match_vendor(vendor, default_type, default_icon)
# --- Loose Name-based fallback ---
if not type_ or type_ == default_type:
type_, icon = match_name(name, default_type, default_icon)
# --- Loose IP-based fallback ---
if (not type_ or type_ == default_type) or (not icon or icon == default_icon):
type_, icon = match_ip(ip, default_type, default_icon)
@@ -261,4 +266,4 @@ def guess_type(
# Handler for when this is run as a program instead of called as a module.
if __name__ == "__main__":
mylog("error", "This module is not intended to be run directly.")
mylog("none", "This module is not intended to be run directly.")