mirror of
https://github.com/jokob-sk/NetAlertX.git
synced 2025-12-07 09:36:05 -08:00
api layer v0.3 - /events /sessions work
This commit is contained in:
@@ -4,10 +4,10 @@ from flask_cors import CORS
|
||||
from .graphql_endpoint import devicesSchema
|
||||
from .device_endpoint import get_device_data, set_device_data, delete_device, delete_device_events, reset_device_props, copy_device, update_device_column
|
||||
from .devices_endpoint import get_all_devices, delete_unknown_devices, delete_all_with_empty_macs, delete_devices, export_devices, import_csv, devices_totals, devices_by_status
|
||||
from .events_endpoint import delete_events, delete_events_older_than, get_events, create_event
|
||||
from .events_endpoint import delete_events, delete_events_older_than, get_events, create_event, get_events_totals
|
||||
from .history_endpoint import delete_online_history
|
||||
from .prometheus_endpoint import get_metric_stats
|
||||
from .sessions_endpoint import get_sessions, delete_session, create_session, get_sessions_calendar
|
||||
from .sessions_endpoint import get_sessions, delete_session, create_session, get_sessions_calendar, get_device_sessions, get_session_events
|
||||
from .nettools_endpoint import wakeonlan, traceroute, speedtest, nslookup, nmap_scan, internet_info
|
||||
from .sync_endpoint import handle_sync_post, handle_sync_get
|
||||
import sys
|
||||
@@ -18,6 +18,7 @@ sys.path.extend([f"{INSTALL_PATH}/server"])
|
||||
|
||||
from logger import mylog
|
||||
from helper import get_setting_value, timeNowTZ
|
||||
from db.db_helper import get_date_from_period
|
||||
from app_state import updateState
|
||||
from messaging.in_app import write_notification
|
||||
|
||||
@@ -326,6 +327,14 @@ def api_delete_old_events(days: int):
|
||||
|
||||
return delete_events_older_than(days)
|
||||
|
||||
@app.route("/sessions/totals", methods=["GET"])
|
||||
def api_get_events_totals():
|
||||
if not is_authorized():
|
||||
return jsonify({"error": "Forbidden"}), 403
|
||||
|
||||
period = get_date_from_period(request.args.get("period", "7 days"))
|
||||
return get_events_totals(period)
|
||||
|
||||
# --------------------------
|
||||
# Sessions
|
||||
# --------------------------
|
||||
@@ -383,6 +392,22 @@ def api_get_sessions_calendar():
|
||||
|
||||
return get_sessions_calendar(start_date, end_date)
|
||||
|
||||
@app.route("/sessions/<mac>", methods=["GET"])
|
||||
def api_device_sessions(mac):
|
||||
if not is_authorized():
|
||||
return jsonify({"error": "Forbidden"}), 403
|
||||
|
||||
period = request.args.get("period", "1 day")
|
||||
return get_device_sessions(mac, period)
|
||||
|
||||
@app.route("/sessions/session-events", methods=["GET"])
|
||||
def api_get_session_events():
|
||||
if not is_authorized():
|
||||
return jsonify({"error": "Forbidden"}), 403
|
||||
|
||||
session_event_type = request.args.get("type", "all")
|
||||
period = get_date_from_period(request.args.get("period", "7 days"))
|
||||
return get_session_events(session_event_type, period)
|
||||
|
||||
# --------------------------
|
||||
# Prometheus metrics endpoint
|
||||
|
||||
@@ -15,7 +15,7 @@ sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
|
||||
|
||||
from database import get_temp_db_connection
|
||||
from helper import is_random_mac, format_date, get_setting_value, format_date_iso, format_event_date, timeNowTZ, mylog, ensure_datetime
|
||||
from db.db_helper import row_to_json
|
||||
from db.db_helper import row_to_json, get_date_from_period
|
||||
|
||||
|
||||
# --------------------------
|
||||
@@ -108,3 +108,39 @@ def delete_events():
|
||||
|
||||
|
||||
|
||||
def get_events_totals(period: str = "7 days"):
|
||||
"""
|
||||
Return counts for events and sessions totals over a given period.
|
||||
period: "7 days", "1 month", "1 year", "100 years"
|
||||
"""
|
||||
# Convert period to SQLite date expression
|
||||
period_date_sql = get_date_from_period(period)
|
||||
|
||||
conn = get_temp_db_connection()
|
||||
cur = conn.cursor()
|
||||
|
||||
sql = f"""
|
||||
SELECT
|
||||
(SELECT COUNT(*) FROM Events WHERE eve_DateTime >= {period_date_sql}) AS all_events,
|
||||
(SELECT COUNT(*) FROM Sessions WHERE
|
||||
ses_DateTimeConnection >= {period_date_sql}
|
||||
OR ses_DateTimeDisconnection >= {period_date_sql}
|
||||
OR ses_StillConnected = 1
|
||||
) AS sessions,
|
||||
(SELECT COUNT(*) FROM Sessions WHERE
|
||||
(ses_DateTimeConnection IS NULL AND ses_DateTimeDisconnection >= {period_date_sql})
|
||||
OR (ses_DateTimeDisconnection IS NULL AND ses_StillConnected = 0 AND ses_DateTimeConnection >= {period_date_sql})
|
||||
) AS missing,
|
||||
(SELECT COUNT(*) FROM Events WHERE eve_DateTime >= {period_date_sql} AND eve_EventType LIKE 'VOIDED%') AS voided,
|
||||
(SELECT COUNT(*) FROM Events WHERE eve_DateTime >= {period_date_sql} AND eve_EventType LIKE 'New Device') AS new,
|
||||
(SELECT COUNT(*) FROM Events WHERE eve_DateTime >= {period_date_sql} AND eve_EventType LIKE 'Device Down') AS down
|
||||
"""
|
||||
|
||||
cur.execute(sql)
|
||||
row = cur.fetchone()
|
||||
conn.close()
|
||||
|
||||
# Return as JSON array
|
||||
result_json = [row[0], row[1], row[2], row[3], row[4], row[5]]
|
||||
return jsonify(result_json)
|
||||
|
||||
|
||||
@@ -5,6 +5,8 @@ import subprocess
|
||||
import argparse
|
||||
import os
|
||||
import pathlib
|
||||
import sqlite3
|
||||
import time
|
||||
import sys
|
||||
from datetime import datetime
|
||||
from flask import jsonify, request
|
||||
@@ -14,8 +16,8 @@ INSTALL_PATH="/app"
|
||||
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
|
||||
|
||||
from database import get_temp_db_connection
|
||||
from helper import is_random_mac, format_date, get_setting_value, format_date_iso, format_event_date, mylog, timeNowTZ
|
||||
from db.db_helper import row_to_json
|
||||
from helper import is_random_mac, format_date, get_setting_value, format_date_iso, format_event_date, mylog, timeNowTZ, format_date_diff, format_ip_long, parse_datetime
|
||||
from db.db_helper import row_to_json, get_date_from_period
|
||||
|
||||
|
||||
# --------------------------
|
||||
@@ -169,4 +171,213 @@ def get_sessions_calendar(start_date, end_date):
|
||||
})
|
||||
|
||||
conn.close()
|
||||
return jsonify({"success": True, "sessions": table_data})
|
||||
return jsonify({"success": True, "sessions": table_data})
|
||||
|
||||
|
||||
|
||||
def get_device_sessions(mac, period):
|
||||
"""
|
||||
Fetch device sessions for a given MAC address and period.
|
||||
"""
|
||||
period_date = get_date_from_period(period)
|
||||
|
||||
conn = get_temp_db_connection()
|
||||
cur = conn.cursor()
|
||||
|
||||
sql = f"""
|
||||
SELECT
|
||||
IFNULL(ses_DateTimeConnection, ses_DateTimeDisconnection) AS ses_DateTimeOrder,
|
||||
ses_EventTypeConnection,
|
||||
ses_DateTimeConnection,
|
||||
ses_EventTypeDisconnection,
|
||||
ses_DateTimeDisconnection,
|
||||
ses_StillConnected,
|
||||
ses_IP,
|
||||
ses_AdditionalInfo
|
||||
FROM Sessions
|
||||
WHERE ses_MAC = ?
|
||||
AND (
|
||||
ses_DateTimeConnection >= {period_date}
|
||||
OR ses_DateTimeDisconnection >= {period_date}
|
||||
OR ses_StillConnected = 1
|
||||
)
|
||||
"""
|
||||
|
||||
|
||||
cur.execute(sql, (mac,))
|
||||
rows = cur.fetchall()
|
||||
conn.close()
|
||||
|
||||
table_data = {"data": []}
|
||||
|
||||
for row in rows:
|
||||
# Connection DateTime
|
||||
if row["ses_EventTypeConnection"] == "<missing event>":
|
||||
ini = row["ses_EventTypeConnection"]
|
||||
else:
|
||||
ini = format_date(row["ses_DateTimeConnection"])
|
||||
|
||||
# Disconnection DateTime
|
||||
if row["ses_StillConnected"]:
|
||||
end = "..."
|
||||
elif row["ses_EventTypeDisconnection"] == "<missing event>":
|
||||
end = row["ses_EventTypeDisconnection"]
|
||||
else:
|
||||
end = format_date(row["ses_DateTimeDisconnection"])
|
||||
|
||||
# Duration
|
||||
if row["ses_EventTypeConnection"] in ("<missing event>", None) or row["ses_EventTypeDisconnection"] in ("<missing event>", None):
|
||||
dur = "..."
|
||||
elif row["ses_StillConnected"]:
|
||||
dur = format_date_diff(row["ses_DateTimeConnection"], None)["text"]
|
||||
else:
|
||||
dur = format_date_diff(row["ses_DateTimeConnection"], row["ses_DateTimeDisconnection"])["text"]
|
||||
|
||||
# Additional Info
|
||||
info = row["ses_AdditionalInfo"]
|
||||
if row["ses_EventTypeConnection"] == "New Device":
|
||||
info = f"{row['ses_EventTypeConnection']}: {info}"
|
||||
|
||||
# Push row data
|
||||
table_data["data"].append({
|
||||
"ses_MAC": mac,
|
||||
"ses_DateTimeOrder": row["ses_DateTimeOrder"],
|
||||
"ses_Connection": ini,
|
||||
"ses_Disconnection": end,
|
||||
"ses_Duration": dur,
|
||||
"ses_IP": row["ses_IP"],
|
||||
"ses_Info": info,
|
||||
})
|
||||
|
||||
# Control no rows
|
||||
if not table_data["data"]:
|
||||
table_data["data"] = []
|
||||
|
||||
sessions = table_data["data"]
|
||||
|
||||
return jsonify({
|
||||
"success": True,
|
||||
"sessions": sessions
|
||||
})
|
||||
|
||||
|
||||
def get_session_events(event_type, period_date):
|
||||
"""
|
||||
Fetch events or sessions based on type and period.
|
||||
"""
|
||||
conn = get_temp_db_connection()
|
||||
conn.row_factory = sqlite3.Row
|
||||
cur = conn.cursor()
|
||||
|
||||
# Base SQLs
|
||||
sql_events = f"""
|
||||
SELECT
|
||||
eve_DateTime AS eve_DateTimeOrder,
|
||||
devName,
|
||||
devOwner,
|
||||
eve_DateTime,
|
||||
eve_EventType,
|
||||
NULL,
|
||||
NULL,
|
||||
NULL,
|
||||
NULL,
|
||||
eve_IP,
|
||||
NULL,
|
||||
eve_AdditionalInfo,
|
||||
NULL,
|
||||
devMac,
|
||||
eve_PendingAlertEmail
|
||||
FROM Events_Devices
|
||||
WHERE eve_DateTime >= {period_date}
|
||||
"""
|
||||
|
||||
sql_sessions = f"""
|
||||
SELECT
|
||||
IFNULL(ses_DateTimeConnection, ses_DateTimeDisconnection) AS ses_DateTimeOrder,
|
||||
devName,
|
||||
devOwner,
|
||||
NULL,
|
||||
NULL,
|
||||
ses_DateTimeConnection,
|
||||
ses_DateTimeDisconnection,
|
||||
NULL,
|
||||
NULL,
|
||||
ses_IP,
|
||||
NULL,
|
||||
ses_AdditionalInfo,
|
||||
ses_StillConnected,
|
||||
devMac
|
||||
FROM Sessions_Devices
|
||||
"""
|
||||
|
||||
# Build SQL based on type
|
||||
if event_type == "all":
|
||||
sql = sql_events
|
||||
elif event_type == "sessions":
|
||||
sql = sql_sessions + f"""
|
||||
WHERE (
|
||||
ses_DateTimeConnection >= {period_date}
|
||||
OR ses_DateTimeDisconnection >= {period_date}
|
||||
OR ses_StillConnected = 1
|
||||
)
|
||||
"""
|
||||
elif event_type == "missing":
|
||||
sql = sql_sessions + f"""
|
||||
WHERE (
|
||||
(ses_DateTimeConnection IS NULL AND ses_DateTimeDisconnection >= {period_date})
|
||||
OR (ses_DateTimeDisconnection IS NULL AND ses_StillConnected = 0 AND ses_DateTimeConnection >= {period_date})
|
||||
)
|
||||
"""
|
||||
elif event_type == "voided":
|
||||
sql = sql_events + ' AND eve_EventType LIKE "VOIDED%"'
|
||||
elif event_type == "new":
|
||||
sql = sql_events + ' AND eve_EventType = "New Device"'
|
||||
elif event_type == "down":
|
||||
sql = sql_events + ' AND eve_EventType = "Device Down"'
|
||||
else:
|
||||
sql = sql_events + ' AND 1=0'
|
||||
|
||||
cur.execute(sql)
|
||||
rows = cur.fetchall()
|
||||
conn.close()
|
||||
|
||||
table_data = {"data": []}
|
||||
|
||||
for row in rows:
|
||||
row = list(row) # make mutable
|
||||
|
||||
if event_type in ("sessions", "missing"):
|
||||
# Duration
|
||||
if row[5] and row[6]:
|
||||
delta = format_date_diff(row[5], row[6])
|
||||
row[7] = delta["text"]
|
||||
row[8] = int(delta["total_minutes"] * 60) # seconds
|
||||
elif row[12] == 1:
|
||||
delta = format_date_diff(row[5], None)
|
||||
row[7] = delta["text"]
|
||||
row[8] = int(delta["total_minutes"] * 60) # seconds
|
||||
else:
|
||||
row[7] = "..."
|
||||
row[8] = 0
|
||||
|
||||
# Connection
|
||||
row[5] = format_date(row[5]) if row[5] else "<missing event>"
|
||||
|
||||
# Disconnection
|
||||
if row[6]:
|
||||
row[6] = format_date(row[6])
|
||||
elif row[12] == 0:
|
||||
row[6] = "<missing event>"
|
||||
else:
|
||||
row[6] = "..."
|
||||
|
||||
else:
|
||||
# Event Date
|
||||
row[3] = format_date(row[3])
|
||||
|
||||
# IP Order
|
||||
row[10] = format_ip_long(row[9])
|
||||
|
||||
table_data["data"].append(row)
|
||||
|
||||
return jsonify(table_data)
|
||||
|
||||
@@ -17,6 +17,7 @@ import requests
|
||||
import base64
|
||||
import hashlib
|
||||
import random
|
||||
import email
|
||||
import string
|
||||
import ipaddress
|
||||
|
||||
@@ -57,24 +58,24 @@ def get_timezone_offset():
|
||||
# Date and time methods
|
||||
#-------------------------------------------------------------------------------
|
||||
|
||||
# -------------------------------------------------------------------------------------------
|
||||
def format_date(date_str: str) -> str:
|
||||
"""Format a date string as 'YYYY-MM-DD HH:MM'"""
|
||||
dt = datetime.datetime.fromisoformat(date_str) if isinstance(date_str, str) else date_str
|
||||
return dt.strftime('%Y-%m-%d %H:%M')
|
||||
# # -------------------------------------------------------------------------------------------
|
||||
# def format_date(date_str: str) -> str:
|
||||
# """Format a date string as 'YYYY-MM-DD HH:MM'"""
|
||||
# dt = datetime.datetime.fromisoformat(date_str) if isinstance(date_str, str) else date_str
|
||||
# return dt.strftime('%Y-%m-%d %H:%M')
|
||||
|
||||
# -------------------------------------------------------------------------------------------
|
||||
def format_date_diff(date1: str, date2: str) -> str:
|
||||
"""Return difference between two dates formatted as 'Xd HH:MM'"""
|
||||
dt1 = datetime.datetime.fromisoformat(date1) if isinstance(date1, str) else date1
|
||||
dt2 = datetime.datetime.fromisoformat(date2) if isinstance(date2, str) else date2
|
||||
delta = dt2 - dt1
|
||||
# # -------------------------------------------------------------------------------------------
|
||||
# def format_date_diff(date1: str, date2: str) -> str:
|
||||
# """Return difference between two dates formatted as 'Xd HH:MM'"""
|
||||
# dt1 = datetime.datetime.fromisoformat(date1) if isinstance(date1, str) else date1
|
||||
# dt2 = datetime.datetime.fromisoformat(date2) if isinstance(date2, str) else date2
|
||||
# delta = dt2 - dt1
|
||||
|
||||
days = delta.days
|
||||
hours, remainder = divmod(delta.seconds, 3600)
|
||||
minutes = remainder // 60
|
||||
# days = delta.days
|
||||
# hours, remainder = divmod(delta.seconds, 3600)
|
||||
# minutes = remainder // 60
|
||||
|
||||
return f"{days}d {hours:02}:{minutes:02}"
|
||||
# return f"{days}d {hours:02}:{minutes:02}"
|
||||
|
||||
# -------------------------------------------------------------------------------------------
|
||||
def format_date_iso(date1: str) -> str:
|
||||
@@ -102,6 +103,67 @@ def ensure_datetime(dt: Union[str, datetime, None]) -> datetime:
|
||||
return datetime.datetime.fromisoformat(dt)
|
||||
return dt
|
||||
|
||||
|
||||
def parse_datetime(dt_str):
|
||||
if not dt_str:
|
||||
return None
|
||||
try:
|
||||
# Try ISO8601 first
|
||||
return datetime.datetime.fromisoformat(dt_str)
|
||||
except ValueError:
|
||||
# Try RFC1123 / HTTP format
|
||||
try:
|
||||
return datetime.datetime.strptime(dt_str, '%a, %d %b %Y %H:%M:%S GMT')
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
def format_date(date_str: str) -> str:
|
||||
dt = parse_datetime(date_str)
|
||||
return dt.strftime('%Y-%m-%d %H:%M') if dt else "invalid"
|
||||
|
||||
def format_date_diff(date1, date2):
|
||||
"""
|
||||
Return difference between two datetimes as 'Xd HH:MM'.
|
||||
Uses app timezone if datetime is naive.
|
||||
date2 can be None (uses now).
|
||||
"""
|
||||
# Get timezone from settings
|
||||
tz_name = get_setting_value("TIMEZONE") or "UTC"
|
||||
tz = pytz.timezone(tz_name)
|
||||
|
||||
def parse_dt(dt):
|
||||
if dt is None:
|
||||
return datetime.datetime.now(tz)
|
||||
if isinstance(dt, str):
|
||||
try:
|
||||
dt_parsed = email.utils.parsedate_to_datetime(dt)
|
||||
except Exception:
|
||||
# fallback: parse ISO string
|
||||
dt_parsed = datetime.datetime.fromisoformat(dt)
|
||||
# convert naive GMT/UTC to app timezone
|
||||
if dt_parsed.tzinfo is None:
|
||||
dt_parsed = tz.localize(dt_parsed)
|
||||
else:
|
||||
dt_parsed = dt_parsed.astimezone(tz)
|
||||
return dt_parsed
|
||||
return dt if dt.tzinfo else tz.localize(dt)
|
||||
|
||||
dt1 = parse_dt(date1)
|
||||
dt2 = parse_dt(date2)
|
||||
|
||||
delta = dt2 - dt1
|
||||
total_minutes = int(delta.total_seconds() // 60)
|
||||
days, rem_minutes = divmod(total_minutes, 1440) # 1440 mins in a day
|
||||
hours, minutes = divmod(rem_minutes, 60)
|
||||
|
||||
return {
|
||||
"text": f"{days}d {hours:02}:{minutes:02}",
|
||||
"days": days,
|
||||
"hours": hours,
|
||||
"minutes": minutes,
|
||||
"total_minutes": total_minutes
|
||||
}
|
||||
|
||||
#-------------------------------------------------------------------------------
|
||||
# File system permission handling
|
||||
#-------------------------------------------------------------------------------
|
||||
|
||||
@@ -80,6 +80,32 @@ def test_delete_events_for_mac(client, api_token, test_mac):
|
||||
assert resp.status_code == 200
|
||||
assert len(resp.json.get("events", [])) == 0
|
||||
|
||||
def test_get_events_totals(client, api_token):
|
||||
# 1. Request totals with default period
|
||||
resp = client.get(
|
||||
"/sessions/totals",
|
||||
headers=auth_headers(api_token)
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
|
||||
data = resp.json
|
||||
assert isinstance(data, list)
|
||||
# Expecting 6 counts: all_events, sessions, missing, voided, new, down
|
||||
assert len(data) == 6
|
||||
for count in data:
|
||||
assert isinstance(count, int) # each should be a number
|
||||
|
||||
# 2. Request totals with custom period
|
||||
resp_month = client.get(
|
||||
"/sessions/totals?period=1 month",
|
||||
headers=auth_headers(api_token)
|
||||
)
|
||||
assert resp_month.status_code == 200
|
||||
data_month = resp_month.json
|
||||
assert isinstance(data_month, list)
|
||||
assert len(data_month) == 6
|
||||
|
||||
|
||||
|
||||
def test_delete_all_events(client, api_token, test_mac):
|
||||
# create two events
|
||||
|
||||
@@ -43,8 +43,6 @@ def test_create_device(client, api_token, test_mac):
|
||||
assert resp.json.get("success") is True
|
||||
|
||||
|
||||
# -----------------------------
|
||||
# CREATE SESSION
|
||||
# -----------------------------
|
||||
def test_create_session(client, api_token, test_mac):
|
||||
payload = {
|
||||
@@ -59,8 +57,6 @@ def test_create_session(client, api_token, test_mac):
|
||||
assert resp.json.get("success") is True
|
||||
|
||||
|
||||
# -----------------------------
|
||||
# LIST SESSIONS
|
||||
# -----------------------------
|
||||
def test_list_sessions(client, api_token, test_mac):
|
||||
# Ensure at least one session exists
|
||||
@@ -79,8 +75,88 @@ def test_list_sessions(client, api_token, test_mac):
|
||||
assert any(ses["ses_MAC"] == test_mac for ses in sessions)
|
||||
|
||||
|
||||
# -----------------------------
|
||||
# DELETE SESSION
|
||||
def test_device_sessions_by_period(client, api_token, test_mac):
|
||||
# 1. Create a dummy session so we have data
|
||||
payload = {
|
||||
"mac": test_mac,
|
||||
"ip": "192.168.1.200",
|
||||
"start_time": timeNowTZ()
|
||||
}
|
||||
resp_create = client.post("/sessions/create", json=payload, headers=auth_headers(api_token))
|
||||
assert resp_create.status_code == 200
|
||||
assert resp_create.json.get("success") is True
|
||||
|
||||
# 2. Query sessions for the device with a valid period
|
||||
resp = client.get(
|
||||
f"/sessions/{test_mac}?period=7 days",
|
||||
headers=auth_headers(api_token)
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
|
||||
data = resp.json
|
||||
assert data.get("success") is True
|
||||
assert "sessions" in data
|
||||
|
||||
sessions = data["sessions"]
|
||||
|
||||
print(sessions)
|
||||
print(test_mac)
|
||||
|
||||
assert isinstance(sessions, list)
|
||||
assert any(s["ses_MAC"] == test_mac for s in sessions)
|
||||
|
||||
|
||||
def test_device_session_events(client, api_token, test_mac):
|
||||
"""
|
||||
Test fetching session/events from the /sessions/session-events endpoint.
|
||||
"""
|
||||
|
||||
# 1. Create a dummy session to ensure we have data
|
||||
payload = {
|
||||
"mac": test_mac,
|
||||
"ip": "192.168.1.250",
|
||||
"start_time": timeNowTZ()
|
||||
}
|
||||
resp_create = client.post(
|
||||
"/sessions/create",
|
||||
json=payload,
|
||||
headers=auth_headers(api_token)
|
||||
)
|
||||
assert resp_create.status_code == 200
|
||||
assert resp_create.json.get("success") is True
|
||||
|
||||
# 2. Fetch session events with default type ('all') and period ('7 days')
|
||||
resp = client.get(
|
||||
f"/sessions/session-events?type=all&period=7 days",
|
||||
headers=auth_headers(api_token)
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
|
||||
data = resp.json
|
||||
assert "data" in data # table data key
|
||||
events = data["data"]
|
||||
|
||||
# 3. Validate the response structure
|
||||
assert isinstance(events, list)
|
||||
|
||||
# If there is at least one row, check fields for sessions
|
||||
if events:
|
||||
row = events[0]
|
||||
# Expecting row as list with at least expected columns
|
||||
assert isinstance(row, list)
|
||||
# IP and datetime fields should exist
|
||||
assert row[9] # IP column
|
||||
assert row[3] # Event datetime column
|
||||
|
||||
# 4. Optionally, test filtering by session type
|
||||
resp_sessions = client.get(
|
||||
"/sessions/session-events?type=sessions&period=7 days",
|
||||
headers=auth_headers(api_token)
|
||||
)
|
||||
assert resp_sessions.status_code == 200
|
||||
sessions = resp_sessions.json["data"]
|
||||
assert isinstance(sessions, list)
|
||||
|
||||
# -----------------------------
|
||||
def test_delete_session(client, api_token, test_mac):
|
||||
# First create session
|
||||
|
||||
Reference in New Issue
Block a user