BE: API in-app messaging endpoint

Signed-off-by: jokob-sk <jokob.sk@gmail.com>
This commit is contained in:
jokob-sk
2025-10-10 17:00:53 +11:00
parent 8ae0367e8e
commit b59bca2967
5 changed files with 522 additions and 11 deletions

View File

@@ -59,6 +59,8 @@ http://<server>:<GRAPHQL_PORT>/
* [Events](API_EVENTS.md) Device event logging and management
* [Sessions](API_SESSIONS.md) Connection sessions and history
* [Settings](API_SETTINGS.md) Settings
* Messaging:
* [In app messaging](API_MESSAGING_IN_APP.md) - In-app messaging
* [Metrics](API_METRICS.md) Prometheus metrics and per-device status
* [Network Tools](API_NETTOOLS.md) Utilities like Wake-on-LAN, traceroute, nslookup, nmap, and internet info
* [Online History](API_ONLINEHISTORY.md) Online/offline device records

173
docs/API_MESSAGING_IN_APP.md Executable file
View File

@@ -0,0 +1,173 @@
# In-app Notifications API
Manage in-app notifications for users. Notifications can be written, retrieved, marked as read, or deleted.
---
### Write Notification
* **POST** `/messaging/in-app/write` → Create a new in-app notification.
**Request Body:**
```json
{
"content": "This is a test notification",
"level": "alert" // optional, ["interrupt","info","alert"] default: "alert"
}
```
**Response:**
```json
{
"success": true
}
```
#### `curl` Example
```bash
curl -X POST "http://<server_ip>:<GRAPHQL_PORT>/messaging/in-app/write" \
-H "Authorization: Bearer <API_TOKEN>" \
-H "Accept: application/json" \
-H "Content-Type: application/json" \
-d '{
"content": "This is a test notification",
"level": "alert"
}'
```
---
### Get Unread Notifications
* **GET** `/messaging/in-app/unread` → Retrieve all unread notifications.
**Response:**
```json
[
{
"timestamp": "2025-10-10T12:34:56",
"guid": "f47ac10b-58cc-4372-a567-0e02b2c3d479",
"read": 0,
"level": "alert",
"content": "This is a test notification"
}
]
```
#### `curl` Example
```bash
curl -X GET "http://<server_ip>:<GRAPHQL_PORT>/messaging/in-app/unread" \
-H "Authorization: Bearer <API_TOKEN>" \
-H "Accept: application/json"
```
---
### Mark All Notifications as Read
* **POST** `/messaging/in-app/read/all` → Mark all notifications as read.
**Response:**
```json
{
"success": true
}
```
#### `curl` Example
```bash
curl -X POST "http://<server_ip>:<GRAPHQL_PORT>/messaging/in-app/read/all" \
-H "Authorization: Bearer <API_TOKEN>" \
-H "Accept: application/json"
```
---
### Mark Single Notification as Read
* **POST** `/messaging/in-app/read/<guid>` → Mark a single notification as read using its GUID.
**Response (success):**
```json
{
"success": true
}
```
**Response (failure):**
```json
{
"success": false,
"error": "Notification not found"
}
```
#### `curl` Example
```bash
curl -X POST "http://<server_ip>:<GRAPHQL_PORT>/messaging/in-app/read/f47ac10b-58cc-4372-a567-0e02b2c3d479" \
-H "Authorization: Bearer <API_TOKEN>" \
-H "Accept: application/json"
```
---
### Delete All Notifications
* **DELETE** `/messaging/in-app/delete` → Remove all notifications from the system.
**Response:**
```json
{
"success": true
}
```
#### `curl` Example
```bash
curl -X DELETE "http://<server_ip>:<GRAPHQL_PORT>/messaging/in-app/delete" \
-H "Authorization: Bearer <API_TOKEN>" \
-H "Accept: application/json"
```
---
### Delete Single Notification
* **DELETE** `/messaging/in-app/delete/<guid>` → Remove a single notification by its GUID.
**Response (success):**
```json
{
"success": true
}
```
**Response (failure):**
```json
{
"success": false,
"error": "Notification not found"
}
```
#### `curl` Example
```bash
curl -X DELETE "http://<server_ip>:<GRAPHQL_PORT>/messaging/in-app/delete/f47ac10b-58cc-4372-a567-0e02b2c3d479" \
-H "Authorization: Bearer <API_TOKEN>" \
-H "Accept: application/json"
```

View File

@@ -1,6 +1,19 @@
import threading
import sys
from flask import Flask, request, jsonify, Response
from flask_cors import CORS
# Register NetAlertX directories
INSTALL_PATH = "/app"
sys.path.extend([f"{INSTALL_PATH}/server"])
from logger import mylog
from helper import get_setting_value, timeNowTZ
from db.db_helper import get_date_from_period
from app_state import updateState
from .graphql_endpoint import devicesSchema
from .device_endpoint import get_device_data, set_device_data, delete_device, delete_device_events, reset_device_props, copy_device, update_device_column
from .devices_endpoint import get_all_devices, delete_unknown_devices, delete_all_with_empty_macs, delete_devices, export_devices, import_csv, devices_totals, devices_by_status
@@ -11,17 +24,7 @@ from .sessions_endpoint import get_sessions, delete_session, create_session, get
from .nettools_endpoint import wakeonlan, traceroute, speedtest, nslookup, nmap_scan, internet_info
from .dbquery_endpoint import read_query, write_query, update_query, delete_query
from .sync_endpoint import handle_sync_post, handle_sync_get
import sys
# Register NetAlertX directories
INSTALL_PATH = "/app"
sys.path.extend([f"{INSTALL_PATH}/server"])
from logger import mylog
from helper import get_setting_value, timeNowTZ
from db.db_helper import get_date_from_period
from app_state import updateState
from messaging.in_app import write_notification
from messaging.in_app import write_notification, mark_all_notifications_read, delete_notifications, get_unread_notifications, delete_notification, mark_notification_as_read
# Flask application
app = Flask(__name__)
@@ -36,6 +39,7 @@ CORS(
r"/sessions/*": {"origins": "*"},
r"/settings/*": {"origins": "*"},
r"/dbquery/*": {"origins": "*"},
r"/messaging/*": {"origins": "*"},
r"/events/*": {"origins": "*"}
},
supports_credentials=True,
@@ -500,6 +504,69 @@ def metrics():
# Return Prometheus metrics as plain text
return Response(get_metric_stats(), mimetype="text/plain")
# --------------------------
# In-app notifications
# --------------------------
@app.route("/messaging/in-app/write", methods=["POST"])
def api_write_notification():
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
data = request.json or {}
content = data.get("content")
level = data.get("level", "alert")
if not content:
return jsonify({"success": False, "error": "Missing content"}), 400
write_notification(content, level)
return jsonify({"success": True})
@app.route("/messaging/in-app/unread", methods=["GET"])
def api_get_unread_notifications():
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return get_unread_notifications()
@app.route("/messaging/in-app/read/all", methods=["POST"])
def api_mark_all_notifications_read():
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return jsonify(mark_all_notifications_read())
@app.route("/messaging/in-app/delete", methods=["DELETE"])
def api_delete_all_notifications():
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return delete_notifications()
@app.route("/messaging/in-app/delete/<guid>", methods=["DELETE"])
def api_delete_notification(guid):
"""Delete a single notification by GUID."""
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
result = delete_notification(guid)
if result.get("success"):
return jsonify({"success": True})
else:
return jsonify({"success": False, "error": result.get("error")}), 500
@app.route("/messaging/in-app/read/<guid>", methods=["POST"])
def api_mark_notification_read(guid):
"""Mark a single notification as read by GUID."""
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
result = mark_notification_as_read(guid)
if result.get("success"):
return jsonify({"success": True})
else:
return jsonify({"success": False, "error": result.get("error")}), 500
# --------------------------
# SYNC endpoint

View File

@@ -9,6 +9,7 @@ import subprocess
import requests
from yattag import indent
from json2table import convert
from flask import jsonify
# Register NetAlertX directories
INSTALL_PATH="/app"
@@ -25,7 +26,18 @@ NOTIFICATION_API_FILE = apiPath + 'user_notifications.json'
# Show Frontend User Notification
def write_notification(content, level='alert', timestamp=None):
"""
Create and append a new user notification entry to the notifications file.
Args:
content (str): The message content to display to the user.
level (str, optional): Notification severity (e.g., 'info', 'alert', 'warning').
Defaults to 'alert'.
timestamp (datetime, optional): Custom timestamp; if None, uses current time.
Returns:
None
"""
if timestamp is None:
timestamp = timeNowTZ()
@@ -67,7 +79,15 @@ def write_notification(content, level='alert', timestamp=None):
# Trim notifications
def remove_old(keepNumberOfEntries):
"""
Trim the notifications file, keeping only the most recent N entries.
Args:
keepNumberOfEntries (int): Number of latest notifications to retain.
Returns:
None
"""
# Check if file exists
if not os.path.exists(NOTIFICATION_API_FILE):
mylog('info', '[Notification] No notifications file to clean.')
@@ -106,3 +126,141 @@ def remove_old(keepNumberOfEntries):
mylog('verbose', f'[Notification] Trimmed notifications to latest {keepNumberOfEntries}')
except Exception as e:
mylog('none', f'Error writing trimmed notifications file: {e}')
def mark_all_notifications_read():
"""
Mark all existing notifications as read.
Returns:
dict: JSON-compatible dictionary containing:
{
"success": bool,
"error": str (optional)
}
"""
if not os.path.exists(NOTIFICATION_API_FILE):
return {"success": True}
try:
with open(NOTIFICATION_API_FILE, "r") as f:
notifications = json.load(f)
except Exception as e:
mylog("none", f"[Notification] Failed to read notifications: {e}")
return {"success": False, "error": str(e)}
for n in notifications:
n["read"] = 1
try:
with open(NOTIFICATION_API_FILE, "w") as f:
json.dump(notifications, f, indent=4)
except Exception as e:
mylog("none", f"[Notification] Failed to write notifications: {e}")
return {"success": False, "error": str(e)}
mylog("debug", "[Notification] All notifications marked as read.")
return {"success": True}
def delete_notifications():
"""
Delete all notifications from the JSON file.
Returns:
A JSON response with {"success": True}.
"""
with open(NOTIFICATION_API_FILE, "w") as f:
json.dump([], f, indent=4)
mylog("debug", "[Notification] All notifications deleted.")
return jsonify({"success": True})
def get_unread_notifications():
"""
Retrieve all unread notifications from the JSON file.
Returns:
A JSON array of unread notification objects.
"""
if not os.path.exists(NOTIFICATION_API_FILE):
return jsonify([])
with open(NOTIFICATION_API_FILE, "r") as f:
notifications = json.load(f)
unread = [n for n in notifications if n.get("read", 0) == 0]
return jsonify(unread)
def mark_notification_as_read(guid=None, max_attempts=3):
"""
Mark a notification as read based on GUID.
If guid is None, mark all notifications as read.
Args:
guid (str, optional): The GUID of the notification to mark. Defaults to None.
max_attempts (int, optional): Number of attempts to read/write file. Defaults to 3.
Returns:
dict: {"success": True} on success, {"success": False, "error": "..."} on failure
"""
attempts = 0
while attempts < max_attempts:
try:
if os.path.exists(NOTIFICATION_API_FILE) and os.access(NOTIFICATION_API_FILE, os.R_OK | os.W_OK):
with open(NOTIFICATION_API_FILE, "r") as f:
notifications = json.load(f)
if notifications is not None:
for notification in notifications:
if guid is None or notification.get("guid") == guid:
notification["read"] = 1
with open(NOTIFICATION_API_FILE, "w") as f:
json.dump(notifications, f, indent=4)
return {"success": True}
except Exception as e:
mylog("none", f"[Notification] Attempt {attempts+1} failed: {e}")
attempts += 1
time.sleep(0.5) # Sleep 0.5 seconds before retrying
error_msg = f"Failed to read/write notification file after {max_attempts} attempts."
mylog("none", f"[Notification] {error_msg}")
return {"success": False, "error": error_msg}
def delete_notification(guid):
"""
Delete a notification from the notifications file based on its GUID.
Args:
guid (str): The GUID of the notification to delete.
Returns:
dict: {"success": True} on success, {"success": False, "error": "..."} on failure
"""
if not guid:
return {"success": False, "error": "GUID is required"}
if not os.path.exists(NOTIFICATION_API_FILE):
return {"success": True} # Nothing to delete
try:
with open(NOTIFICATION_API_FILE, "r") as f:
notifications = json.load(f)
# Filter out the notification with the specified GUID
filtered_notifications = [n for n in notifications if n.get("guid") != guid]
# Write the updated notifications back
with open(NOTIFICATION_API_FILE, "w") as f:
json.dump(filtered_notifications, f, indent=4)
return {"success": True}
except Exception as e:
mylog("none", f"[Notification] Failed to delete notification {guid}: {e}")
return {"success": False, "error": str(e)}

View File

@@ -0,0 +1,111 @@
# -----------------------------
# In-app notifications tests with cleanup
# -----------------------------
import json
import random
import string
import uuid
import pytest
import os
import sys
# Define the installation path and extend the system path for plugin imports
INSTALL_PATH = "/app"
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
from api_server.api_server_start import app
from messaging.in_app import NOTIFICATION_API_FILE # Import the path to notifications file
from helper import get_setting_value
@pytest.fixture(scope="session")
def api_token():
return get_setting_value("API_TOKEN")
@pytest.fixture
def client():
with app.test_client() as client:
yield client
def auth_headers(token):
return {"Authorization": f"Bearer {token}"}
@pytest.fixture
def random_content():
return "Test Notification " + "".join(random.choices(string.ascii_letters + string.digits, k=6))
@pytest.fixture
def notification_guid(client, api_token, random_content):
# Write a notification and return its GUID
resp = client.post(
"/messaging/in-app/write",
json={"content": random_content, "level": "alert"},
headers=auth_headers(api_token)
)
assert resp.status_code == 200
# Fetch the unread notifications and get GUID
resp = client.get("/messaging/in-app/unread", headers=auth_headers(api_token))
data = resp.json
guid = next((n["guid"] for n in data if n["content"] == random_content), None)
assert guid is not None
return guid
@pytest.fixture(autouse=True)
def cleanup_notifications():
# Runs before and after each test
# Backup original file if exists
backup = None
if os.path.exists(NOTIFICATION_API_FILE):
with open(NOTIFICATION_API_FILE, "r") as f:
backup = f.read()
yield # run the test
# Cleanup after test
with open(NOTIFICATION_API_FILE, "w") as f:
f.write("[]")
# Restore backup if needed
if backup:
with open(NOTIFICATION_API_FILE, "w") as f:
f.write(backup)
# -----------------------------
def test_write_notification(client, api_token, random_content):
resp = client.post(
"/messaging/in-app/write",
json={"content": random_content, "level": "alert"},
headers=auth_headers(api_token)
)
assert resp.status_code == 200
assert resp.json.get("success") is True
def test_get_unread_notifications(client, api_token, random_content):
client.post("/messaging/in-app/write", json={"content": random_content}, headers=auth_headers(api_token))
resp = client.get("/messaging/in-app/unread", headers=auth_headers(api_token))
assert resp.status_code == 200
notifications = resp.json
assert any(n["content"] == random_content for n in notifications)
def test_mark_all_notifications_read(client, api_token, random_content):
client.post("/messaging/in-app/write", json={"content": random_content}, headers=auth_headers(api_token))
resp = client.post("/messaging/in-app/read/all", headers=auth_headers(api_token))
assert resp.status_code == 200
assert resp.json.get("success") is True
def test_mark_single_notification_read(client, api_token, notification_guid):
resp = client.post(f"/messaging/in-app/read/{notification_guid}", headers=auth_headers(api_token))
assert resp.status_code == 200
assert resp.json.get("success") is True
def test_delete_single_notification(client, api_token, notification_guid):
resp = client.delete(f"/messaging/in-app/delete/{notification_guid}", headers=auth_headers(api_token))
assert resp.status_code == 200
assert resp.json.get("success") is True
def test_delete_all_notifications(client, api_token, random_content):
# Add a notification first
client.post("/messaging/in-app/write", json={"content": random_content}, headers=auth_headers(api_token))
resp = client.delete("/messaging/in-app/delete", headers=auth_headers(api_token))
assert resp.status_code == 200
assert resp.json.get("success") is True