mirror of
https://github.com/jokob-sk/NetAlertX.git
synced 2026-04-06 02:01:37 -07:00
Merge pull request #1521 from netalertx/chore_timestamps
feat: add health check endpoint and related schemas with tests
This commit is contained in:
8
.github/skills/code-standards/SKILL.md
vendored
8
.github/skills/code-standards/SKILL.md
vendored
@@ -5,6 +5,14 @@ description: NetAlertX coding standards and conventions. Use this when writing c
|
|||||||
|
|
||||||
# Code Standards
|
# Code Standards
|
||||||
|
|
||||||
|
- ask me to review before moving on to each next step (mention "step n of x")
|
||||||
|
- before starting, prepare implementation plan
|
||||||
|
- ask me to review it and ask any clarifying questions first
|
||||||
|
- add test creation as the last step; follow repo architecture patterns and do not place tests in the root of /test
|
||||||
|
- code has to be maintainable, no duplicate code
|
||||||
|
- follow DRY principle
|
||||||
|
- code files should be less than 500 LOC for better maintainability
|
||||||
|
|
||||||
## File Length
|
## File Length
|
||||||
|
|
||||||
Keep code files under 500 lines. Split larger files into modules.
|
Keep code files under 500 lines. Split larger files into modules.
|
||||||
|
|||||||
@@ -41,6 +41,7 @@ from .nettools_endpoint import ( # noqa: E402 [flake8 lint suppression]
|
|||||||
from .dbquery_endpoint import read_query, write_query, update_query, delete_query # noqa: E402 [flake8 lint suppression]
|
from .dbquery_endpoint import read_query, write_query, update_query, delete_query # noqa: E402 [flake8 lint suppression]
|
||||||
from .sync_endpoint import handle_sync_post, handle_sync_get # noqa: E402 [flake8 lint suppression]
|
from .sync_endpoint import handle_sync_post, handle_sync_get # noqa: E402 [flake8 lint suppression]
|
||||||
from .logs_endpoint import clean_log # noqa: E402 [flake8 lint suppression]
|
from .logs_endpoint import clean_log # noqa: E402 [flake8 lint suppression]
|
||||||
|
from .health_endpoint import get_health_status # noqa: E402 [flake8 lint suppression]
|
||||||
from models.user_events_queue_instance import UserEventsQueueInstance # noqa: E402 [flake8 lint suppression]
|
from models.user_events_queue_instance import UserEventsQueueInstance # noqa: E402 [flake8 lint suppression]
|
||||||
|
|
||||||
from models.event_instance import EventInstance # noqa: E402 [flake8 lint suppression]
|
from models.event_instance import EventInstance # noqa: E402 [flake8 lint suppression]
|
||||||
@@ -86,6 +87,7 @@ from .openapi.schemas import ( # noqa: E402 [flake8 lint suppression]
|
|||||||
RecentEventsResponse, LastEventsResponse,
|
RecentEventsResponse, LastEventsResponse,
|
||||||
NetworkTopologyResponse,
|
NetworkTopologyResponse,
|
||||||
InternetInfoResponse, NetworkInterfacesResponse,
|
InternetInfoResponse, NetworkInterfacesResponse,
|
||||||
|
HealthCheckResponse,
|
||||||
CreateEventRequest, CreateSessionRequest,
|
CreateEventRequest, CreateSessionRequest,
|
||||||
DeleteSessionRequest, CreateNotificationRequest,
|
DeleteSessionRequest, CreateNotificationRequest,
|
||||||
SyncPushRequest, SyncPullResponse,
|
SyncPushRequest, SyncPullResponse,
|
||||||
@@ -1930,6 +1932,33 @@ def check_auth(payload=None):
|
|||||||
if request.method == "GET":
|
if request.method == "GET":
|
||||||
return jsonify({"success": True, "message": "Authentication check successful"}), 200
|
return jsonify({"success": True, "message": "Authentication check successful"}), 200
|
||||||
|
|
||||||
|
|
||||||
|
# --------------------------
|
||||||
|
# Health endpoint
|
||||||
|
# --------------------------
|
||||||
|
@app.route("/health", methods=["GET"])
@validate_request(
    operation_id="check_health",
    summary="System Health Check",
    description="Retrieve system vitality metrics including database size, memory pressure, system load, disk usage, and CPU temperature.",
    response_model=HealthCheckResponse,
    tags=["system", "health"],
    auth_callable=is_authorized
)
def check_health(payload=None):
    """Return the aggregated health metrics as a JSON response.

    On success the metrics returned by get_health_status() are merged next to
    "success": True and sent with HTTP 200. If anything raises while the
    response is being built, the exception text is written to the log only and
    the caller receives a fixed HTTP 500 payload.
    """
    try:
        metrics = get_health_status()
        body = {"success": True, **metrics}
        return jsonify(body), 200
    except Exception as e:
        mylog("none", [f"[health] Error retrieving health status: {e}"])
        failure = {
            "success": False,
            "error": "Failed to retrieve health status",
            "message": "Internal server error"
        }
        return jsonify(failure), 500
|
||||||
|
|
||||||
|
|
||||||
# --------------------------
|
# --------------------------
|
||||||
# Background Server Start
|
# Background Server Start
|
||||||
# --------------------------
|
# --------------------------
|
||||||
|
|||||||
137
server/api_server/health_endpoint.py
Normal file
137
server/api_server/health_endpoint.py
Normal file
@@ -0,0 +1,137 @@
|
|||||||
|
"""Health check endpoint for NetAlertX system vitality monitoring."""
|
||||||
|
|
||||||
|
import os
|
||||||
|
import psutil
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from const import dbPath, dataPath
|
||||||
|
from logger import mylog
|
||||||
|
|
||||||
|
|
||||||
|
# ===============================================================================
|
||||||
|
# Database Vitality
|
||||||
|
# ===============================================================================
|
||||||
|
|
||||||
|
def get_db_size_mb():
    """
    Calculate total database size in MB (app.db + app.db-wal).

    Each file is stat'ed on its own, and a file that is missing contributes
    0 bytes. Checking existence first and stat'ing afterwards would let a
    file that vanishes in between abort the whole calculation and report
    0.0 even though the other file is present.

    Returns:
        float: Size in MB rounded to two decimals, or 0.0 if neither file
        exists or an unexpected error occurs.
    """
    try:
        size_bytes = 0
        for candidate in (Path(dbPath), Path(f"{dbPath}-wal")):
            try:
                size_bytes += candidate.stat().st_size
            except (FileNotFoundError, NotADirectoryError):
                # Absent file: nothing to add, keep going with the next one.
                continue

        return round(size_bytes / (1024 * 1024), 2)
    except Exception as e:
        mylog("verbose", [f"[health] Error calculating DB size: {e}"])
        return 0.0
|
||||||
|
|
||||||
|
|
||||||
|
# ===============================================================================
|
||||||
|
# Memory Pressure
|
||||||
|
# ===============================================================================
|
||||||
|
|
||||||
|
def get_mem_usage_pct():
    """
    Report memory usage as a whole-number percentage (used / total * 100).

    The ratio is truncated to an integer and then limited to the 0-100 range.

    Returns:
        int: Memory usage percentage (0-100), or None if it cannot be computed.
    """
    try:
        mem = psutil.virtual_memory()
        ratio = mem.used / mem.total
        whole_pct = int(ratio * 100)
        # Keep the result inside 0-100
        if whole_pct < 0:
            return 0
        if whole_pct > 100:
            return 100
        return whole_pct
    except Exception as e:
        mylog("verbose", [f"[health] Error calculating memory usage: {e}"])
        return None
|
||||||
|
|
||||||
|
def get_load_avg_1m():
    """
    Read the 1-minute system load average.

    Returns:
        float: The 1-minute load average rounded to two decimals, or -1.0 if
        it cannot be read.
    """
    try:
        averages = os.getloadavg()
        one_minute, _five, _fifteen = averages
        return round(one_minute, 2)
    except Exception as e:
        mylog("verbose", [f"[health] Error getting load average: {e}"])
        return -1.0
|
||||||
|
|
||||||
|
|
||||||
|
# ===============================================================================
|
||||||
|
# Disk Headroom
|
||||||
|
# ===============================================================================
|
||||||
|
|
||||||
|
def get_storage_pct():
    """
    Report how full the filesystem holding dataPath is, as a whole percentage.

    Usage is derived from the statvfs block counts (f_blocks minus f_bfree),
    truncated to an integer and limited to the 0-100 range. A filesystem that
    reports zero total size yields 0.

    Returns:
        int: Disk usage percentage (0-100), or None if it cannot be computed.
    """
    try:
        fs = os.statvfs(dataPath)
        capacity = fs.f_blocks * fs.f_frsize
        consumed = (fs.f_blocks - fs.f_bfree) * fs.f_frsize
        if capacity > 0:
            pct = int((consumed / capacity) * 100)
        else:
            pct = 0
        # Keep the result inside 0-100
        return max(0, min(100, pct))
    except Exception as e:
        mylog("verbose", [f"[health] Error calculating storage usage: {e}"])
        return None
|
||||||
|
|
||||||
|
def get_cpu_temp():
    """
    Get CPU temperature from hardware sensors if available.

    The first reading of the 'coretemp' sensor group is preferred; when that
    group is absent or empty, the first reading of the first non-empty group
    is used instead. The value is truncated to an integer.

    Returns:
        int: Temperature reading as reported by psutil, or None if no sensor
        data is available or reading it fails.
    """
    try:
        temps = psutil.sensors_temperatures()
        if not temps:
            return None

        # Prefer 'coretemp' (Intel) when it has at least one reading
        preferred = temps.get("coretemp")
        if preferred:
            return int(preferred[0].current)

        # Otherwise fall back to the first sensor group that has data
        for readings in temps.values():
            if readings:
                return int(readings[0].current)

        return None
    except Exception as e:
        mylog("verbose", [f"[health] Error reading CPU temperature: {e}"])
        return None
|
||||||
|
|
||||||
|
|
||||||
|
# ===============================================================================
|
||||||
|
# Aggregator
|
||||||
|
# ===============================================================================
|
||||||
|
|
||||||
|
def get_health_status():
    """
    Gather every health metric into one dictionary.

    The individual collectors are invoked in a fixed order and their results
    are stored under the key the API exposes for each metric.

    Returns:
        dict: Mapping of metric name to the value returned by its collector.
    """
    collectors = (
        ("db_size_mb", get_db_size_mb),
        ("mem_usage_pct", get_mem_usage_pct),
        ("load_1m", get_load_avg_1m),
        ("storage_pct", get_storage_pct),
        ("cpu_temp", get_cpu_temp),
    )
    return {metric: collect() for metric, collect in collectors}
|
||||||
@@ -651,6 +651,34 @@ class NetworkInterfacesResponse(BaseResponse):
|
|||||||
interfaces: Dict[str, Any] = Field(..., description="Details about network interfaces.")
|
interfaces: Dict[str, Any] = Field(..., description="Details about network interfaces.")
|
||||||
|
|
||||||
|
|
||||||
|
# =============================================================================
|
||||||
|
# HEALTH CHECK SCHEMAS
|
||||||
|
# =============================================================================
|
||||||
|
|
||||||
|
|
||||||
|
# Sample payload shown in the generated schema for the health endpoint.
_HEALTH_CHECK_EXAMPLE = {
    "success": True,
    "db_size_mb": 125.45,
    "mem_usage_pct": 65,
    "load_1m": 2.15,
    "storage_pct": 42,
    "cpu_temp": 58
}


class HealthCheckResponse(BaseResponse):
    """Response schema for the health endpoint: one field per vitality metric."""
    model_config = ConfigDict(
        extra="allow",
        json_schema_extra={"examples": [_HEALTH_CHECK_EXAMPLE]}
    )

    # Required metrics
    db_size_mb: float = Field(..., description="Database size in MB (app.db + app.db-wal)")
    load_1m: float = Field(..., description="1-minute load average")

    # Metrics that may be null when the value cannot be determined
    mem_usage_pct: Optional[int] = Field(None, ge=0, le=100, description="Memory usage percentage (0-100, nullable if unavailable)")
    storage_pct: Optional[int] = Field(None, ge=0, le=100, description="Disk usage percentage of /data mount (0-100, nullable if unavailable)")
    cpu_temp: Optional[int] = Field(None, description="CPU temperature in Celsius (nullable if unavailable)")
|
||||||
|
|
||||||
|
|
||||||
# =============================================================================
|
# =============================================================================
|
||||||
# EVENTS SCHEMAS
|
# EVENTS SCHEMAS
|
||||||
# =============================================================================
|
# =============================================================================
|
||||||
|
|||||||
237
test/api_endpoints/test_health_endpoints.py
Normal file
237
test/api_endpoints/test_health_endpoints.py
Normal file
@@ -0,0 +1,237 @@
|
|||||||
|
"""Tests for health check endpoint."""
|
||||||
|
|
||||||
|
import sys
|
||||||
|
import os
|
||||||
|
import pytest
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
INSTALL_PATH = os.getenv("NETALERTX_APP", "/app")
|
||||||
|
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
|
||||||
|
|
||||||
|
from helper import get_setting_value # noqa: E402
|
||||||
|
from api_server.api_server_start import app # noqa: E402
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(scope="session")
def api_token():
    """Session-wide fixture providing the API_TOKEN value from the settings."""
    token = get_setting_value("API_TOKEN")
    return token
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
def client():
    """Yield a Flask test client that is closed again after the test."""
    with app.test_client() as test_client:
        yield test_client
|
||||||
|
|
||||||
|
|
||||||
|
def auth_headers(token):
    """Build the request headers carrying *token* as a Bearer credential."""
    bearer_value = f"Bearer {token}"
    headers = {"Authorization": bearer_value}
    return headers
|
||||||
|
|
||||||
|
|
||||||
|
# ========================================================================
|
||||||
|
# AUTHENTICATION TESTS
|
||||||
|
# ========================================================================
|
||||||
|
|
||||||
|
def test_health_unauthorized(client):
    """A request without any token is rejected with 403 and success=False."""
    response = client.get("/health")
    assert response.status_code == 403

    body = response.get_json()
    assert body is not None
    assert body.get("success") is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_health_invalid_token(client):
    """A request with a wrong bearer token is rejected with 403 and success=False."""
    bad_headers = auth_headers("INVALID-TOKEN")
    response = client.get("/health", headers=bad_headers)
    assert response.status_code == 403

    body = response.get_json()
    assert body is not None
    assert body.get("success") is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_health_valid_token(client, api_token):
    """A request with the configured token succeeds with 200 and success=True."""
    good_headers = auth_headers(api_token)
    response = client.get("/health", headers=good_headers)
    assert response.status_code == 200

    body = response.get_json()
    assert body is not None
    assert body.get("success") is True
|
||||||
|
|
||||||
|
|
||||||
|
# ========================================================================
|
||||||
|
# RESPONSE STRUCTURE TESTS
|
||||||
|
# ========================================================================
|
||||||
|
|
||||||
|
def test_health_response_structure(client, api_token):
    """A successful response carries every health metric key."""
    response = client.get("/health", headers=auth_headers(api_token))
    assert response.status_code == 200

    body = response.get_json()
    assert body.get("success") is True

    # Every metric key must be present, whatever its value
    expected_keys = ("db_size_mb", "mem_usage_pct", "load_1m", "storage_pct", "cpu_temp")
    for key in expected_keys:
        assert key in body
|
||||||
|
|
||||||
|
|
||||||
|
def test_health_db_size_type(client, api_token):
    """db_size_mb is a non-negative number."""
    response = client.get("/health", headers=auth_headers(api_token))
    body = response.get_json()

    db_size = body["db_size_mb"]
    assert isinstance(db_size, (int, float))
    assert db_size >= 0
|
||||||
|
|
||||||
|
|
||||||
|
def test_health_mem_usage_type(client, api_token):
    """mem_usage_pct should be null or an integer in range [0, 100]."""
    resp = client.get("/health", headers=auth_headers(api_token))
    data = resp.get_json()

    mem = data["mem_usage_pct"]
    # An unavailable metric is reported as null (None), not as -1.
    assert mem is None or isinstance(mem, int)
    if mem is not None:
        assert 0 <= mem <= 100
|
||||||
|
|
||||||
|
|
||||||
|
def test_health_load_avg_type(client, api_token):
    """load_1m is numeric and never below the -1 error sentinel."""
    response = client.get("/health", headers=auth_headers(api_token))
    body = response.get_json()

    load_1m = body["load_1m"]
    assert isinstance(load_1m, (int, float))
    assert load_1m >= -1  # -1 on error
|
||||||
|
|
||||||
|
|
||||||
|
def test_health_storage_pct_type(client, api_token):
    """storage_pct should be null or an integer in range [0, 100]."""
    resp = client.get("/health", headers=auth_headers(api_token))
    data = resp.get_json()

    storage = data["storage_pct"]
    # An unavailable metric is reported as null (None), not as -1.
    assert storage is None or isinstance(storage, int)
    if storage is not None:
        assert 0 <= storage <= 100
|
||||||
|
|
||||||
|
|
||||||
|
def test_health_cpu_temp_optional(client, api_token):
    """cpu_temp is either null or an integer above a sanity floor."""
    response = client.get("/health", headers=auth_headers(api_token))
    body = response.get_json()

    reading = body["cpu_temp"]
    assert reading is None or isinstance(reading, int)
    if isinstance(reading, int):
        assert reading > -100  # Reasonable temperature bounds
|
||||||
|
|
||||||
|
|
||||||
|
# ========================================================================
|
||||||
|
# METRIC CALCULATION TESTS
|
||||||
|
# ========================================================================
|
||||||
|
|
||||||
|
def test_health_db_size_realistic(client, api_token):
    """Database size stays within a plausible range."""
    response = client.get("/health", headers=auth_headers(api_token))
    body = response.get_json()
    db_size = body["db_size_mb"]

    # 0 is tolerated so minimal installations without data still pass
    assert db_size >= 0
    # Upper sanity bound: below 5000 MB
    assert db_size < 5000
|
||||||
|
|
||||||
|
|
||||||
|
def test_health_mem_usage_reasonable(client, api_token):
    """Memory usage, when reported, should lie between 0% and 100%."""
    resp = client.get("/health", headers=auth_headers(api_token))
    data = resp.get_json()

    mem = data["mem_usage_pct"]
    # The metric is null when unavailable; comparing None against the bounds
    # would raise TypeError, so only check the range for real values.
    if mem is not None:
        assert 0 <= mem <= 100
|
||||||
|
|
||||||
|
|
||||||
|
def test_health_storage_pct_reasonable(client, api_token):
    """Storage percentage, when reported, should lie between 0% and 100%."""
    resp = client.get("/health", headers=auth_headers(api_token))
    data = resp.get_json()

    storage = data["storage_pct"]
    # The metric is null when unavailable; comparing None against the bounds
    # would raise TypeError, so only check the range for real values.
    if storage is not None:
        assert 0 <= storage <= 100
|
||||||
|
|
||||||
|
|
||||||
|
# ========================================================================
|
||||||
|
# ERROR HANDLING TESTS
|
||||||
|
# ========================================================================
|
||||||
|
|
||||||
|
@patch('api_server.api_server_start.get_health_status')
def test_health_exception_handling(mock_health, client, api_token):
    """A failing metrics collector results in a 500 with an error payload."""
    mock_health.side_effect = Exception("Test error")

    response = client.get("/health", headers=auth_headers(api_token))
    assert response.status_code == 500

    body = response.get_json()
    assert body.get("success") is False
    assert "error" in body
|
||||||
|
|
||||||
|
|
||||||
|
# ========================================================================
|
||||||
|
# METRIC INDEPENDENCE TESTS
|
||||||
|
# ========================================================================
|
||||||
|
|
||||||
|
def test_health_multiple_calls_consistency(client, api_token):
    """Three consecutive calls each succeed and expose the same metric keys."""
    expected_keys = ("db_size_mb", "mem_usage_pct", "load_1m", "storage_pct", "cpu_temp")

    for _ in range(3):
        response = client.get("/health", headers=auth_headers(api_token))
        assert response.status_code == 200

        body = response.get_json()
        assert body.get("success") is True
        for key in expected_keys:
            assert key in body
|
||||||
|
|
||||||
|
|
||||||
|
# ========================================================================
|
||||||
|
# HTTP METHOD TESTS
|
||||||
|
# ========================================================================
|
||||||
|
|
||||||
|
def test_health_post_not_allowed(client, api_token):
    """The health route rejects POST requests."""
    response = client.post("/health", headers=auth_headers(api_token))
    # Both 405 Method Not Allowed and 404 Not Found count as a rejection
    acceptable = (404, 405)
    assert response.status_code in acceptable
|
||||||
|
|
||||||
|
|
||||||
|
def test_health_delete_not_allowed(client, api_token):
    """The health route rejects DELETE requests."""
    response = client.delete("/health", headers=auth_headers(api_token))
    # Both 405 Method Not Allowed and 404 Not Found count as a rejection
    acceptable = (404, 405)
    assert response.status_code in acceptable
|
||||||
|
|
||||||
|
|
||||||
|
# ========================================================================
|
||||||
|
# QUERY TOKEN AUTHENTICATION TEST
|
||||||
|
# ========================================================================
|
||||||
|
|
||||||
|
def test_health_query_token_auth(client, api_token):
    """Passing the token as a ?token= query parameter authenticates the request."""
    url = f"/health?token={api_token}"
    response = client.get(url)
    assert response.status_code == 200

    body = response.get_json()
    assert body.get("success") is True
|
||||||
Reference in New Issue
Block a user