mirror of
https://github.com/jokob-sk/NetAlertX.git
synced 2025-12-07 09:36:05 -08:00
api layer v0.2.1 - /events /history
This commit is contained in:
@@ -4,6 +4,8 @@ from flask_cors import CORS
|
||||
from .graphql_endpoint import devicesSchema
|
||||
from .device_endpoint import get_device_data, set_device_data, delete_device, delete_device_events, reset_device_props
|
||||
from .devices_endpoint import delete_unknown_devices, delete_all_with_empty_macs, delete_devices
|
||||
from .events_endpoint import delete_device_events, delete_events, delete_events_30, get_events
|
||||
from .history_endpoint import delete_online_history
|
||||
from .prometheus_endpoint import getMetricStats
|
||||
from .sync_endpoint import handle_sync_post, handle_sync_get
|
||||
import sys
|
||||
@@ -24,7 +26,9 @@ CORS(
|
||||
resources={
|
||||
r"/metrics": {"origins": "*"},
|
||||
r"/device/*": {"origins": "*"},
|
||||
r"/devices/*": {"origins": "*"}
|
||||
r"/devices/*": {"origins": "*"},
|
||||
r"/history/*": {"origins": "*"},
|
||||
r"/events/*": {"origins": "*"}
|
||||
},
|
||||
supports_credentials=True,
|
||||
allow_headers=["Authorization", "Content-Type"]
|
||||
@@ -126,27 +130,46 @@ def api_get_devices_totals():
|
||||
|
||||
|
||||
# --------------------------
|
||||
# Device Events / History
|
||||
# Online history
|
||||
# --------------------------
|
||||
|
||||
@app.route("/history", methods=["DELETE"])
|
||||
def api_delete_online_history():
|
||||
if not is_authorized():
|
||||
return jsonify({"error": "Forbidden"}), 403
|
||||
return delete_online_history()
|
||||
|
||||
# --------------------------
|
||||
# Device Events
|
||||
# --------------------------
|
||||
|
||||
@app.route("/events/<mac>", methods=["DELETE"])
|
||||
def api_delete_device_events(mac):
|
||||
if not is_authorized():
|
||||
return jsonify({"error": "Forbidden"}), 403
|
||||
return delete_device_events(mac)
|
||||
|
||||
@app.route("/events", methods=["DELETE"])
|
||||
def api_delete_events():
|
||||
def api_delete_all_events():
|
||||
if not is_authorized():
|
||||
return jsonify({"error": "Forbidden"}), 403
|
||||
return delete_events()
|
||||
|
||||
@app.route("/events", methods=["GET"])
|
||||
def api_delete_all_events():
|
||||
if not is_authorized():
|
||||
return jsonify({"error": "Forbidden"}), 403
|
||||
|
||||
mac = request.json.get("mac") if request.is_json else None
|
||||
|
||||
return get_events(mac)
|
||||
|
||||
@app.route("/events/30days", methods=["DELETE"])
|
||||
def api_delete_events_30():
|
||||
def api_delete_old_events():
|
||||
if not is_authorized():
|
||||
return jsonify({"error": "Forbidden"}), 403
|
||||
return delete_events_30()
|
||||
|
||||
@app.route("/history/actions", methods=["DELETE"])
|
||||
def api_delete_act_history():
|
||||
if not is_authorized():
|
||||
return jsonify({"error": "Forbidden"}), 403
|
||||
return delete_act_history()
|
||||
|
||||
# --------------------------
|
||||
# CSV Import / Export
|
||||
# --------------------------
|
||||
|
||||
83
server/api_server/events_endpoint.py
Executable file
83
server/api_server/events_endpoint.py
Executable file
@@ -0,0 +1,83 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
import json
|
||||
import subprocess
|
||||
import argparse
|
||||
import os
|
||||
import pathlib
|
||||
import sys
|
||||
from datetime import datetime
|
||||
from flask import jsonify, request
|
||||
|
||||
# Register NetAlertX directories
|
||||
INSTALL_PATH="/app"
|
||||
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
|
||||
|
||||
from database import get_temp_db_connection
|
||||
from helper import row_to_json, get_date_from_period, is_random_mac, format_date, get_setting_value
|
||||
|
||||
|
||||
# --------------------------
|
||||
# Events Endpoints Functions
|
||||
# --------------------------
|
||||
|
||||
def get_events(mac=None):
|
||||
"""
|
||||
Fetch all events, or events for a specific MAC if provided.
|
||||
Returns JSON list of events.
|
||||
"""
|
||||
conn = get_temp_db_connection()
|
||||
cur = conn.cursor()
|
||||
|
||||
if mac:
|
||||
sql = "SELECT * FROM Events WHERE eve_MAC=? ORDER BY eve_DateTime DESC"
|
||||
cur.execute(sql, (mac,))
|
||||
else:
|
||||
sql = "SELECT * FROM Events ORDER BY eve_DateTime DESC"
|
||||
cur.execute(sql)
|
||||
|
||||
rows = cur.fetchall()
|
||||
events = [row_to_json(list(r.keys()), r) for r in rows]
|
||||
|
||||
conn.close()
|
||||
return jsonify({"success": True, "events": events})
|
||||
|
||||
def delete_events_30():
|
||||
"""Delete all events older than 30 days"""
|
||||
|
||||
conn = get_temp_db_connection()
|
||||
cur = conn.cursor()
|
||||
|
||||
sql = "DELETE FROM Events WHERE eve_DateTime <= date('now', '-30 day')"
|
||||
cur.execute(sql)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
return jsonify({"success": True, "message": "Deleted events older than 30 days"})
|
||||
|
||||
def delete_events():
|
||||
"""Delete all events"""
|
||||
|
||||
conn = get_temp_db_connection()
|
||||
cur = conn.cursor()
|
||||
|
||||
sql = "DELETE FROM Events"
|
||||
cur.execute(sql)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
return jsonify({"success": True, "message": "Deleted all events"})
|
||||
|
||||
def delete_device_events(mac):
|
||||
"""Delete all events"""
|
||||
|
||||
conn = get_temp_db_connection()
|
||||
cur = conn.cursor()
|
||||
|
||||
sql = "DELETE FROM Events WHERE eve_MAC= ? "
|
||||
cur.execute(sql, (mac,))
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
return jsonify({"success": True, "message": "Deleted all events for the device"})
|
||||
|
||||
35
server/api_server/history_endpoint.py
Executable file
35
server/api_server/history_endpoint.py
Executable file
@@ -0,0 +1,35 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
import json
|
||||
import subprocess
|
||||
import argparse
|
||||
import os
|
||||
import pathlib
|
||||
import sys
|
||||
from datetime import datetime
|
||||
from flask import jsonify, request
|
||||
|
||||
# Register NetAlertX directories
|
||||
INSTALL_PATH="/app"
|
||||
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
|
||||
|
||||
from database import get_temp_db_connection
|
||||
from helper import row_to_json, get_date_from_period, is_random_mac, format_date, get_setting_value
|
||||
|
||||
|
||||
# --------------------------------------------------
|
||||
# Online History Activity Endpoints Functions
|
||||
# --------------------------------------------------
|
||||
|
||||
def delete_online_history():
|
||||
"""Delete all online history activity"""
|
||||
|
||||
conn = get_temp_db_connection()
|
||||
cur = conn.cursor()
|
||||
|
||||
sql = "DELETE FROM Online_History"
|
||||
cur.execute(sql)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
return jsonify({"success": True, "message": "Deleted online history"})
|
||||
104
test/test_events_endpoints.py
Executable file
104
test/test_events_endpoints.py
Executable file
@@ -0,0 +1,104 @@
|
||||
import sys
|
||||
import pathlib
|
||||
import sqlite3
|
||||
import random
|
||||
import string
|
||||
import uuid
|
||||
import pytest
|
||||
|
||||
INSTALL_PATH = "/app"
|
||||
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
|
||||
|
||||
from helper import timeNowTZ, get_setting_value
|
||||
from api_server.api_server_start import app
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def api_token():
|
||||
return get_setting_value("API_TOKEN")
|
||||
|
||||
@pytest.fixture
|
||||
def client():
|
||||
with app.test_client() as client:
|
||||
yield client
|
||||
|
||||
@pytest.fixture
|
||||
def test_mac():
|
||||
# Generate a unique MAC for each test run
|
||||
return "AA:BB:CC:" + ":".join(f"{random.randint(0,255):02X}" for _ in range(3))
|
||||
|
||||
def auth_headers(token):
|
||||
return {"Authorization": f"Bearer {token}"}
|
||||
|
||||
def create_event(client, api_token, mac, event="UnitTest Event", days_old=None):
|
||||
"""
|
||||
Create event using API (POST /event/<mac>).
|
||||
If days_old is set, adds it to payload for backdating support.
|
||||
"""
|
||||
payload = {
|
||||
"event": event,
|
||||
}
|
||||
if days_old:
|
||||
payload["days_old"] = days_old
|
||||
return client.post(f"/event/{mac}", json=payload, headers=auth_headers(api_token))
|
||||
|
||||
def list_events(client, api_token, mac=None):
|
||||
url = "/events" if mac is None else f"/events/{mac}"
|
||||
return client.get(url, headers=auth_headers(api_token))
|
||||
|
||||
|
||||
def test_delete_events_for_mac(client, api_token, test_mac):
|
||||
# create event
|
||||
resp = create_event(client, api_token, test_mac)
|
||||
assert resp.status_code == 200
|
||||
|
||||
# confirm exists
|
||||
resp = list_events(client, api_token, test_mac)
|
||||
assert resp.status_code == 200
|
||||
assert any(ev["eve_MAC"] == test_mac for ev in resp.json)
|
||||
|
||||
# delete
|
||||
resp = client.delete(f"/events/{test_mac}", headers=auth_headers(api_token))
|
||||
assert resp.status_code == 200
|
||||
assert resp.json.get("success") is True
|
||||
|
||||
# confirm deleted
|
||||
resp = list_events(client, api_token, test_mac)
|
||||
assert resp.status_code == 200
|
||||
assert len(resp.json) == 0
|
||||
|
||||
|
||||
def test_delete_all_events(client, api_token, test_mac):
|
||||
# create two events
|
||||
create_event(client, api_token, test_mac)
|
||||
create_event(client, api_token, "FF:FF:FF:FF:FF:FF")
|
||||
|
||||
resp = list_events(client, api_token)
|
||||
assert len(resp.json) >= 2
|
||||
|
||||
# delete all
|
||||
resp = client.delete("/events", headers=auth_headers(api_token))
|
||||
assert resp.status_code == 200
|
||||
assert resp.json.get("success") is True
|
||||
|
||||
# confirm no events
|
||||
resp = list_events(client, api_token)
|
||||
assert len(resp.json) == 0
|
||||
|
||||
|
||||
def test_delete_events_30days(client, api_token, test_mac):
|
||||
# create old + new events
|
||||
create_event(client, api_token, test_mac, days_old=40) # should be deleted
|
||||
create_event(client, api_token, test_mac, days_old=5) # should remain
|
||||
|
||||
resp = list_events(client, api_token, test_mac)
|
||||
assert len(resp.json) == 2
|
||||
|
||||
# delete events older than 30 days
|
||||
resp = client.delete("/events/30days", headers=auth_headers(api_token))
|
||||
assert resp.status_code == 200
|
||||
assert resp.json.get("success") is True
|
||||
|
||||
# confirm only recent remains
|
||||
resp = list_events(client, api_token, test_mac)
|
||||
mac_events = [ev for ev in resp.json if ev["eve_MAC"] == test_mac]
|
||||
assert len(mac_events) == 1
|
||||
35
test/test_history_endpoints.py
Executable file
35
test/test_history_endpoints.py
Executable file
@@ -0,0 +1,35 @@
|
||||
import sys
|
||||
import pathlib
|
||||
import sqlite3
|
||||
import random
|
||||
import string
|
||||
import uuid
|
||||
import pytest
|
||||
|
||||
INSTALL_PATH = "/app"
|
||||
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
|
||||
|
||||
from helper import timeNowTZ, get_setting_value
|
||||
from api_server.api_server_start import app
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def api_token():
|
||||
return get_setting_value("API_TOKEN")
|
||||
|
||||
@pytest.fixture
|
||||
def client():
|
||||
with app.test_client() as client:
|
||||
yield client
|
||||
|
||||
@pytest.fixture
|
||||
def test_mac():
|
||||
# Generate a unique MAC for each test run
|
||||
return "AA:BB:CC:" + ":".join(f"{random.randint(0,255):02X}" for _ in range(3))
|
||||
|
||||
def auth_headers(token):
|
||||
return {"Authorization": f"Bearer {token}"}
|
||||
|
||||
def test_delete_history(client, api_token):
|
||||
resp = client.delete(f"/history", headers=auth_headers(api_token))
|
||||
assert resp.status_code == 200
|
||||
assert resp.json.get("success") is True
|
||||
Reference in New Issue
Block a user