From fbb4a2f8b4d125be24d37fac4977a5390a004a26 Mon Sep 17 00:00:00 2001
From: jokob-sk
Date: Mon, 1 Dec 2025 09:24:44 +1100
Subject: [PATCH] BE: added /auth endpoint

Signed-off-by: jokob-sk
---
 server/api_server/api_server_start.py | 20 ++++++-
 test/api_endpoints/test_auth_endpoints.py | 66 +++++++++++++++++++++++
 2 files changed, 85 insertions(+), 1 deletion(-)
 create mode 100644 test/api_endpoints/test_auth_endpoints.py

diff --git a/server/api_server/api_server_start.py b/server/api_server/api_server_start.py
index 980dcbd0..b44cd3c5 100755
--- a/server/api_server/api_server_start.py
+++ b/server/api_server/api_server_start.py
@@ -87,7 +87,8 @@ CORS(
 r"/dbquery/*": {"origins": "*"},
 r"/messaging/*": {"origins": "*"},
 r"/events/*": {"origins": "*"},
- r"/logs/*": {"origins": "*"}
+ r"/logs/*": {"origins": "*"},
+ r"/auth/*": {"origins": "*"}
 },
 supports_credentials=True,
 allow_headers=["Authorization", "Content-Type"],
@@ -744,6 +745,23 @@ def sync_endpoint():
 return jsonify({"success": False, "message": "ERROR: No allowed", "error": "Method Not Allowed"}), 405
+# --------------------------
+# Auth endpoint
+# --------------------------
+@app.route("/auth", methods=["GET"])
+def check_auth():
+ if not is_authorized():
+ return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
+
+ elif request.method == "GET":
+ return jsonify({"success": True, "message": "Authentication check successful"}), 200
+ else:
+ msg = "[sync endpoint] Method Not Allowed"
+ write_notification(msg, "alert")
+ mylog("verbose", [msg])
+ return jsonify({"success": False, "message": "ERROR: No allowed", "error": "Method Not Allowed"}), 405
+
+
 # --------------------------
 # Background Server Start
 # --------------------------
diff --git a/test/api_endpoints/test_auth_endpoints.py b/test/api_endpoints/test_auth_endpoints.py
new file mode 100644
index 00000000..8e14a2b7
--- /dev/null
+++ b/test/api_endpoints/test_auth_endpoints.py
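Review bot: The new `r"/auth/*": {"origins": "*"}` entry, combined with the `supports_credentials=True` visible two lines below it, means Flask-CORS will reflect the requesting Origin rather than send a literal `*`, so any origin can make credentialed cross-origin calls to the authentication check. That mirrors the existing entries and is low risk as long as `/auth` is only reachable with a bearer token in the Authorization header, but if cookie-based sessions are ever accepted this endpoint should get an explicit origin allow-list instead of the wildcard. Recording that assumption next to the entry would help, e.g. `# wildcard origin relies on auth being header-based (Bearer token), not cookie-based`.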
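Review bot: The `else` branch of check_auth is unreachable, because the route is registered with `methods=["GET"]` and Flask answers other methods with its own 405 before the view runs; the branch also logs `[sync endpoint]`, a label copied from sync_endpoint above. I would drop the `elif`/`else` and return the success response directly after the authorization guard, and give the view a docstring such as `"""Return 200 when is_authorized() accepts the request's credentials, 403 otherwise."""`. If the 405 branch is deliberately kept for a future method list, relabel it `msg = "[auth endpoint] Method Not Allowed"`. Returning the same generic `"ERROR: Not authorized"` body for both a missing and an invalid token is the right choice for a credential check, since it reveals nothing about which case applied.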
@@ -0,0 +1,66 @@
+# tests/test_auth.py
+
+import sys
+import os
+import pytest
+
+# Register NetAlertX directories
+INSTALL_PATH = os.getenv("NETALERTX_APP", "/app")
+sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
+
+from helper import get_setting_value # noqa: E402
+from api_server.api_server_start import app # noqa: E402
+
+
+@pytest.fixture(scope="session")
+def api_token():
+ """Load API token from system settings (same as other tests)."""
+ return get_setting_value("API_TOKEN")
+
+
+@pytest.fixture
+def client():
+ """Flask test client."""
+ with app.test_client() as client:
+ yield client
+
+
+def auth_headers(token):
+ return {"Authorization": f"Bearer {token}"}
+
+
+# -------------------------
+# AUTH ENDPOINT TESTS
+# -------------------------
+
+def test_auth_ok(client, api_token):
+ """Valid token should allow access."""
+ resp = client.get("/auth", headers=auth_headers(api_token))
+ assert resp.status_code == 200
+
+ data = resp.get_json()
+ assert data is not None
+ assert data.get("success") is True
+ assert "successful" in data.get("message", "").lower()
+
+
+def test_auth_missing_token(client):
+ """Missing token should be forbidden."""
+ resp = client.get("/auth")
+ assert resp.status_code == 403
+
+ data = resp.get_json()
+ assert data is not None
+ assert data.get("success") is False
+ assert "not authorized" in data.get("message", "").lower()
+
+
+def test_auth_invalid_token(client):
+ """Invalid bearer token should be forbidden."""
+ resp = client.get("/auth", headers=auth_headers("INVALID-TOKEN"))
+ assert resp.status_code == 403
+
+ data = resp.get_json()
+ assert data is not None
+ assert data.get("success") is False
+ assert "not authorized" in data.get("message", "").lower()
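Review bot: The suite covers only the token-present, token-absent and token-invalid cases; nothing checks that a non-GET request to /auth is rejected, so Flask's method enforcement for the new route goes unverified — a `resp = client.post("/auth", headers=auth_headers(api_token))` followed by `assert resp.status_code == 405` would cover it. `test_auth_ok` also depends on `get_setting_value("API_TOKEN")` returning the token the server is actually configured with; if the setting is empty the request would carry `Bearer ` and the test would fail for an unrelated reason, so an `assert api_token, "API_TOKEN not configured"` in the fixture (or a `pytest.skip`) would make that failure mode explicit. Finally, the header comment `# tests/test_auth.py` does not match the file's path `test/api_endpoints/test_auth_endpoints.py` and should be corrected or dropped.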