Merge remote-tracking branch 'origin/main' into hardening

This commit is contained in:
Adam Outler
2025-10-18 13:23:57 -04:00
63 changed files with 3718 additions and 730 deletions

View File

@@ -1,6 +1,19 @@
import threading
import sys
from flask import Flask, request, jsonify, Response
from flask_cors import CORS
# Register NetAlertX directories
INSTALL_PATH = "/app"
sys.path.extend([f"{INSTALL_PATH}/server"])
from logger import mylog
from helper import get_setting_value, timeNowTZ
from db.db_helper import get_date_from_period
from app_state import updateState
from .graphql_endpoint import devicesSchema
from .device_endpoint import get_device_data, set_device_data, delete_device, delete_device_events, reset_device_props, copy_device, update_device_column
from .devices_endpoint import get_all_devices, delete_unknown_devices, delete_all_with_empty_macs, delete_devices, export_devices, import_csv, devices_totals, devices_by_status
@@ -11,17 +24,7 @@ from .sessions_endpoint import get_sessions, delete_session, create_session, get
from .nettools_endpoint import wakeonlan, traceroute, speedtest, nslookup, nmap_scan, internet_info
from .dbquery_endpoint import read_query, write_query, update_query, delete_query
from .sync_endpoint import handle_sync_post, handle_sync_get
import sys
# Register NetAlertX directories
INSTALL_PATH = "/app"
sys.path.extend([f"{INSTALL_PATH}/server"])
from logger import mylog
from helper import get_setting_value, timeNowTZ
from db.db_helper import get_date_from_period
from app_state import updateState
from messaging.in_app import write_notification
from messaging.in_app import write_notification, mark_all_notifications_read, delete_notifications, get_unread_notifications, delete_notification, mark_notification_as_read
# Flask application
app = Flask(__name__)
@@ -36,6 +39,7 @@ CORS(
r"/sessions/*": {"origins": "*"},
r"/settings/*": {"origins": "*"},
r"/dbquery/*": {"origins": "*"},
r"/messaging/*": {"origins": "*"},
r"/events/*": {"origins": "*"}
},
supports_credentials=True,
@@ -500,6 +504,69 @@ def metrics():
# Return Prometheus metrics as plain text
return Response(get_metric_stats(), mimetype="text/plain")
# --------------------------
# In-app notifications
# --------------------------
@app.route("/messaging/in-app/write", methods=["POST"])
def api_write_notification():
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
data = request.json or {}
content = data.get("content")
level = data.get("level", "alert")
if not content:
return jsonify({"success": False, "error": "Missing content"}), 400
write_notification(content, level)
return jsonify({"success": True})
@app.route("/messaging/in-app/unread", methods=["GET"])
def api_get_unread_notifications():
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return get_unread_notifications()
@app.route("/messaging/in-app/read/all", methods=["POST"])
def api_mark_all_notifications_read():
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return jsonify(mark_all_notifications_read())
@app.route("/messaging/in-app/delete", methods=["DELETE"])
def api_delete_all_notifications():
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return delete_notifications()
@app.route("/messaging/in-app/delete/<guid>", methods=["DELETE"])
def api_delete_notification(guid):
"""Delete a single notification by GUID."""
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
result = delete_notification(guid)
if result.get("success"):
return jsonify({"success": True})
else:
return jsonify({"success": False, "error": result.get("error")}), 500
@app.route("/messaging/in-app/read/<guid>", methods=["POST"])
def api_mark_notification_read(guid):
"""Mark a single notification as read by GUID."""
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
result = mark_notification_as_read(guid)
if result.get("success"):
return jsonify({"success": True})
else:
return jsonify({"success": False, "error": result.get("error")}), 500
# --------------------------
# SYNC endpoint

View File

@@ -31,7 +31,7 @@ arpscan_devices = []
SCAN_SUBNETS = ['192.168.1.0/24 --interface=eth1', '192.168.1.0/24 --interface=eth0']
LOG_LEVEL = 'verbose'
TIMEZONE = 'Europe/Berlin'
UI_LANG = 'English'
UI_LANG = 'English (en_us)'
UI_PRESENCE = ['online', 'offline', 'archived']
UI_MY_DEVICES = ['online', 'offline', 'archived', 'new', 'down']
UI_NOT_RANDOM_MAC = []

View File

@@ -153,47 +153,259 @@ class SafeConditionBuilder:
def _parse_condition(self, condition: str) -> Tuple[str, Dict[str, Any]]:
"""
Parse a condition string into safe SQL with parameters.
This method handles basic patterns like:
- AND devName = 'value'
- AND devComments LIKE '%value%'
- AND eve_EventType IN ('type1', 'type2')
This method handles both single and compound conditions:
- Single: AND devName = 'value'
- Compound: AND devName = 'value' AND devVendor = 'Apple'
- Multiple clauses with AND/OR operators
Args:
condition: Condition string to parse
Returns:
Tuple of (safe_sql_snippet, parameters_dict)
"""
condition = condition.strip()
# Handle empty conditions
if not condition:
return "", {}
# Check if this is a compound condition (multiple clauses)
if self._is_compound_condition(condition):
return self._parse_compound_condition(condition)
# Single condition: extract leading logical operator if present
logical_op = None
clause_text = condition
# Check for leading AND
if condition.upper().startswith('AND ') or condition.upper().startswith('AND\t'):
logical_op = 'AND'
clause_text = condition[3:].strip()
# Check for leading OR
elif condition.upper().startswith('OR ') or condition.upper().startswith('OR\t'):
logical_op = 'OR'
clause_text = condition[2:].strip()
# Parse the single condition
return self._parse_single_condition(clause_text, logical_op)
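# Illustrative sketch (editorial, not part of this commit): expected
# round-trips through _parse_condition. The ":param_N" placeholder names are
# an assumption; the real names come from _build_simple_condition, which is
# outside this hunk.
#
#   builder = SafeConditionBuilder()  # instantiation simplified
#   builder._parse_condition("AND devName = 'router'")
#   #  -> ("AND devName = :param_0", {"param_0": "router"})
#   builder._parse_condition("AND devName = 'router' OR devVendor = 'Apple'")
#   #  -> ("AND devName = :param_0 OR devVendor = :param_1",
#   #      {"param_0": "router", "param_1": "Apple"})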
def _is_compound_condition(self, condition: str) -> bool:
"""
Determine if a condition contains multiple clauses (compound condition).
A compound condition has multiple logical operators (AND/OR) connecting
separate comparison clauses.
Args:
condition: Condition string to check
Returns:
True if compound (multiple clauses), False if single clause
"""
# Track if we're inside quotes to avoid counting operators in quoted strings
in_quotes = False
logical_op_count = 0
i = 0
while i < len(condition):
char = condition[i]
# Toggle quote state
if char == "'":
in_quotes = not in_quotes
i += 1
continue
# Only count logical operators outside of quotes
if not in_quotes:
# Look for AND or OR as whole words
remaining = condition[i:].upper()
# Check for AND (must be word boundary)
if remaining.startswith('AND ') or remaining.startswith('AND\t'):
logical_op_count += 1
i += 3
continue
# Check for OR (must be word boundary)
if remaining.startswith('OR ') or remaining.startswith('OR\t'):
logical_op_count += 1
i += 2
continue
i += 1
# A compound condition has more than one logical operator
# (first AND/OR starts the condition, subsequent ones connect clauses)
return logical_op_count > 1
def _parse_compound_condition(self, condition: str) -> Tuple[str, Dict[str, Any]]:
"""
Parse a compound condition with multiple clauses.
Splits the condition into individual clauses, parses each one,
and reconstructs the full condition with all parameters.
Args:
condition: Compound condition string
Returns:
Tuple of (safe_sql_snippet, parameters_dict)
"""
# Split the condition into individual clauses while preserving logical operators
clauses = self._split_by_logical_operators(condition)
# Parse each clause individually
parsed_parts = []
all_params = {}
for clause_text, logical_op in clauses:
# Parse this single clause
sql_part, params = self._parse_single_condition(clause_text, logical_op)
if sql_part:
parsed_parts.append(sql_part)
all_params.update(params)
if not parsed_parts:
raise ValueError("No valid clauses found in compound condition")
# Join all parsed parts
final_sql = " ".join(parsed_parts)
return final_sql, all_params
def _split_by_logical_operators(self, condition: str) -> List[Tuple[str, Optional[str]]]:
"""
Split a compound condition into individual clauses.
Returns a list of tuples: (clause_text, logical_operator)
The logical operator is the AND/OR that precedes the clause.
Args:
condition: Compound condition string
Returns:
List of (clause_text, logical_op) tuples
"""
clauses = []
current_clause = []
current_logical_op = None
in_quotes = False
i = 0
while i < len(condition):
char = condition[i]
# Toggle quote state
if char == "'":
in_quotes = not in_quotes
current_clause.append(char)
i += 1
continue
# Only look for logical operators outside of quotes
if not in_quotes:
remaining = condition[i:].upper()
# Check if we're at a word boundary (start of string or after whitespace)
at_word_boundary = (i == 0 or condition[i-1] in ' \t')
# Check for AND (must be at word boundary)
if at_word_boundary and (remaining.startswith('AND ') or remaining.startswith('AND\t')):
# Save current clause if we have one
if current_clause:
clause_text = ''.join(current_clause).strip()
if clause_text:
clauses.append((clause_text, current_logical_op))
current_clause = []
# Set the logical operator for the next clause
current_logical_op = 'AND'
i += 3 # Skip 'AND'
# Skip whitespace after AND
while i < len(condition) and condition[i] in ' \t':
i += 1
continue
# Check for OR (must be at word boundary)
if at_word_boundary and (remaining.startswith('OR ') or remaining.startswith('OR\t')):
# Save current clause if we have one
if current_clause:
clause_text = ''.join(current_clause).strip()
if clause_text:
clauses.append((clause_text, current_logical_op))
current_clause = []
# Set the logical operator for the next clause
current_logical_op = 'OR'
i += 2 # Skip 'OR'
# Skip whitespace after OR
while i < len(condition) and condition[i] in ' \t':
i += 1
continue
# Add character to current clause
current_clause.append(char)
i += 1
# Don't forget the last clause
if current_clause:
clause_text = ''.join(current_clause).strip()
if clause_text:
clauses.append((clause_text, current_logical_op))
return clauses
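# Illustrative sketch (editorial, not part of this commit): how a compound
# condition is split. Each tuple carries the logical operator that preceded
# the clause, and quoted content is never split on AND/OR:
#
#   _split_by_logical_operators("AND devName = 'a' OR devOwner = 'b'")
#   #  -> [("devName = 'a'", 'AND'), ("devOwner = 'b'", 'OR')]
#   _split_by_logical_operators("AND devName = 'a AND b'")
#   #  -> [("devName = 'a AND b'", 'AND')]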
def _parse_single_condition(self, condition: str, logical_op: Optional[str] = None) -> Tuple[str, Dict[str, Any]]:
"""
Parse a single condition clause into safe SQL with parameters.
This method handles basic patterns like:
- devName = 'value' (with optional AND/OR prefix)
- devComments LIKE '%value%'
- eve_EventType IN ('type1', 'type2')
Args:
condition: Single condition string to parse
logical_op: Optional logical operator (AND/OR) to prepend
Returns:
Tuple of (safe_sql_snippet, parameters_dict)
"""
condition = condition.strip()
# Handle empty conditions
if not condition:
return "", {}
# Simple pattern matching for common conditions
# Pattern 1: AND/OR column operator value (supporting Unicode in quoted strings)
pattern1 = r'^\s*(AND|OR)?\s+(\w+)\s+(=|!=|<>|<|>|<=|>=|LIKE|NOT\s+LIKE)\s+\'([^\']*)\'\s*$'
# Pattern 1: [AND/OR] column operator value (supporting Unicode in quoted strings)
pattern1 = r'^\s*(\w+)\s+(=|!=|<>|<|>|<=|>=|LIKE|NOT\s+LIKE)\s+\'([^\']*)\'\s*$'
match1 = re.match(pattern1, condition, re.IGNORECASE | re.UNICODE)
if match1:
logical_op, column, operator, value = match1.groups()
column, operator, value = match1.groups()
return self._build_simple_condition(logical_op, column, operator, value)
# Pattern 2: AND/OR column IN ('val1', 'val2', ...)
pattern2 = r'^\s*(AND|OR)?\s+(\w+)\s+(IN|NOT\s+IN)\s+\(([^)]+)\)\s*$'
# Pattern 2: [AND/OR] column IN ('val1', 'val2', ...)
pattern2 = r'^\s*(\w+)\s+(IN|NOT\s+IN)\s+\(([^)]+)\)\s*$'
match2 = re.match(pattern2, condition, re.IGNORECASE)
if match2:
logical_op, column, operator, values_str = match2.groups()
column, operator, values_str = match2.groups()
return self._build_in_condition(logical_op, column, operator, values_str)
# Pattern 3: AND/OR column IS NULL/IS NOT NULL
pattern3 = r'^\s*(AND|OR)?\s+(\w+)\s+(IS\s+NULL|IS\s+NOT\s+NULL)\s*$'
# Pattern 3: [AND/OR] column IS NULL/IS NOT NULL
pattern3 = r'^\s*(\w+)\s+(IS\s+NULL|IS\s+NOT\s+NULL)\s*$'
match3 = re.match(pattern3, condition, re.IGNORECASE)
if match3:
logical_op, column, operator = match3.groups()
column, operator = match3.groups()
return self._build_null_condition(logical_op, column, operator)
# If no patterns match, reject the condition for security
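# Illustrative sketch (editorial, not part of this commit): inputs the three
# patterns above accept, after _parse_condition has stripped any leading
# AND/OR:
#
#   "devName = 'router'"                           -> pattern 1 (comparison)
#   "devComments LIKE '%guest%'"                   -> pattern 1 (LIKE)
#   "eve_EventType IN ('New Device', 'Connected')" -> pattern 2 (IN list)
#   "devLastConnection IS NOT NULL"                -> pattern 3 (NULL check)
#
# Anything else (subqueries, function calls, unquoted values) falls through
# and is rejected.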

View File

@@ -157,7 +157,7 @@ def importConfigs (pm, db, all_plugins):
# ----------------------------------------
# ccd(key, default, config_dir, name, inputtype, options, group, events=[], desc = "", regex = "", setJsonMetadata = {}, overrideTemplate = {})
conf.LOADED_PLUGINS = ccd('LOADED_PLUGINS', [] , c_d, 'Loaded plugins', '{"dataType":"array","elements":[{"elementType":"select","elementOptions":[{"multiple":"true","ordeable":"true"}],"transformers":[]},{"elementType":"button","elementOptions":[{"sourceSuffixes":[]},{"separator":""},{"cssClasses":"col-xs-12"},{"onClick":"selectChange(this)"},{"getStringKey":"Gen_Change"}],"transformers":[]}]}', '[]', 'General')
conf.LOADED_PLUGINS = ccd('LOADED_PLUGINS', [] , c_d, 'Loaded plugins', '{"dataType":"array","elements":[{"elementType":"select","elementHasInputValue":1,"elementOptions":[{"multiple":"true","ordeable":"true"}],"transformers":[]},{"elementType":"button","elementOptions":[{"sourceSuffixes":[]},{"separator":""},{"cssClasses":"col-xs-12"},{"onClick":"selectChange(this)"},{"getStringKey":"Gen_Change"}],"transformers":[]}]}', '[]', 'General')
conf.DISCOVER_PLUGINS = ccd('DISCOVER_PLUGINS', True , c_d, 'Discover plugins', """{"dataType": "boolean","elements": [{"elementType": "input","elementOptions": [{ "type": "checkbox" }],"transformers": []}]}""", '[]', 'General')
conf.SCAN_SUBNETS = ccd('SCAN_SUBNETS', ['192.168.1.0/24 --interface=eth1', '192.168.1.0/24 --interface=eth0'] , c_d, 'Subnets to scan', '''{"dataType": "array","elements": [{"elementType": "input","elementOptions": [{"placeholder": "192.168.1.0/24 --interface=eth1"},{"suffix": "_in"},{"cssClasses": "col-sm-10"},{"prefillValue": "null"}],"transformers": []},{"elementType": "button","elementOptions": [{"sourceSuffixes": ["_in"]},{"separator": ""},{"cssClasses": "col-xs-12"},{"onClick": "addList(this, false)"},{"getStringKey": "Gen_Add"}],"transformers": []},{"elementType": "select","elementHasInputValue": 1,"elementOptions": [{"multiple": "true"},{"readonly": "true"},{"editable": "true"}],"transformers": []},{"elementType": "button","elementOptions": [{"sourceSuffixes": []},{"separator": ""},{"cssClasses": "col-xs-6"},{"onClick": "removeAllOptions(this)"},{"getStringKey": "Gen_Remove_All"}],"transformers": []},{"elementType": "button","elementOptions": [{"sourceSuffixes": []},{"separator": ""},{"cssClasses": "col-xs-6"},{"onClick": "removeFromList(this)"},{"getStringKey": "Gen_Remove_Last"}],"transformers": []}]}''', '[]', 'General')
conf.LOG_LEVEL = ccd('LOG_LEVEL', 'verbose' , c_d, 'Log verboseness', '{"dataType":"string", "elements": [{"elementType" : "select", "elementOptions" : [] ,"transformers": []}]}', "['none', 'minimal', 'verbose', 'debug', 'trace']", 'General')
@@ -176,7 +176,7 @@ def importConfigs (pm, db, all_plugins):
conf.API_TOKEN = ccd('API_TOKEN', 't_' + generate_random_string(20) , c_d, 'API token', '{"dataType": "string","elements": [{"elementType": "input","elementHasInputValue": 1,"elementOptions": [{ "cssClasses": "col-xs-12" }],"transformers": []},{"elementType": "button","elementOptions": [{ "getStringKey": "Gen_Generate" },{ "customParams": "API_TOKEN" },{ "onClick": "generateApiToken(this, 20)" },{ "cssClasses": "col-xs-12" }],"transformers": []}]}', '[]', 'General')
# UI
conf.UI_LANG = ccd('UI_LANG', 'English' , c_d, 'Language Interface', '{"dataType":"string", "elements": [{"elementType" : "select", "elementOptions" : [] ,"transformers": []}]}', "['English', 'German', 'Spanish', 'French', 'Norwegian', 'Russian', 'Italian (it_it)', 'Portuguese (pt_br)', 'Portuguese (pt_pt)', 'Polish (pl_pl)', 'Chinese (zh_cn)', 'Turkish (tr_tr)', 'Czech (cs_cz)', 'Arabic (ar_ar)', 'Catalan (ca_ca)', 'Ukrainian (uk_ua)' ]", 'UI')
conf.UI_LANG = ccd('UI_LANG', 'English (en_us)' , c_d, 'Language Interface', '{"dataType":"string", "elements": [{"elementType" : "select", "elementOptions" : [] ,"transformers": []}]}', "['English (en_us)', 'Arabic (ar_ar)', 'Catalan (ca_ca)', 'Czech (cs_cz)', 'German (de_de)', 'Spanish (es_es)', 'Farsi (fa_fa)', 'French (fr_fr)', 'Italian (it_it)', 'Norwegian (nb_no)', 'Polish (pl_pl)', 'Portuguese (pt_br)', 'Portuguese (pt_pt)', 'Russian (ru_ru)', 'Turkish (tr_tr)', 'Ukrainian (uk_ua)', 'Chinese (zh_cn)']", 'UI')
# Init timezone in case it changed and handle invalid values
try:
@@ -368,19 +368,8 @@ def importConfigs (pm, db, all_plugins):
# mylog('verbose', [f"[Config] pref {plugin["unique_prefix"]} run_val {run_val} run_sch {run_sch} "])
if run_val == 'schedule':
newSchedule = None
try:
newSchedule = Cron(run_sch).schedule(start_date=datetime.datetime.now(conf.tz))
if (newSchedule is not None):
conf.mySchedules.append(schedule_class(plugin["unique_prefix"], newSchedule, newSchedule.next(), False))
else:
raise(ValueError("Invalid schedule"))
except ValueError as e:
mylog('none', [f"[Config] [ERROR] Invalid schedule '{run_sch}' for plugin '{plugin['unique_prefix']}'. Error: {e}."])
except Exception as e:
mylog('none', [f"[Config] [ERROR] Could not set schedule '{run_sch}' for plugin '{plugin['unique_prefix']}'. Error: {e}."])
newSchedule = Cron(run_sch).schedule(start_date=datetime.datetime.now(conf.tz))
conf.mySchedules.append(schedule_class(plugin["unique_prefix"], newSchedule, newSchedule.next(), False))
# mylog('verbose', [f"[Config] conf.mySchedules {conf.mySchedules}"])

View File

@@ -84,13 +84,16 @@ class Logger:
root_logger.setLevel(custom_to_logging_levels.get(currentLevel, logging.NOTSET))
def mylog(self, requestedDebugLevel, *args):
self.reqLvl = self._to_num(requestedDebugLevel)
if self.reqLvl is not None and self.reqLvl <= self.setLvl:
self.setLvl = self._to_num(currentLevel)
if self.isAbove(requestedDebugLevel):
file_print(*args)
def isAbove(self, requestedDebugLevel):
reqLvl = self._to_num(requestedDebugLevel)
return reqLvl is not None and self.setLvl >= reqLvl
return reqLvl is not None and self.setLvl is not None and self.setLvl >= reqLvl
#-------------------------------------------------------------------------------
# Dedicated thread for writing logs
@@ -122,6 +125,8 @@ def start_log_writer_thread():
def file_print(*args):
result = timeNowTZ().strftime('%H:%M:%S') + ' '
for arg in args:
if isinstance(arg, list):
arg = ' '.join(str(a) for a in arg) # join list items with spaces so the message logs correctly when a list is passed
result += str(arg)
logging.log(custom_to_logging_levels.get(currentLevel, logging.NOTSET), result)
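# Illustrative sketch (editorial, not part of this commit): file_print joins
# list arguments with spaces, so both calls below log the same single line:
#
#   mylog('verbose', ['[Config]', 'loaded', 42, 'plugins'])
#   mylog('verbose', '[Config] loaded 42 plugins')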

View File

@@ -9,6 +9,7 @@ import subprocess
import requests
from yattag import indent
from json2table import convert
from flask import jsonify
# Register NetAlertX directories
INSTALL_PATH="/app"
@@ -25,7 +26,18 @@ NOTIFICATION_API_FILE = apiPath + 'user_notifications.json'
# Show Frontend User Notification
def write_notification(content, level='alert', timestamp=None):
"""
Create and append a new user notification entry to the notifications file.
Args:
content (str): The message content to display to the user.
level (str, optional): Notification severity (e.g., 'info', 'alert', 'warning').
Defaults to 'alert'.
timestamp (datetime, optional): Custom timestamp; if None, uses current time.
Returns:
None
"""
if timestamp is None:
timestamp = timeNowTZ()
@@ -67,7 +79,15 @@ def write_notification(content, level='alert', timestamp=None):
# Trim notifications
def remove_old(keepNumberOfEntries):
"""
Trim the notifications file, keeping only the most recent N entries.
Args:
keepNumberOfEntries (int): Number of latest notifications to retain.
Returns:
None
"""
# Check if file exists
if not os.path.exists(NOTIFICATION_API_FILE):
mylog('info', '[Notification] No notifications file to clean.')
@@ -106,3 +126,141 @@ def remove_old(keepNumberOfEntries):
mylog('verbose', f'[Notification] Trimmed notifications to latest {keepNumberOfEntries}')
except Exception as e:
mylog('none', f'Error writing trimmed notifications file: {e}')
def mark_all_notifications_read():
"""
Mark all existing notifications as read.
Returns:
dict: JSON-compatible dictionary containing:
{
"success": bool,
"error": str (optional)
}
"""
if not os.path.exists(NOTIFICATION_API_FILE):
return {"success": True}
try:
with open(NOTIFICATION_API_FILE, "r") as f:
notifications = json.load(f)
except Exception as e:
mylog("none", f"[Notification] Failed to read notifications: {e}")
return {"success": False, "error": str(e)}
for n in notifications:
n["read"] = 1
try:
with open(NOTIFICATION_API_FILE, "w") as f:
json.dump(notifications, f, indent=4)
except Exception as e:
mylog("none", f"[Notification] Failed to write notifications: {e}")
return {"success": False, "error": str(e)}
mylog("debug", "[Notification] All notifications marked as read.")
return {"success": True}
def delete_notifications():
"""
Delete all notifications from the JSON file.
Returns:
A JSON response with {"success": True}.
"""
with open(NOTIFICATION_API_FILE, "w") as f:
json.dump([], f, indent=4)
mylog("debug", "[Notification] All notifications deleted.")
return jsonify({"success": True})
def get_unread_notifications():
"""
Retrieve all unread notifications from the JSON file.
Returns:
A Flask JSON response containing the unread notification objects.
"""
if not os.path.exists(NOTIFICATION_API_FILE):
return jsonify([])
with open(NOTIFICATION_API_FILE, "r") as f:
notifications = json.load(f)
unread = [n for n in notifications if n.get("read", 0) == 0]
return jsonify(unread)
def mark_notification_as_read(guid=None, max_attempts=3):
"""
Mark a notification as read based on GUID.
If guid is None, mark all notifications as read.
Args:
guid (str, optional): The GUID of the notification to mark. Defaults to None.
max_attempts (int, optional): Number of attempts to read/write file. Defaults to 3.
Returns:
dict: {"success": True} on success, {"success": False, "error": "..."} on failure
"""
attempts = 0
while attempts < max_attempts:
try:
if os.path.exists(NOTIFICATION_API_FILE) and os.access(NOTIFICATION_API_FILE, os.R_OK | os.W_OK):
with open(NOTIFICATION_API_FILE, "r") as f:
notifications = json.load(f)
if notifications is not None:
for notification in notifications:
if guid is None or notification.get("guid") == guid:
notification["read"] = 1
with open(NOTIFICATION_API_FILE, "w") as f:
json.dump(notifications, f, indent=4)
return {"success": True}
except Exception as e:
mylog("none", f"[Notification] Attempt {attempts+1} failed: {e}")
attempts += 1
time.sleep(0.5) # Sleep 0.5 seconds before retrying
error_msg = f"Failed to read/write notification file after {max_attempts} attempts."
mylog("none", f"[Notification] {error_msg}")
return {"success": False, "error": error_msg}
def delete_notification(guid):
"""
Delete a notification from the notifications file based on its GUID.
Args:
guid (str): The GUID of the notification to delete.
Returns:
dict: {"success": True} on success, {"success": False, "error": "..."} on failure
"""
if not guid:
return {"success": False, "error": "GUID is required"}
if not os.path.exists(NOTIFICATION_API_FILE):
return {"success": True} # Nothing to delete
try:
with open(NOTIFICATION_API_FILE, "r") as f:
notifications = json.load(f)
# Filter out the notification with the specified GUID
filtered_notifications = [n for n in notifications if n.get("guid") != guid]
# Write the updated notifications back
with open(NOTIFICATION_API_FILE, "w") as f:
json.dump(filtered_notifications, f, indent=4)
return {"success": True}
except Exception as e:
mylog("none", f"[Notification] Failed to delete notification {guid}: {e}")
return {"success": False, "error": str(e)}