BE: LangStrings /graphql + /logs endpoint, utils chores
Some checks failed
docker / docker_dev (push) Has been cancelled

Signed-off-by: jokob-sk <jokob.sk@gmail.com>
This commit is contained in:
jokob-sk
2025-11-09 18:50:16 +11:00
parent 68c8e16828
commit 8483a741b4
25 changed files with 757 additions and 88 deletions

View File

@@ -64,8 +64,9 @@ http://<server>:<GRAPHQL_PORT>/
* [Metrics](API_METRICS.md) Prometheus metrics and per-device status
* [Network Tools](API_NETTOOLS.md) Utilities like Wake-on-LAN, traceroute, nslookup, nmap, and internet info
* [Online History](API_ONLINEHISTORY.md) Online/offline device records
* [GraphQL](API_GRAPHQL.md) Advanced queries and filtering
* [GraphQL](API_GRAPHQL.md) Advanced queries and filtering for Devices, Settings and Language Strings
* [Sync](API_SYNC.md) Synchronization between multiple NetAlertX instances
* [Logs](API_LOGS.md) Purging of logs and adding to the event execution queue for user triggered events
* [DB query](API_DBQUERY.md) (⚠ Internal) - Low level database access - use other endpoints if possible
See [Testing](API_TESTS.md) for example requests and usage.

View File

@@ -1,9 +1,10 @@
# GraphQL API Endpoint
GraphQL queries are **read-optimized for speed**. Data may be slightly out of date until the file system cache refreshes. The GraphQL endpoints allows you to access the following objects:
GraphQL queries are **read-optimized for speed**. Data may be slightly out of date until the file system cache refreshes. The GraphQL endpoints allow you to access the following objects:
- Devices
- Settings
* Devices
* Settings
* Language Strings (LangStrings)
## Endpoints
@@ -190,11 +191,74 @@ curl 'http://host:GRAPHQL_PORT/graphql' \
}
```
---
## LangStrings Query
The **LangStrings query** provides access to localized strings. Supports filtering by `langCode` and `langStringKey`. If the requested string is missing or empty, you can optionally fallback to `en_us`.
### Sample Query
```graphql
query GetLangStrings {
langStrings(langCode: "de_de", langStringKey: "settings_other_scanners") {
langStrings {
langCode
langStringKey
langStringText
}
count
}
}
```
### Query Parameters
| Parameter | Type | Description |
| ---------------- | ------- | ---------------------------------------------------------------------------------------- |
| `langCode` | String | Optional language code (e.g., `en_us`, `de_de`). If omitted, all languages are returned. |
| `langStringKey` | String | Optional string key to retrieve a specific entry. |
| `fallback_to_en` | Boolean | Optional (default `true`). If `true`, empty or missing strings fallback to `en_us`. |
### `curl` Example
```sh
curl 'http://host:GRAPHQL_PORT/graphql' \
-X POST \
-H 'Authorization: Bearer API_TOKEN' \
-H 'Content-Type: application/json' \
--data '{
"query": "query GetLangStrings { langStrings(langCode: \"de_de\", langStringKey: \"settings_other_scanners\") { langStrings { langCode langStringKey langStringText } count } }"
}'
```
### Sample Response
```json
{
"data": {
"langStrings": {
"count": 1,
"langStrings": [
{
"langCode": "de_de",
"langStringKey": "settings_other_scanners",
"langStringText": "Other, non-device scanner plugins that are currently enabled." // falls back to en_us if empty
}
]
}
}
}
```
---
## Notes
* Device and settings queries can be combined in one request since GraphQL supports batching.
* Device, settings, and LangStrings queries can be combined in **one request** since GraphQL supports batching.
* The `fallback_to_en` feature ensures UI always has a value even if a translation is missing.
* Data is **cached in memory** per JSON file; changes to language or plugin files will only refresh after the cache detects a file modification.
* The `setOverriddenByEnv` flag helps identify setting values that are locked at container runtime.
* The schema is **read-only** — updates must be performed through other APIs or configuration management. See the other [API](API.md) endpoints for details.

179
docs/API_LOGS.md Normal file
View File

@@ -0,0 +1,179 @@
# Logs API Endpoints
Manage or purge application log files stored under `/app/log` and manage the execution queue. These endpoints are primarily used for maintenance tasks such as clearing accumulated logs or adding system actions without restarting the container.
Only specific, pre-approved log files can be purged for security and stability reasons.
---
## Delete (Purge) a Log File
* **DELETE** `/logs?file=<log_file>` → Purge the contents of an allowed log file.
**Query Parameter:**
* `file` → The name of the log file to purge (e.g., `app.log`, `stdout.log`)
**Allowed Files:**
```
app.log
app_front.log
IP_changes.log
stdout.log
stderr.log
app.php_errors.log
execution_queue.log
db_is_locked.log
```
**Authorization:**
Requires a valid API token in the `Authorization` header.
---
### `curl` Example (Success)
```sh
curl -X DELETE 'http://<server_ip>:<GRAPHQL_PORT>/logs?file=app.log' \
-H 'Authorization: Bearer <API_TOKEN>' \
-H 'Accept: application/json'
```
**Response:**
```json
{
"success": true,
"message": "[clean_log] File app.log purged successfully"
}
```
---
### `curl` Example (Not Allowed)
```sh
curl -X DELETE 'http://<server_ip>:<GRAPHQL_PORT>/logs?file=not_allowed.log' \
-H 'Authorization: Bearer <API_TOKEN>' \
-H 'Accept: application/json'
```
**Response:**
```json
{
"success": false,
"message": "[clean_log] File not_allowed.log is not allowed to be purged"
}
```
---
### `curl` Example (Unauthorized)
```sh
curl -X DELETE 'http://<server_ip>:<GRAPHQL_PORT>/logs?file=app.log' \
-H 'Accept: application/json'
```
**Response:**
```json
{
"error": "Forbidden"
}
```
---
## Add an Action to the Execution Queue
* **POST** `/logs/add-to-execution-queue` → Add a system action to the execution queue.
**Request Body (JSON):**
```json
{
"action": "update_api|devices"
}
```
**Authorization:**
Requires a valid API token in the `Authorization` header.
---
### `curl` Example (Success)
The below will update the API cache for Devices
```sh
curl -X POST 'http://<server_ip>:<GRAPHQL_PORT>/logs/add-to-execution-queue' \
-H 'Authorization: Bearer <API_TOKEN>' \
-H 'Content-Type: application/json' \
--data '{"action": "update_api|devices"}'
```
**Response:**
```json
{
"success": true,
"message": "[UserEventsQueueInstance] Action \"update_api|devices\" added to the execution queue."
}
```
---
### `curl` Example (Missing Parameter)
```sh
curl -X POST 'http://<server_ip>:<GRAPHQL_PORT>/logs/add-to-execution-queue' \
-H 'Authorization: Bearer <API_TOKEN>' \
-H 'Content-Type: application/json' \
--data '{}'
```
**Response:**
```json
{
"success": false,
"message": "Missing parameters",
"error": "Missing required 'action' field in JSON body"
}
```
---
### `curl` Example (Unauthorized)
```sh
curl -X POST 'http://<server_ip>:<GRAPHQL_PORT>/logs/add-to-execution-queue' \
-H 'Content-Type: application/json' \
--data '{"action": "update_api|devices"}'
```
**Response:**
```json
{
"error": "Forbidden"
}
```
---
## Notes
* Only predefined files in `/app/log` can be purged — arbitrary paths are **not permitted**.
* When a log file is purged:
* Its content is replaced with a short marker text: `"File manually purged"`.
* A backend log entry is created via `mylog()`.
* A frontend notification is generated via `write_notification()`.
* Execution queue actions are appended to `execution_queue.log` and can be processed asynchronously by background tasks or workflows.
* Unauthorized or invalid attempts are safely logged and rejected.
* For advanced log retrieval, analysis, or structured querying, use the frontend log viewer.
* Always ensure that sensitive or production logs are handled carefully — purging cannot be undone.

View File

@@ -176,7 +176,10 @@ function checkPermissions($files)
}
// ----------------------------------------------------------------------------------------
// 🔺----- API ENDPOINTS SUPERSEDED -----🔺
// check server/api_server/api_server_start.py for equivalents
// equivalent: /messaging/in-app/write
// 🔺----- API ENDPOINTS SUPERSEDED -----🔺
function displayMessage($message, $logAlert = FALSE, $logConsole = TRUE, $logFile = TRUE, $logEcho = FALSE)
{
global $logFolderPath, $log_file, $timestamp;
@@ -234,7 +237,10 @@ function displayMessage($message, $logAlert = FALSE, $logConsole = TRUE, $logFil
}
// 🔺----- API ENDPOINTS SUPERSEDED -----🔺
// check server/api_server/api_server_start.py for equivalents
// equivalent: /logs/add-to-execution-queue
// 🔺----- API ENDPOINTS SUPERSEDED -----🔺
// ----------------------------------------------------------------------------------------
// Adds an action to perform into the execution_queue.log file
function addToExecutionQueue($action)
@@ -257,6 +263,10 @@ function addToExecutionQueue($action)
// ----------------------------------------------------------------------------------------
// 🔺----- API ENDPOINTS SUPERSEDED -----🔺
// check server/api_server/api_server_start.py for equivalents
// equivalent: /logs DELETE
// 🔺----- API ENDPOINTS SUPERSEDED -----🔺
function cleanLog($logFile)
{
global $logFolderPath, $timestamp;
@@ -418,6 +428,10 @@ function saveSettings()
}
// -------------------------------------------------------------------------------------------
// 🔺----- API ENDPOINTS SUPERSEDED -----🔺
// check server/api_server/api_server_start.py for equivalents
// equivalent: /graphql LangStrings endpoint
// 🔺----- API ENDPOINTS SUPERSEDED -----🔺
function getString ($setKey, $default) {
$result = lang($setKey);
@@ -430,6 +444,10 @@ function getString ($setKey, $default) {
return $default;
}
// -------------------------------------------------------------------------------------------
// 🔺----- API ENDPOINTS SUPERSEDED -----🔺
// check server/api_server/api_server_start.py for equivalents
// equivalent: /settings/<key>
// 🔺----- API ENDPOINTS SUPERSEDED -----🔺
function getSettingValue($setKey) {
// Define the JSON endpoint URL
$url = dirname(__FILE__).'/../../../api/table_settings.json';

View File

@@ -12,7 +12,7 @@ INSTALL_PATH = "/app"
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
from plugin_helper import Plugin_Object, Plugin_Objects, decodeBase64
from plugin_utils import get_plugins_configs
from utils.plugin_utils import get_plugins_configs
from logger import mylog, Logger
from const import pluginsPath, fullDbPath, logPath
from helper import get_setting_value

View File

@@ -20,7 +20,7 @@ sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
# NetAlertX modules
import conf
from const import apiPath, confFileName, logPath
from plugin_utils import getPluginObject
from utils.plugin_utils import getPluginObject
from plugin_helper import Plugin_Objects
from logger import mylog, Logger, append_line_to_file
from helper import get_setting_value, bytes_to_string, sanitize_string, cleanDeviceName

View File

@@ -20,7 +20,7 @@ sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
# NetAlertX modules
import conf
from const import confFileName, logPath
from plugin_utils import getPluginObject
from utils.plugin_utils import getPluginObject
from plugin_helper import Plugin_Objects
from logger import mylog, Logger
from helper import get_setting_value, bytes_to_string, \

View File

@@ -12,7 +12,7 @@ INSTALL_PATH = "/app"
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
from plugin_helper import Plugin_Object, Plugin_Objects, decodeBase64
from plugin_utils import get_plugins_configs
from utils.plugin_utils import get_plugins_configs
from logger import mylog, Logger
from const import pluginsPath, fullDbPath, logPath
from helper import get_setting_value

View File

@@ -21,7 +21,7 @@ INSTALL_PATH = "/app"
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
from plugin_helper import Plugin_Object, Plugin_Objects, decodeBase64
from plugin_utils import get_plugins_configs
from utils.plugin_utils import get_plugins_configs
from logger import mylog, Logger
from const import pluginsPath, fullDbPath, logPath
from helper import get_setting_value

View File

@@ -15,7 +15,7 @@ INSTALL_PATH = "/app"
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
from plugin_helper import Plugin_Object, Plugin_Objects, decodeBase64, handleEmpty
from plugin_utils import get_plugins_configs
from utils.plugin_utils import get_plugins_configs
from logger import mylog, Logger
from const import pluginsPath, fullDbPath, logPath
from helper import get_setting_value

View File

@@ -12,7 +12,7 @@ INSTALL_PATH = "/app"
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
from plugin_helper import Plugin_Object, Plugin_Objects, decodeBase64
from plugin_utils import get_plugins_configs
from utils.plugin_utils import get_plugins_configs
from logger import mylog, Logger
from const import pluginsPath, fullDbPath, logPath
from helper import get_setting_value

View File

@@ -41,7 +41,7 @@ INSTALL_PATH = "/app"
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
from plugin_helper import Plugin_Object, Plugin_Objects, decodeBase64
from plugin_utils import get_plugins_configs
from utils.plugin_utils import get_plugins_configs
from logger import mylog, Logger
from const import pluginsPath, fullDbPath, logPath
from helper import get_setting_value

View File

@@ -15,12 +15,12 @@ INSTALL_PATH = "/app"
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
from plugin_helper import Plugin_Object, Plugin_Objects, decodeBase64
from plugin_utils import get_plugins_configs, decode_and_rename_files
from utils.plugin_utils import get_plugins_configs, decode_and_rename_files
from logger import mylog, Logger
from const import pluginsPath, fullDbPath, logPath
from helper import get_setting_value
from utils.datetime_utils import timeNowDB
from crypto_utils import encrypt_data
from utils.crypto_utils import encrypt_data
from messaging.in_app import write_notification
import conf
from pytz import timezone

View File

@@ -13,7 +13,7 @@ INSTALL_PATH = "/app"
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
from plugin_helper import Plugin_Object, Plugin_Objects, decodeBase64, decode_settings_base64
from plugin_utils import get_plugins_configs
from utils.plugin_utils import get_plugins_configs
from logger import mylog, Logger
from const import pluginsPath, fullDbPath, logPath
from helper import get_setting_value

View File

@@ -13,7 +13,7 @@ INSTALL_PATH = "/app"
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
from plugin_helper import Plugin_Object, Plugin_Objects, decodeBase64
from plugin_utils import get_plugins_configs
from utils.plugin_utils import get_plugins_configs
from logger import mylog, Logger
from const import pluginsPath, fullDbPath, logPath
from helper import get_setting_value

View File

@@ -24,6 +24,8 @@ from .sessions_endpoint import get_sessions, delete_session, create_session, get
from .nettools_endpoint import wakeonlan, traceroute, speedtest, nslookup, nmap_scan, internet_info
from .dbquery_endpoint import read_query, write_query, update_query, delete_query
from .sync_endpoint import handle_sync_post, handle_sync_get
from .logs_endpoint import clean_log
from models.user_events_queue_instance import UserEventsQueueInstance
from messaging.in_app import write_notification, mark_all_notifications_read, delete_notifications, get_unread_notifications, delete_notification, mark_notification_as_read
# Flask application
@@ -40,12 +42,25 @@ CORS(
r"/settings/*": {"origins": "*"},
r"/dbquery/*": {"origins": "*"},
r"/messaging/*": {"origins": "*"},
r"/events/*": {"origins": "*"}
r"/events/*": {"origins": "*"},
r"/logs/*": {"origins": "*"}
},
supports_credentials=True,
allow_headers=["Authorization", "Content-Type"]
)
# -------------------------------------------------------------------
# Custom handler for 404 - Route not found
# -------------------------------------------------------------------
@app.errorhandler(404)
def not_found(error):
response = {
"success": False,
"error": "API route not found",
"message": f"The requested URL {error.description if hasattr(error, 'description') else ''} was not found on the server.",
}
return jsonify(response), 404
# --------------------------
# GraphQL Endpoints
# --------------------------
@@ -63,7 +78,7 @@ def graphql_endpoint():
if not is_authorized():
msg = '[graphql_server] Unauthorized access attempt - make sure your GRAPHQL_PORT and API_TOKEN settings are correct.'
mylog('verbose', [msg])
return jsonify({"error": msg}), 401
return jsonify({"success": False, "message": msg}), 401
# Retrieve and log request data
data = request.get_json()
@@ -89,7 +104,7 @@ def graphql_endpoint():
@app.route("/settings/<setKey>", methods=["GET"])
def api_get_setting(setKey):
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
value = get_setting_value(setKey)
return jsonify({"success": True, "value": value})
@@ -100,58 +115,58 @@ def api_get_setting(setKey):
@app.route("/device/<mac>", methods=["GET"])
def api_get_device(mac):
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
return get_device_data(mac)
@app.route("/device/<mac>", methods=["POST"])
def api_set_device(mac):
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
return set_device_data(mac, request.json)
@app.route("/device/<mac>/delete", methods=["DELETE"])
def api_delete_device(mac):
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
return delete_device(mac)
@app.route("/device/<mac>/events/delete", methods=["DELETE"])
def api_delete_device_events(mac):
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
return delete_device_events(mac)
@app.route("/device/<mac>/reset-props", methods=["POST"])
def api_reset_device_props(mac):
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
return reset_device_props(mac, request.json)
@app.route("/device/copy", methods=["POST"])
def api_copy_device():
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
data = request.get_json() or {}
mac_from = data.get("macFrom")
mac_to = data.get("macTo")
if not mac_from or not mac_to:
return jsonify({"success": False, "error": "macFrom and macTo are required"}), 400
return jsonify({"success": False, "message": "Missing parameters", "error": "macFrom and macTo are required"}), 400
return copy_device(mac_from, mac_to)
@app.route("/device/<mac>/update-column", methods=["POST"])
def api_update_device_column(mac):
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
data = request.get_json() or {}
column_name = data.get("columnName")
column_value = data.get("columnValue")
if not column_name or not column_value:
return jsonify({"success": False, "error": "columnName and columnValue are required"}), 400
return jsonify({"success": False, "message": "Missing parameters", "error": "columnName and columnValue are required"}), 400
return update_device_column(mac, column_name, column_value)
@@ -162,13 +177,13 @@ def api_update_device_column(mac):
@app.route("/devices", methods=["GET"])
def api_get_devices():
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
return get_all_devices()
@app.route("/devices", methods=["DELETE"])
def api_delete_devices():
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
macs = request.json.get("macs") if request.is_json else None
@@ -177,13 +192,13 @@ def api_delete_devices():
@app.route("/devices/empty-macs", methods=["DELETE"])
def api_delete_all_empty_macs():
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
return delete_all_with_empty_macs()
@app.route("/devices/unknown", methods=["DELETE"])
def api_delete_unknown_devices():
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
return delete_unknown_devices()
@@ -191,7 +206,7 @@ def api_delete_unknown_devices():
@app.route("/devices/export/<format>", methods=["GET"])
def api_export_devices(format=None):
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
export_format = (format or request.args.get("format", "csv")).lower()
return export_devices(export_format)
@@ -199,19 +214,19 @@ def api_export_devices(format=None):
@app.route("/devices/import", methods=["POST"])
def api_import_csv():
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
return import_csv(request.files.get("file"))
@app.route("/devices/totals", methods=["GET"])
def api_devices_totals():
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
return devices_totals()
@app.route("/devices/by-status", methods=["GET"])
def api_devices_by_status():
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
status = request.args.get("status", "") if request.args else None
@@ -223,7 +238,7 @@ def api_devices_by_status():
@app.route("/nettools/wakeonlan", methods=["POST"])
def api_wakeonlan():
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
mac = request.json.get("devMac")
return wakeonlan(mac)
@@ -231,14 +246,14 @@ def api_wakeonlan():
@app.route("/nettools/traceroute", methods=["POST"])
def api_traceroute():
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
ip = request.json.get("devLastIP")
return traceroute(ip)
@app.route("/nettools/speedtest", methods=["GET"])
def api_speedtest():
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
return speedtest()
@app.route("/nettools/nslookup", methods=["POST"])
@@ -248,11 +263,11 @@ def api_nslookup():
Expects JSON with 'devLastIP'.
"""
if not is_authorized():
return jsonify({"success": False, "error": "Forbidden"}), 403
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
data = request.get_json(silent=True)
if not data or "devLastIP" not in data:
return jsonify({"success": False, "error": "Missing 'devLastIP'"}), 400
return jsonify({"success": False, "message": "Missing parameters", "error": "Missing 'devLastIP'"}), 400
ip = data["devLastIP"]
return nslookup(ip)
@@ -264,11 +279,11 @@ def api_nmap():
Expects JSON with 'scan' (IP address) and 'mode' (scan mode).
"""
if not is_authorized():
return jsonify({"success": False, "error": "Forbidden"}), 403
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
data = request.get_json(silent=True)
if not data or "scan" not in data or "mode" not in data:
return jsonify({"success": False, "error": "Missing 'scan' or 'mode'"}), 400
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Missing 'scan' or 'mode'"}), 400
ip = data["scan"]
mode = data["mode"]
@@ -278,7 +293,7 @@ def api_nmap():
@app.route("/nettools/internetinfo", methods=["GET"])
def api_internet_info():
if not is_authorized():
return jsonify({"success": False, "error": "Forbidden"}), 403
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
return internet_info()
@@ -289,13 +304,13 @@ def api_internet_info():
@app.route("/dbquery/read", methods=["POST"])
def dbquery_read():
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
data = request.get_json() or {}
raw_sql_b64 = data.get("rawSql")
if not raw_sql_b64:
return jsonify({"error": "rawSql is required"}), 400
return jsonify({"success": False, "message": "Missing parameters", "error": "rawSql is required"}), 400
return read_query(raw_sql_b64)
@@ -303,12 +318,12 @@ def dbquery_read():
@app.route("/dbquery/write", methods=["POST"])
def dbquery_write():
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
data = request.get_json() or {}
raw_sql_b64 = data.get("rawSql")
if not raw_sql_b64:
return jsonify({"error": "rawSql is required"}), 400
return jsonify({"success": False, "message": "Missing parameters", "error": "rawSql is required"}), 400
return write_query(raw_sql_b64)
@@ -316,12 +331,12 @@ def dbquery_write():
@app.route("/dbquery/update", methods=["POST"])
def dbquery_update():
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
data = request.get_json() or {}
required = ["columnName", "id", "dbtable", "columns", "values"]
if not all(data.get(k) for k in required):
return jsonify({"error": "Missing required parameters"}), 400
return jsonify({"success": False, "message": "Missing parameters", "error": "Missing required 'columnName', 'id', 'dbtable', 'columns', or 'values' query parameter"}), 400
return update_query(
column_name=data["columnName"],
@@ -335,12 +350,12 @@ def dbquery_update():
@app.route("/dbquery/delete", methods=["POST"])
def dbquery_delete():
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
data = request.get_json() or {}
required = ["columnName", "id", "dbtable"]
if not all(data.get(k) for k in required):
return jsonify({"error": "Missing required parameters"}), 400
return jsonify({"success": False, "message": "Missing parameters", "error": "Missing required 'columnName', 'id', or 'dbtable' query parameter"}), 400
return delete_query(
column_name=data["columnName"],
@@ -355,9 +370,46 @@ def dbquery_delete():
@app.route("/history", methods=["DELETE"])
def api_delete_online_history():
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
return delete_online_history()
# --------------------------
# Logs
# --------------------------
@app.route("/logs", methods=["DELETE"])
def api_clean_log():
if not is_authorized():
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
file = request.args.get("file")
if not file:
return jsonify({"success": False, "message": "Missing parameters", "error": "Missing 'file' query parameter"}), 400
return clean_log(file)
@app.route("/logs/add-to-execution-queue", methods=["POST"])
def api_add_to_execution_queue():
queue = UserEventsQueueInstance()
# Get JSON payload safely
data = request.get_json(silent=True) or {}
action = data.get("action")
if not action:
return jsonify({
"success": False, "message": "Missing parameters", "error": "Missing required 'action' field in JSON body"}), 400
success, message = queue.add_event(action)
status_code = 200 if success else 400
response = {"success": success, "message": message}
if not success:
response["error"] = "ERROR"
return jsonify(response), status_code
# --------------------------
# Device Events
# --------------------------
@@ -365,7 +417,7 @@ def api_delete_online_history():
@app.route("/events/create/<mac>", methods=["POST"])
def api_create_event(mac):
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
data = request.json or {}
ip = data.get("ip", "0.0.0.0")
@@ -384,19 +436,19 @@ def api_create_event(mac):
@app.route("/events/<mac>", methods=["DELETE"])
def api_events_by_mac(mac):
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
return delete_device_events(mac)
@app.route("/events", methods=["DELETE"])
def api_delete_all_events():
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
return delete_events()
@app.route("/events", methods=["GET"])
def api_get_events():
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
mac = request.args.get("mac")
return get_events(mac)
@@ -408,14 +460,14 @@ def api_delete_old_events(days: int):
Example: DELETE /events/30
"""
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
return delete_events_older_than(days)
@app.route("/sessions/totals", methods=["GET"])
def api_get_events_totals():
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
period = get_date_from_period(request.args.get("period", "7 days"))
return get_events_totals(period)
@@ -427,7 +479,7 @@ def api_get_events_totals():
@app.route("/sessions/create", methods=["POST"])
def api_create_session():
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
data = request.json
mac = data.get("mac")
@@ -438,7 +490,7 @@ def api_create_session():
event_type_disc = data.get("event_type_disc", "Disconnected")
if not mac or not ip or not start_time:
return jsonify({"success": False, "error": "Missing required parameters"}), 400
return jsonify({"success": False, "message": "Missing parameters", "error": "Missing required 'mac', 'ip', or 'start_time' query parameter"}), 400
return create_session(mac, ip, start_time, end_time, event_type_conn, event_type_disc)
@@ -446,11 +498,11 @@ def api_create_session():
@app.route("/sessions/delete", methods=["DELETE"])
def api_delete_session():
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
mac = request.json.get("mac") if request.is_json else None
if not mac:
return jsonify({"success": False, "error": "Missing MAC parameter"}), 400
return jsonify({"success": False, "message": "Missing parameters", "error": "Missing 'mac' query parameter"}), 400
return delete_session(mac)
@@ -458,7 +510,7 @@ def api_delete_session():
@app.route("/sessions/list", methods=["GET"])
def api_get_sessions():
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
mac = request.args.get("mac")
start_date = request.args.get("start_date")
@@ -469,7 +521,7 @@ def api_get_sessions():
@app.route("/sessions/calendar", methods=["GET"])
def api_get_sessions_calendar():
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
# Query params: /sessions/calendar?start=2025-08-01&end=2025-08-21
start_date = request.args.get("start")
@@ -480,7 +532,7 @@ def api_get_sessions_calendar():
@app.route("/sessions/<mac>", methods=["GET"])
def api_device_sessions(mac):
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
period = request.args.get("period", "1 day")
return get_device_sessions(mac, period)
@@ -488,7 +540,7 @@ def api_device_sessions(mac):
@app.route("/sessions/session-events", methods=["GET"])
def api_get_session_events():
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
session_event_type = request.args.get("type", "all")
period = get_date_from_period(request.args.get("period", "7 days"))
@@ -500,7 +552,7 @@ def api_get_session_events():
@app.route("/metrics")
def metrics():
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
# Return Prometheus metrics as plain text
return Response(get_metric_stats(), mimetype="text/plain")
@@ -511,14 +563,14 @@ def metrics():
@app.route("/messaging/in-app/write", methods=["POST"])
def api_write_notification():
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
data = request.json or {}
content = data.get("content")
level = data.get("level", "alert")
if not content:
return jsonify({"success": False, "error": "Missing content"}), 400
return jsonify({"success": False, "message": "Missing parameters", "error": "Missing content"}), 400
write_notification(content, level)
return jsonify({"success": True})
@@ -526,21 +578,21 @@ def api_write_notification():
@app.route("/messaging/in-app/unread", methods=["GET"])
def api_get_unread_notifications():
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
return get_unread_notifications()
@app.route("/messaging/in-app/read/all", methods=["POST"])
def api_mark_all_notifications_read():
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
return jsonify(mark_all_notifications_read())
@app.route("/messaging/in-app/delete", methods=["DELETE"])
def api_delete_all_notifications():
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
return delete_notifications()
@@ -548,25 +600,25 @@ def api_delete_all_notifications():
def api_delete_notification(guid):
"""Delete a single notification by GUID."""
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
result = delete_notification(guid)
if result.get("success"):
return jsonify({"success": True})
else:
return jsonify({"success": False, "error": result.get("error")}), 500
return jsonify({"success": False, "message": "ERROR", "error": result.get("error")}), 500
@app.route("/messaging/in-app/read/<guid>", methods=["POST"])
def api_mark_notification_read(guid):
"""Mark a single notification as read by GUID."""
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
result = mark_notification_as_read(guid)
if result.get("success"):
return jsonify({"success": True})
else:
return jsonify({"success": False, "error": result.get("error")}), 500
return jsonify({"success": False, "message": "ERROR", "error": result.get("error")}), 500
# --------------------------
# SYNC endpoint
@@ -574,7 +626,7 @@ def api_mark_notification_read(guid):
@app.route("/sync", methods=["GET", "POST"])
def sync_endpoint():
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
if request.method == "GET":
return handle_sync_get()
@@ -584,7 +636,7 @@ def sync_endpoint():
msg = "[sync endpoint] Method Not Allowed"
write_notification(msg, "alert")
mylog("verbose", [msg])
return jsonify({"error": "Method Not Allowed"}), 405
return jsonify({"success": False, "message": "ERROR: No allowed", "error": "Method Not Allowed"}), 405
# --------------------------
# Background Server Start

View File

@@ -1,7 +1,8 @@
import graphene
from graphene import ObjectType, String, Int, Boolean, List, Field, InputObjectType
from graphene import ObjectType, String, Int, Boolean, List, Field, InputObjectType, Argument
import json
import sys
import os
# Register NetAlertX directories
INSTALL_PATH="/app"
@@ -102,6 +103,23 @@ class SettingResult(ObjectType):
settings = List(Setting)
count = Int()
# --- LANGSTRINGS ---
# In-memory cache for lang strings
_langstrings_cache = {} # caches lists per file (core JSON or plugin)
_langstrings_cache_mtime = {} # tracks last modified times
# LangString ObjectType
class LangString(ObjectType):
langCode = String()
langStringKey = String()
langStringText = String()
class LangStringResult(ObjectType):
langStrings = List(LangString)
count = Int()
# Define Query Type with Pagination Support
class Query(ObjectType):
@@ -258,6 +276,107 @@ class Query(ObjectType):
return SettingResult(settings=settings, count=len(settings))
# --- LANGSTRINGS ---
langStrings = Field(
LangStringResult,
langCode=Argument(String, required=False),
langStringKey=Argument(String, required=False)
)
def resolve_langStrings(self, info, langCode=None, langStringKey=None, fallback_to_en=True):
"""
Collect language strings, optionally filtered by language code and/or string key.
Caches in memory for performance. Can fallback to 'en_us' if a string is missing.
"""
global _langstrings_cache, _langstrings_cache_mtime
langStrings = []
# --- CORE JSON FILES ---
language_folder = '/app/front/php/templates/language/'
if os.path.exists(language_folder):
for filename in os.listdir(language_folder):
if filename.endswith('.json'):
file_lang_code = filename.replace('.json', '')
# Filter by langCode if provided
if langCode and file_lang_code != langCode:
continue
file_path = os.path.join(language_folder, filename)
file_mtime = os.path.getmtime(file_path)
cache_key = f'core_{file_lang_code}'
# Use cached data if available and not modified
if cache_key in _langstrings_cache_mtime and _langstrings_cache_mtime[cache_key] == file_mtime:
lang_list = _langstrings_cache[cache_key]
else:
try:
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
lang_list = [
LangString(
langCode=file_lang_code,
langStringKey=key,
langStringText=value
) for key, value in data.items()
]
_langstrings_cache[cache_key] = lang_list
_langstrings_cache_mtime[cache_key] = file_mtime
except (FileNotFoundError, json.JSONDecodeError) as e:
mylog('none', f'[graphql_schema] Error loading core language strings from {filename}: {e}')
lang_list = []
langStrings.extend(lang_list)
# --- PLUGIN STRINGS ---
plugin_file = folder + 'table_plugins_language_strings.json'
try:
file_mtime = os.path.getmtime(plugin_file)
cache_key = 'plugin'
if cache_key in _langstrings_cache_mtime and _langstrings_cache_mtime[cache_key] == file_mtime:
plugin_list = _langstrings_cache[cache_key]
else:
with open(plugin_file, 'r', encoding='utf-8') as f:
plugin_data = json.load(f).get("data", [])
plugin_list = [
LangString(
langCode=entry.get("Language_Code"),
langStringKey=entry.get("String_Key"),
langStringText=entry.get("String_Value")
) for entry in plugin_data
]
_langstrings_cache[cache_key] = plugin_list
_langstrings_cache_mtime[cache_key] = file_mtime
except (FileNotFoundError, json.JSONDecodeError) as e:
mylog('none', f'[graphql_schema] Error loading plugin language strings from {plugin_file}: {e}')
plugin_list = []
# Filter plugin strings by langCode if provided
if langCode:
plugin_list = [p for p in plugin_list if p.langCode == langCode]
langStrings.extend(plugin_list)
# --- Filter by string key if requested ---
if langStringKey:
langStrings = [ls for ls in langStrings if ls.langStringKey == langStringKey]
# --- Fallback to en_us if enabled and requested lang is missing ---
if fallback_to_en and langCode and langCode != "en_us":
for i, ls in enumerate(langStrings):
if not ls.langStringText: # empty string triggers fallback
# try to get en_us version
en_list = _langstrings_cache.get("core_en_us", [])
en_list += [p for p in _langstrings_cache.get("plugin", []) if p.langCode == "en_us"]
en_fallback = [e for e in en_list if e.langStringKey == ls.langStringKey]
if en_fallback:
langStrings[i] = en_fallback[0]
mylog('trace', f'[graphql_schema] Collected {len(langStrings)} language strings '
f'(langCode={langCode}, key={langStringKey}, fallback_to_en={fallback_to_en})')
return LangStringResult(langStrings=langStrings, count=len(langStrings))
# helps sorting inconsistent dataset mixed integers and strings

View File

@@ -0,0 +1,58 @@
import os
import sys
from flask import jsonify
# Register NetAlertX directories
INSTALL_PATH="/app"
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
from const import logPath
from logger import mylog, Logger
from helper import get_setting_value
from utils.datetime_utils import timeNowDB
from messaging.in_app import write_notification
# Make sure log level is initialized correctly
Logger(get_setting_value('LOG_LEVEL'))
def clean_log(log_file):
"""
Purge the content of an allowed log file within the /app/log/ directory.
Args:
log_file (str): Name of the log file to purge.
Returns:
flask.Response: JSON response with success and message keys
"""
allowed_files = [
'app.log', 'app_front.log', 'IP_changes.log', 'stdout.log', 'stderr.log',
'app.php_errors.log', 'execution_queue.log', 'db_is_locked.log'
]
# Validate filename if purging allowed
if log_file not in allowed_files:
msg = f"[clean_log] File {log_file} is not allowed to be purged"
mylog('none', [msg])
write_notification(msg, 'interrupt')
return jsonify({"success": False, "message": msg}), 400
log_path = os.path.join(logPath, log_file)
try:
# Purge content
with open(log_path, "w") as f:
f.write("File manually purged\n")
msg = f"[clean_log] File {log_file} purged successfully"
mylog('minimal', [msg])
write_notification(msg, 'interrupt')
return jsonify({"success": True, "message": msg}), 200
except Exception as e:
msg = f"[clean_log] ERROR Failed to purge {log_file}: {e}"
mylog('none', [])
write_notification(msg)
return jsonify({"success": False, "message": msg}), 200

View File

@@ -19,9 +19,9 @@ from logger import mylog
from api import update_api
from scheduler import schedule_class
from plugin import plugin_manager, print_plugin_info
from plugin_utils import get_plugins_configs, get_set_value_for_init
from utils.plugin_utils import get_plugins_configs, get_set_value_for_init
from messaging.in_app import write_notification
from crypto_utils import get_random_bytes
from utils.crypto_utils import get_random_bytes
#===============================================================================
# Initialise user defined values

View File

@@ -1,5 +1,6 @@
import os
import sys
import uuid
# Register NetAlertX directories
INSTALL_PATH="/app"
@@ -8,6 +9,7 @@ sys.path.extend([f"{INSTALL_PATH}/server"])
# Register NetAlertX modules
from const import pluginsPath, logPath, applicationPath, reportTemplatesPath
from logger import mylog
from utils.datetime_utils import timeNowDB
class UserEventsQueueInstance:
"""
@@ -81,5 +83,43 @@ class UserEventsQueueInstance:
return removed
def add_event(self, action):
"""
Append an action to the execution queue log file.
Args:
action (str): Description of the action to queue.
Returns:
tuple: (success: bool, message: str)
success - True if the event was successfully added.
message - Log message describing the result.
"""
timestamp = timeNowDB()
# Generate GUID
guid = str(uuid.uuid4())
if not action or not isinstance(action, str):
msg = "[UserEventsQueueInstance] Invalid or missing action"
mylog('none', [msg])
return False, msg
try:
with open(self.log_file, "a") as f:
f.write(f"[{timestamp}]|{guid}|{action}\n")
msg = f'[UserEventsQueueInstance] Action "{action}" added to the execution queue.'
mylog('minimal', [msg])
return True, msg
except Exception as e:
msg = f"[UserEventsQueueInstance] ERROR Failed to write to {self.log_file}: {e}"
mylog('none', [msg])
return False, msg

View File

@@ -16,11 +16,11 @@ from helper import get_file_content, write_file, get_setting, get_setting_value
from utils.datetime_utils import timeNowTZ, timeNowDB
from app_state import updateState
from api import update_api
from plugin_utils import logEventStatusCounts, get_plugin_string, get_plugin_setting_obj, print_plugin_info, list_to_csv, combine_plugin_objects, resolve_wildcards_arr, handle_empty, custom_plugin_decoder, decode_and_rename_files
from utils.plugin_utils import logEventStatusCounts, get_plugin_string, get_plugin_setting_obj, print_plugin_info, list_to_csv, combine_plugin_objects, resolve_wildcards_arr, handle_empty, custom_plugin_decoder, decode_and_rename_files
from models.notification_instance import NotificationInstance
from messaging.in_app import write_notification
from models.user_events_queue_instance import UserEventsQueueInstance
from crypto_utils import generate_deterministic_guid
from utils.crypto_utils import generate_deterministic_guid
#-------------------------------------------------------------------------------
class plugin_manager:

View File

View File

@@ -6,7 +6,7 @@ from logger import mylog
from const import pluginsPath, logPath, apiPath
from helper import get_file_content, write_file, get_setting, get_setting_value, setting_value_to_python_type
from app_state import updateState
from crypto_utils import decrypt_data, generate_deterministic_guid
from utils.crypto_utils import decrypt_data, generate_deterministic_guid
module_name = 'Plugin utils'

View File

@@ -44,6 +44,8 @@ def test_graphql_post_unauthorized(client):
assert resp.status_code == 401
assert "Unauthorized access attempt" in resp.json.get("error", "")
# --- DEVICES TESTS ---
def test_graphql_post_devices(client, api_token):
"""POST /graphql with a valid token should return device data"""
query = {
@@ -74,6 +76,8 @@ def test_graphql_post_devices(client, api_token):
assert isinstance(data["devices"]["devices"], list)
assert isinstance(data["devices"]["count"], int)
# --- SETTINGS TESTS ---
def test_graphql_post_settings(client, api_token):
"""POST /graphql should return settings data"""
query = {
@@ -91,3 +95,76 @@ def test_graphql_post_settings(client, api_token):
data = resp.json.get("data", {})
assert "settings" in data
assert isinstance(data["settings"]["settings"], list)
# --- LANGSTRINGS TESTS ---
def test_graphql_post_langstrings_specific(client, api_token):
"""Retrieve a specific langString in a given language"""
query = {
"query": """
{
langStrings(langCode: "en_us", langStringKey: "settings_other_scanners") {
langStrings { langCode langStringKey langStringText }
count
}
}
"""
}
resp = client.post("/graphql", json=query, headers=auth_headers(api_token))
assert resp.status_code == 200
data = resp.json.get("data", {}).get("langStrings", {})
assert data["count"] >= 1
for entry in data["langStrings"]:
assert entry["langCode"] == "en_us"
assert entry["langStringKey"] == "settings_other_scanners"
assert isinstance(entry["langStringText"], str)
def test_graphql_post_langstrings_fallback(client, api_token):
"""Fallback to en_us if requested language string is empty"""
query = {
"query": """
{
langStrings(langCode: "de_de", langStringKey: "settings_other_scanners") {
langStrings { langCode langStringKey langStringText }
count
}
}
"""
}
resp = client.post("/graphql", json=query, headers=auth_headers(api_token))
assert resp.status_code == 200
data = resp.json.get("data", {}).get("langStrings", {})
assert data["count"] >= 1
# Ensure fallback occurred if de_de text is empty
for entry in data["langStrings"]:
assert entry["langStringText"] != ""
def test_graphql_post_langstrings_all_languages(client, api_token):
"""Retrieve all languages for a given key"""
query = {
"query": """
{
enStrings: langStrings(langCode: "en_us", langStringKey: "settings_other_scanners") {
langStrings { langCode langStringKey langStringText }
count
}
deStrings: langStrings(langCode: "de_de", langStringKey: "settings_other_scanners") {
langStrings { langCode langStringKey langStringText }
count
}
}
"""
}
resp = client.post("/graphql", json=query, headers=auth_headers(api_token))
assert resp.status_code == 200
data = resp.json.get("data", {})
assert "enStrings" in data
assert "deStrings" in data
# At least one string in each language
assert data["enStrings"]["count"] >= 1
assert data["deStrings"]["count"] >= 1
# Ensure langCode matches
assert all(e["langCode"] == "en_us" for e in data["enStrings"]["langStrings"])
assert all(e["langCode"] == "de_de" for e in data["deStrings"]["langStrings"])

View File

@@ -0,0 +1,61 @@
import sys
import random
import pytest
INSTALL_PATH = "/app"
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
from helper import get_setting_value
from api_server.api_server_start import app
# ----------------------------
# Fixtures
# ----------------------------
@pytest.fixture(scope="session")
def api_token():
return get_setting_value("API_TOKEN")
@pytest.fixture
def client():
with app.test_client() as client:
yield client
def auth_headers(token):
return {"Authorization": f"Bearer {token}"}
# ----------------------------
# Logs Endpoint Tests
# ----------------------------
def test_clean_log(client, api_token):
resp = client.delete("/logs?file=app.log", headers=auth_headers(api_token))
assert resp.status_code == 200
assert resp.json.get("success") is True
def test_clean_log_not_allowed(client, api_token):
resp = client.delete("/logs?file=not_allowed.log", headers=auth_headers(api_token))
assert resp.status_code == 400
assert resp.json.get("success") is False
# ----------------------------
# Execution Queue Endpoint Tests
# ----------------------------
def test_add_to_execution_queue(client, api_token):
action_name = f"test_action_{random.randint(0,9999)}"
resp = client.post(
"/logs/add-to-execution-queue",
json={"action": action_name},
headers=auth_headers(api_token)
)
assert resp.status_code == 200
assert resp.json.get("success") is True
assert action_name in resp.json.get("message", "")
def test_add_to_execution_queue_missing_action(client, api_token):
resp = client.post(
"/logs/add-to-execution-queue",
json={},
headers=auth_headers(api_token)
)
assert resp.status_code == 400
assert resp.json.get("success") is False
assert "Missing required 'action'" in resp.json.get("error", "")