BE: in-app notifications overwrite prevention + device huristics update

Signed-off-by: jokob-sk <jokob.sk@gmail.com>
This commit is contained in:
jokob-sk
2026-01-30 20:25:09 +11:00
parent 405c1c37cb
commit 8640b8c282
2 changed files with 67 additions and 43 deletions

View File

@@ -155,9 +155,10 @@
"matching_pattern": [ "matching_pattern": [
{ "mac_prefix": "001FA7", "vendor": "Sony" }, { "mac_prefix": "001FA7", "vendor": "Sony" },
{ "mac_prefix": "7C04D0", "vendor": "Nintendo" }, { "mac_prefix": "7C04D0", "vendor": "Nintendo" },
{ "mac_prefix": "EC26CA", "vendor": "Sony" } { "mac_prefix": "EC26CA", "vendor": "Sony" },
{ "mac_prefix": "48B02D", "vendor": "NVIDIA" }
], ],
"name_pattern": ["playstation", "xbox"] "name_pattern": ["playstation", "xbox", "shield", "nvidia"]
}, },
{ {
"dev_type": "Camera", "dev_type": "Camera",

View File

@@ -1,9 +1,9 @@
import os import os
import sys import sys
import _io
import json import json
import uuid import uuid
import time import time
import fcntl
from flask import jsonify from flask import jsonify
@@ -20,6 +20,35 @@ from api_server.sse_broadcast import broadcast_unread_notifications_count # noq
NOTIFICATION_API_FILE = apiPath + 'user_notifications.json' NOTIFICATION_API_FILE = apiPath + 'user_notifications.json'
def locked_notifications_file(callback):
# Ensure file exists
if not os.path.exists(NOTIFICATION_API_FILE):
with open(NOTIFICATION_API_FILE, "w") as f:
f.write("[]")
with open(NOTIFICATION_API_FILE, "r+") as f:
fcntl.flock(f, fcntl.LOCK_EX)
try:
raw = f.read().strip() or "[]"
try:
data = json.loads(raw)
except json.JSONDecodeError:
mylog("none", "[Notification] Corrupted JSON detected, resetting.")
data = []
# Let caller modify data
result = callback(data)
# Write back atomically
f.seek(0)
f.truncate()
json.dump(data, f, indent=4)
return result
finally:
fcntl.flock(f, fcntl.LOCK_UN)
# Show Frontend User Notification # Show Frontend User Notification
def write_notification(content, level="alert", timestamp=None): def write_notification(content, level="alert", timestamp=None):
""" """
@@ -37,45 +66,21 @@ def write_notification(content, level="alert", timestamp=None):
if timestamp is None: if timestamp is None:
timestamp = timeNowDB() timestamp = timeNowDB()
# Generate GUID
guid = str(uuid.uuid4())
# Prepare notification dictionary
notification = { notification = {
"timestamp": str(timestamp), "timestamp": str(timestamp),
"guid": guid, "guid": str(uuid.uuid4()),
"read": 0, "read": 0,
"level": level, "level": level,
"content": content, "content": content,
} }
# If file exists, load existing data, otherwise initialize as empty list def update(notifications):
if os.path.exists(NOTIFICATION_API_FILE): notifications.append(notification)
with open(NOTIFICATION_API_FILE, "r") as file:
# Check if the file object is of type _io.TextIOWrapper
if isinstance(file, _io.TextIOWrapper):
file_contents = file.read() # Read file contents
if file_contents == "":
file_contents = "[]" # If file is empty, initialize as empty list
# mylog('debug', ['[Notification] User Notifications file: ', file_contents]) locked_notifications_file(update)
notifications = json.loads(file_contents) # Parse JSON data
else:
mylog("none", "[Notification] File is not of type _io.TextIOWrapper")
notifications = []
else:
notifications = []
# Append new notification
notifications.append(notification)
# Write updated data back to file
with open(NOTIFICATION_API_FILE, "w") as file:
json.dump(notifications, file, indent=4)
# Broadcast unread count update
try: try:
unread_count = sum(1 for n in notifications if n.get("read", 0) == 0) unread_count = sum(1 for n in locked_notifications_file(lambda n: n) if n.get("read", 0) == 0)
broadcast_unread_notifications_count(unread_count) broadcast_unread_notifications_count(unread_count)
except Exception as e: except Exception as e:
mylog("none", [f"[Notification] Failed to broadcast unread count: {e}"]) mylog("none", [f"[Notification] Failed to broadcast unread count: {e}"])
@@ -143,24 +148,42 @@ def mark_all_notifications_read():
"error": str (optional) "error": str (optional)
} }
""" """
# If notifications file does not exist, nothing to mark
if not os.path.exists(NOTIFICATION_API_FILE): if not os.path.exists(NOTIFICATION_API_FILE):
return {"success": True} return {"success": True}
try: try:
with open(NOTIFICATION_API_FILE, "r") as f: # Open file in read/write mode and acquire exclusive lock
notifications = json.load(f) with open(NOTIFICATION_API_FILE, "r+") as f:
except Exception as e: fcntl.flock(f, fcntl.LOCK_EX)
mylog("none", f"[Notification] Failed to read notifications: {e}")
return {"success": False, "error": str(e)}
for n in notifications: try:
n["read"] = 1 # Read file contents
file_contents = f.read().strip()
if file_contents == "":
notifications = []
else:
try:
notifications = json.loads(file_contents)
except json.JSONDecodeError as e:
mylog("none", f"[Notification] Corrupted notifications JSON: {e}")
notifications = []
# Mark all notifications as read
for n in notifications:
n["read"] = 1
# Rewrite file safely
f.seek(0)
f.truncate()
json.dump(notifications, f, indent=4)
finally:
# Always release file lock
fcntl.flock(f, fcntl.LOCK_UN)
try:
with open(NOTIFICATION_API_FILE, "w") as f:
json.dump(notifications, f, indent=4)
except Exception as e: except Exception as e:
mylog("none", f"[Notification] Failed to write notifications: {e}") mylog("none", f"[Notification] Failed to read/write notifications: {e}")
return {"success": False, "error": str(e)} return {"success": False, "error": str(e)}
mylog("debug", "[Notification] All notifications marked as read.") mylog("debug", "[Notification] All notifications marked as read.")