api layer v0.1
Some checks failed
Code checks / check-url-paths (push) Has been cancelled
docker / docker_dev (push) Has been cancelled
Deploy MkDocs / deploy (push) Has been cancelled

This commit is contained in:
jokob-sk
2025-08-15 08:04:02 +10:00
parent 840bfe32d2
commit b155fe2b06
8 changed files with 540 additions and 343 deletions

View File

@@ -53,14 +53,6 @@
var deviceData = JSON.parse(data);
// // Deactivate next previous buttons
// if (readAllData) {
// $('#btnPrevious').attr ('disabled','');
// $('#btnPrevious').addClass ('text-gray50');
// $('#btnNext').attr ('disabled','');
// $('#btnNext').addClass ('text-gray50');
// }
// some race condition, need to implement delay
setTimeout(() => {
$.get('php/server/query_json.php', {
@@ -256,25 +248,27 @@
// update readonly fields
handleReadOnly(settingsData, disabledFields);
};
};
// console.log(relevantSettings)
// console.log(relevantSettings)
generateSimpleForm(relevantSettings);
generateSimpleForm(relevantSettings);
toggleNetworkConfiguration(mac == 'Internet')
toggleNetworkConfiguration(mac == 'Internet')
initSelect2();
initHoverNodeInfo();
initSelect2();
initHoverNodeInfo();
hideSpinner();
hideSpinner();
})
})
}, 100);
});
}, 100);
});
}
}
// ----------------------------------------

View File

@@ -37,10 +37,7 @@
case 'deleteEvents30': deleteEvents30(); break;
case 'deleteActHistory': deleteActHistory(); break;
case 'deleteDeviceEvents': deleteDeviceEvents(); break;
case 'resetDeviceProps': resetDeviceProps(); break;
case 'PiaBackupDBtoArchive': PiaBackupDBtoArchive(); break;
case 'PiaRestoreDBfromArchive': PiaRestoreDBfromArchive(); break;
case 'PiaPurgeDBBackups': PiaPurgeDBBackups(); break;
case 'resetDeviceProps': resetDeviceProps(); break;
case 'ExportCSV': ExportCSV(); break;
case 'ImportCSV': ImportCSV(); break;
@@ -48,8 +45,7 @@
case 'getDevicesListCalendar': getDevicesListCalendar(); break; //todo: slowly deprecate this
case 'updateNetworkLeaf': updateNetworkLeaf(); break;
case 'getIcons': getIcons(); break;
case 'getActions': getActions(); break;
case 'getDevices': getDevices(); break;
case 'copyFromDevice': copyFromDevice(); break;
case 'wakeonlan': wakeonlan(); break;
@@ -517,92 +513,6 @@ function deleteActHistory() {
}
}
//------------------------------------------------------------------------------
// Backup DB to Archiv
//------------------------------------------------------------------------------
function PiaBackupDBtoArchive() {
// prepare fast Backup
$dbfilename = 'app.db';
$file = '../../../db/'.$dbfilename;
$newfile = '../../../db/'.$dbfilename.'.latestbackup';
// copy files as a fast Backup
if (!copy($file, $newfile)) {
echo lang('BackDevices_Backup_CopError');
} else {
// Create archive with actual date
$Pia_Archive_Name = 'appdb_'.date("Ymd_His").'.zip';
$Pia_Archive_Path = '../../../db/';
exec('zip -j '.$Pia_Archive_Path.$Pia_Archive_Name.' ../../../db/'.$dbfilename, $output);
// chheck if archive exists
if (file_exists($Pia_Archive_Path.$Pia_Archive_Name) && filesize($Pia_Archive_Path.$Pia_Archive_Name) > 0) {
echo lang('BackDevices_Backup_okay').': ('.$Pia_Archive_Name.')';
unlink($newfile);
echo("<meta http-equiv='refresh' content='1'>");
} else {
echo lang('BackDevices_Backup_Failed').' ('.$dbfilename.'.latestbackup)';
}
}
}
//------------------------------------------------------------------------------
// Restore DB from Archiv
//------------------------------------------------------------------------------
function PiaRestoreDBfromArchive() {
// prepare fast Backup
$file = '../../../db/'.$dbfilename;
$oldfile = '../../../db/'.$dbfilename.'.prerestore';
// copy files as a fast Backup
if (!copy($file, $oldfile)) {
echo lang('BackDevices_Restore_CopError');
} else {
// extract latest archive and overwrite the actual .db
$Pia_Archive_Path = '../../../db/';
exec('/bin/ls -Art '.$Pia_Archive_Path.'*.zip | /bin/tail -n 1 | /usr/bin/xargs -n1 /bin/unzip -o -d ../../../db/', $output);
// check if the .db exists
if (file_exists($file)) {
echo lang('BackDevices_Restore_okay');
unlink($oldfile);
echo("<meta http-equiv='refresh' content='1'>");
} else {
echo lang('BackDevices_Restore_Failed');
}
}
}
//------------------------------------------------------------------------------
// Purge Backups
//------------------------------------------------------------------------------
function PiaPurgeDBBackups() {
$Pia_Archive_Path = '../../../db';
$Pia_Backupfiles = array();
$files = array_diff(scandir($Pia_Archive_Path, SCANDIR_SORT_DESCENDING), array('.', '..', $dbfilename, 'netalertxdb-reset.zip'));
foreach ($files as &$item)
{
$item = $Pia_Archive_Path.'/'.$item;
if (stristr($item, 'setting_') == '') {array_push($Pia_Backupfiles, $item);}
}
if (sizeof($Pia_Backupfiles) > 3)
{
rsort($Pia_Backupfiles);
unset($Pia_Backupfiles[0], $Pia_Backupfiles[1], $Pia_Backupfiles[2]);
$Pia_Backupfiles_Purge = array_values($Pia_Backupfiles);
for ($i = 0; $i < sizeof($Pia_Backupfiles_Purge); $i++)
{
unlink($Pia_Backupfiles_Purge[$i]);
}
}
echo lang('BackDevices_DBTools_Purge');
echo("<meta http-equiv='refresh' content='1'>");
}
//------------------------------------------------------------------------------
// Export CSV of devices
//------------------------------------------------------------------------------
@@ -827,44 +737,6 @@ function getDevicesListCalendar() {
// Query Device Data
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
function getIcons() {
global $db;
// Device Data
$sql = 'select devIcon from Devices group by devIcon';
$result = $db->query($sql);
// arrays of rows
$tableData = array();
while ($row = $result -> fetchArray (SQLITE3_ASSOC)) {
$icon = handleNull($row['devIcon'], "<i class='fa fa-laptop'></i>");
// Push row data
$tableData[] = array('id' => $icon,
'name' => $icon );
}
// Control no rows
if (empty($tableData)) {
$tableData = [];
}
// Return json
echo (json_encode ($tableData));
}
//------------------------------------------------------------------------------
function getActions() {
$tableData = array(
array('id' => 'wake-on-lan',
'name' => lang('DevDetail_WOL_Title'))
);
// Return json
echo (json_encode ($tableData));
}
//------------------------------------------------------------------------------
function getDevices() {

View File

@@ -2,6 +2,7 @@ import threading
from flask import Flask, request, jsonify, Response
from flask_cors import CORS
from .graphql_endpoint import devicesSchema
from .device_endpoint import get_device_data, set_device_data
from .prometheus_endpoint import getMetricStats
from .sync_endpoint import handle_sync_post, handle_sync_get
import sys
@@ -17,7 +18,15 @@ from messaging.in_app import write_notification
# Flask application
app = Flask(__name__)
CORS(app, resources={r"/metrics": {"origins": "*"}}, supports_credentials=True, allow_headers=["Authorization"])
CORS(
app,
resources={
r"/metrics": {"origins": "*"},
r"/device/*": {"origins": "*"}
},
supports_credentials=True,
allow_headers=["Authorization", "Content-Type"]
)
# --------------------------
# GraphQL Endpoints
@@ -49,9 +58,108 @@ def graphql_endpoint():
return jsonify(result.data)
# --------------------------
# Prometheus /metrics Endpoint
# Device Endpoints
# --------------------------
@app.route("/device/<mac>", methods=["GET"])
def api_get_device(mac):
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return get_device_data(mac)
@app.route("/device/<mac>", methods=["POST"])
def api_set_device(mac):
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return set_device_data(mac, request.json)
@app.route("/device/<mac>/delete", methods=["DELETE"])
def api_delete_device(mac):
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return delete_device(mac)
@app.route("/device/<mac>/events/delete", methods=["DELETE"])
def api_delete_device_events(mac):
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return delete_device_events(mac)
@app.route("/device/<mac>/reset-props", methods=["POST"])
def api_reset_device_props(mac):
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return reset_device_props(mac, request.json)
# --------------------------
# Device Collections
# --------------------------
@app.route("/devices", methods=["DELETE"])
def api_delete_all_devices():
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return delete_all_devices()
@app.route("/devices/empty-macs", methods=["DELETE"])
def api_delete_all_empty_macs():
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return delete_all_with_empty_macs()
@app.route("/devices/unknown", methods=["DELETE"])
def api_delete_unknown_devices():
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return delete_unknown_devices()
@app.route("/devices/totals", methods=["GET"])
def api_get_devices_totals():
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return get_devices_totals()
# --------------------------
# Device Events / History
# --------------------------
@app.route("/events", methods=["DELETE"])
def api_delete_events():
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return delete_events()
@app.route("/events/30days", methods=["DELETE"])
def api_delete_events_30():
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return delete_events_30()
@app.route("/history/actions", methods=["DELETE"])
def api_delete_act_history():
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return delete_act_history()
# --------------------------
# CSV Import / Export
# --------------------------
@app.route("/devices/export", methods=["GET"])
def api_export_csv():
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return export_csv()
@app.route("/devices/import", methods=["POST"])
def api_import_csv():
if not is_authorized():
return jsonify({"error": "Forbidden"}), 403
return import_csv(request.files.get("file"))
# --------------------------
# Prometheus metrics endpoint
# --------------------------
@app.route("/metrics")
def metrics():

View File

@@ -0,0 +1,275 @@
#!/usr/bin/env python
import json
import subprocess
import argparse
import os
import pathlib
import sys
from datetime import datetime
from flask import jsonify, request
# Register NetAlertX directories
INSTALL_PATH="/app"
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
from database import get_temp_db_connection
from helper import row_to_json, get_date_from_period, is_random_mac, format_date, get_setting_value
# --------------------------
# Device Endpoints Functions
# --------------------------
def get_device_data(mac):
"""Fetch device info with children, event stats, and presence calculation."""
# Open temporary connection for this request
conn = get_temp_db_connection()
cur = conn.cursor()
# Special case for new device
if mac.lower() == "new":
now = datetime.now().strftime("%Y-%m-%d %H:%M")
device_data = {
"devMac": "",
"devName": "",
"devOwner": "",
"devType": "",
"devVendor": "",
"devFavorite": 0,
"devGroup": "",
"devComments": "",
"devFirstConnection": now,
"devLastConnection": now,
"devLastIP": "",
"devStaticIP": 0,
"devScan": 0,
"devLogEvents": 0,
"devAlertEvents": 0,
"devAlertDown": 0,
"devParentRelType": "default",
"devReqNicsOnline": 0,
"devSkipRepeated": 0,
"devLastNotification": "",
"devPresentLastScan": 0,
"devIsNew": 1,
"devLocation": "",
"devIsArchived": 0,
"devParentMAC": "",
"devParentPort": "",
"devIcon": "",
"devGUID": "",
"devSite": "",
"devSSID": "",
"devSyncHubNode": "",
"devSourcePlugin": "",
"devCustomProps": "",
"devStatus": "Unknown",
"devIsRandomMAC": False,
"devSessions": 0,
"devEvents": 0,
"devDownAlerts": 0,
"devPresenceHours": 0,
"devFQDN": ""
}
return jsonify(device_data)
# Compute period date for sessions/events
period = request.args.get('period', '') # e.g., '7 days', '1 month', etc.
period_date_sql = get_date_from_period(period)
current_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# Fetch device info + computed fields
sql = f"""
SELECT
d.*,
CASE
WHEN d.devAlertDown != 0 AND d.devPresentLastScan = 0 THEN 'Down'
WHEN d.devPresentLastScan = 1 THEN 'On-line'
ELSE 'Off-line'
END AS devStatus,
(SELECT COUNT(*) FROM Sessions
WHERE ses_MAC = d.devMac AND (
ses_DateTimeConnection >= {period_date_sql} OR
ses_DateTimeDisconnection >= {period_date_sql} OR
ses_StillConnected = 1
)) AS devSessions,
(SELECT COUNT(*) FROM Events
WHERE eve_MAC = d.devMac AND eve_DateTime >= {period_date_sql}
AND eve_EventType NOT IN ('Connected','Disconnected')) AS devEvents,
(SELECT COUNT(*) FROM Events
WHERE eve_MAC = d.devMac AND eve_DateTime >= {period_date_sql}
AND eve_EventType = 'Device Down') AS devDownAlerts,
(SELECT CAST(MAX(0, SUM(
julianday(IFNULL(ses_DateTimeDisconnection,'{current_date}')) -
julianday(CASE WHEN ses_DateTimeConnection < {period_date_sql}
THEN {period_date_sql} ELSE ses_DateTimeConnection END)
) * 24) AS INT)
FROM Sessions
WHERE ses_MAC = d.devMac
AND ses_DateTimeConnection IS NOT NULL
AND (ses_DateTimeDisconnection IS NOT NULL OR ses_StillConnected = 1)
AND (ses_DateTimeConnection >= {period_date_sql}
OR ses_DateTimeDisconnection >= {period_date_sql} OR ses_StillConnected = 1)
) AS devPresenceHours
FROM Devices d
WHERE d.devMac = ? OR CAST(d.rowid AS TEXT) = ?
"""
# Fetch device
cur.execute(sql, (mac, mac))
row = cur.fetchone()
if not row:
return jsonify({"error": "Device not found"}), 404
device_data = row_to_json(list(row.keys()), row)
device_data['devFirstConnection'] = format_date(device_data['devFirstConnection'])
device_data['devLastConnection'] = format_date(device_data['devLastConnection'])
device_data['devIsRandomMAC'] = is_random_mac(device_data['devMac'])
# Fetch children
cur.execute("SELECT * FROM Devices WHERE devParentMAC = ? ORDER BY devPresentLastScan DESC", ( device_data['devMac'],))
children_rows = cur.fetchall()
children = [row_to_json(list(r.keys()), r) for r in children_rows]
children_nics = [c for c in children if c.get("devParentRelType") == "nic"]
device_data['devChildrenDynamic'] = children
device_data['devChildrenNicsDynamic'] = children_nics
conn.close()
return jsonify(device_data)
def set_device_data(mac, data):
"""Update or create a device."""
if data.get("createNew", False):
sql = """
INSERT INTO Devices (
devMac, devName, devOwner, devType, devVendor, devIcon,
devFavorite, devGroup, devLocation, devComments,
devParentMAC, devParentPort, devSSID, devSite,
devStaticIP, devScan, devAlertEvents, devAlertDown,
devParentRelType, devReqNicsOnline, devSkipRepeated,
devIsNew, devIsArchived, devLastConnection,
devFirstConnection, devLastIP, devGUID, devCustomProps,
devSourcePlugin
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"""
values = (
mac,
data.get("name", ""),
data.get("owner", ""),
data.get("type", ""),
data.get("vendor", ""),
data.get("icon", ""),
data.get("favorite", 0),
data.get("group", ""),
data.get("location", ""),
data.get("comments", ""),
data.get("networknode", ""),
data.get("networknodeport", ""),
data.get("ssid", ""),
data.get("networksite", ""),
data.get("staticIP", 0),
data.get("scancycle", 0),
data.get("alertevents", 0),
data.get("alertdown", 0),
data.get("relType", "default"),
data.get("reqNics", 0),
data.get("skiprepeated", 0),
data.get("newdevice", 0),
data.get("archived", 0),
data.get("devLastConnection", datetime.now().strftime("%Y-%m-%d %H:%M:%S")),
data.get("devFirstConnection", datetime.now().strftime("%Y-%m-%d %H:%M:%S")),
data.get("ip", ""),
data.get("devGUID", ""),
data.get("devCustomProps", ""),
"DUMMY"
)
else:
sql = """
UPDATE Devices SET
devName=?, devOwner=?, devType=?, devVendor=?, devIcon=?,
devFavorite=?, devGroup=?, devLocation=?, devComments=?,
devParentMAC=?, devParentPort=?, devSSID=?, devSite=?,
devStaticIP=?, devScan=?, devAlertEvents=?, devAlertDown=?,
devParentRelType=?, devReqNicsOnline=?, devSkipRepeated=?,
devIsNew=?, devIsArchived=?, devCustomProps=?
WHERE devMac=?
"""
values = (
data.get("name", ""),
data.get("owner", ""),
data.get("type", ""),
data.get("vendor", ""),
data.get("icon", ""),
data.get("favorite", 0),
data.get("group", ""),
data.get("location", ""),
data.get("comments", ""),
data.get("networknode", ""),
data.get("networknodeport", ""),
data.get("ssid", ""),
data.get("networksite", ""),
data.get("staticIP", 0),
data.get("scancycle", 0),
data.get("alertevents", 0),
data.get("alertdown", 0),
data.get("relType", "default"),
data.get("reqNics", 0),
data.get("skiprepeated", 0),
data.get("newdevice", 0),
data.get("archived", 0),
data.get("devCustomProps", ""),
mac
)
conn = get_temp_db_connection()
cur = conn.cursor()
cur.execute(sql, values)
conn.commit()
conn.close()
return jsonify({"success": True})
def delete_device(mac):
"""Delete a device by MAC."""
conn = get_temp_db_connection()
cur = conn.cursor()
cur.execute("DELETE FROM Devices WHERE devMac=?", (mac,))
conn.commit()
conn.close()
return jsonify({"success": True})
def delete_device_events(mac):
"""Delete all events for a device."""
conn = get_temp_db_connection()
cur = conn.cursor()
cur.execute("DELETE FROM Events WHERE eve_MAC=?", (mac,))
conn.commit()
conn.close()
return jsonify({"success": True})
def reset_device_props(mac, data=None):
"""Reset device custom properties to default."""
from .helpers import get_setting_value
default_props = get_setting_value("NEWDEV_devCustomProps")
conn = get_temp_db_connection()
cur = conn.cursor()
cur.execute(
"UPDATE Devices SET devCustomProps=? WHERE devMac=?",
(default_props, mac),
)
conn.commit()
conn.close()
return jsonify({"success": True})

View File

@@ -204,3 +204,11 @@ def get_array_from_sql_rows(rows):
#-------------------------------------------------------------------------------
def get_temp_db_connection():
"""
Returns a new SQLite connection with Row factory.
Should be used per-thread/request to avoid cross-thread issues.
"""
conn = sqlite3.connect(fullDbPath)
conn.row_factory = sqlite3.Row
return conn

View File

@@ -636,6 +636,69 @@ def collect_lang_strings(json, pref, stringSqlParams):
# Misc
#-------------------------------------------------------------------------------
# -------------------------------------------------------------------------------------------
def format_date(date_str: str) -> str:
"""Format a date string as 'YYYY-MM-DD HH:MM'"""
dt = datetime.datetime.fromisoformat(date_str) if isinstance(date_str, str) else date_str
return dt.strftime('%Y-%m-%d %H:%M')
# -------------------------------------------------------------------------------------------
def format_date_diff(date1: str, date2: str) -> str:
"""Return difference between two dates formatted as 'Xd HH:MM'"""
dt1 = datetime.datetime.fromisoformat(date1) if isinstance(date1, str) else date1
dt2 = datetime.datetime.fromisoformat(date2) if isinstance(date2, str) else date2
delta = dt2 - dt1
days = delta.days
hours, remainder = divmod(delta.seconds, 3600)
minutes = remainder // 60
return f"{days}d {hours:02}:{minutes:02}"
# -------------------------------------------------------------------------------------------
def format_date_iso(date1: str) -> str:
"""Return ISO 8601 string for a date"""
dt = datetime.datetime.fromisoformat(date1) if isinstance(date1, str) else date1
return dt.isoformat()
# -------------------------------------------------------------------------------------------
def is_random_mac(mac: str) -> bool:
"""Determine if a MAC address is random, respecting user-defined prefixes not to mark as random."""
is_random = mac[1].upper() in ["2", "6", "A", "E"]
# Get prefixes from settings
prefixes = get_setting_value("UI_NOT_RANDOM_MAC")
# If detected as random, make sure it doesn't start with a prefix the user wants to exclude
if is_random:
for prefix in prefixes:
if mac.upper().startswith(prefix.upper()):
is_random = False
break
return is_random
# -------------------------------------------------------------------------------------------
def get_date_from_period(period):
"""
Convert a period request parameter into an SQLite date expression.
Equivalent to PHP getDateFromPeriod().
Returns a string like "date('now', '-7 day')"
"""
days_map = {
'7 days': 7,
'1 month': 30,
'1 year': 365,
'100 years': 3650, # actually 10 years in original PHP
}
days = days_map.get(period, 1) # default 1 day
period_sql = f"date('now', '-{days} day')"
return period_sql
#-------------------------------------------------------------------------------
def print_table_schema(db, table):
sql = db.sql

68
test/test_devices_endpoint.py Executable file
View File

@@ -0,0 +1,68 @@
import sys
import pathlib
import sqlite3
import random
import string
import uuid
import pytest
INSTALL_PATH = "/app"
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
from helper import timeNowTZ, get_setting_value
from api_server.api_server_start import app
@pytest.fixture(scope="session")
def api_token():
return get_setting_value("API_TOKEN")
@pytest.fixture
def client():
with app.test_client() as client:
yield client
@pytest.fixture
def test_mac():
# Generate a unique MAC for each test run
return "AA:BB:CC:" + ":".join(f"{random.randint(0,255):02X}" for _ in range(3))
def auth_headers(token):
return {"Authorization": f"Bearer {token}"}
def test_create_device(client, api_token, test_mac):
payload = {
"createNew": True,
"name": "Test Device",
"owner": "Unit Test",
"type": "Router",
"vendor": "TestVendor",
}
resp = client.post(f"/device/{test_mac}", json=payload, headers=auth_headers(api_token))
assert resp.status_code == 200
assert resp.json.get("success") is True
def test_get_device(client, api_token, test_mac):
# First create it
client.post(f"/device/{test_mac}", json={"createNew": True}, headers=auth_headers(api_token))
# Then retrieve it
resp = client.get(f"/device/{test_mac}", headers=auth_headers(api_token))
assert resp.status_code == 200
assert resp.json.get("devMac") == test_mac
def test_reset_device_props(client, api_token, test_mac):
client.post(f"/device/{test_mac}", json={"createNew": True}, headers=auth_headers(api_token))
resp = client.post(f"/device/{test_mac}/reset-props", json={}, headers=auth_headers(api_token))
assert resp.status_code == 200
assert resp.json.get("success") is True
def test_delete_device_events(client, api_token, test_mac):
client.post(f"/device/{test_mac}", json={"createNew": True}, headers=auth_headers(api_token))
resp = client.delete(f"/device/{test_mac}/events/delete", headers=auth_headers(api_token))
assert resp.status_code == 200
assert resp.json.get("success") is True
def test_delete_device(client, api_token, test_mac):
client.post(f"/device/{test_mac}", json={"createNew": True}, headers=auth_headers(api_token))
resp = client.delete(f"/device/{test_mac}/delete", headers=auth_headers(api_token))
assert resp.status_code == 200
assert resp.json.get("success") is True

View File

@@ -1,191 +0,0 @@
import sys
import pathlib
import sqlite3
import random
import string
import uuid
from datetime import datetime, timedelta
sys.path.append(str(pathlib.Path(__file__).parent.parent.resolve()) + "/server/")
from helper import timeNowTZ, updateSubnets
# -------------------------------------------------------------------------------
def test_helper():
assert timeNow() == datetime.datetime.now().replace(microsecond=0)
# -------------------------------------------------------------------------------
def test_updateSubnets():
# test single subnet
subnet = "192.168.1.0/24 --interface=eth0"
result = updateSubnets(subnet)
assert type(result) is list
assert len(result) == 1
# test multiple subnets
subnet = ["192.168.1.0/24 --interface=eth0", "192.168.2.0/24 --interface=eth1"]
result = updateSubnets(subnet)
assert type(result) is list
assert len(result) == 2
# -------------------------------------------------------------------------------
# Function to insert N random device entries
def insert_devices(db_path, num_entries=1):
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
print(f"{num_entries} entries to generate.")
# Function to generate a random MAC address
def generate_mac():
return '00:1A:2B:{:02X}:{:02X}:{:02X}'.format(random.randint(0, 255), random.randint(0, 255), random.randint(0, 255))
# Function to generate a random string of given length
def generate_random_string(length):
return ''.join(random.choices(string.ascii_letters + string.digits, k=length))
# Function to generate a random date within the last `n` days
def generate_random_date(n_days=365):
start_date = datetime.now() - timedelta(days=random.randint(0, n_days))
return start_date.strftime('%Y-%m-%d %H:%M:%S')
# Function to generate a GUID (Globally Unique Identifier)
def generate_guid():
return str(uuid.uuid4()) # Generates a unique GUID
# SQL query to insert a new row into Devices table
insert_query = """
INSERT INTO Devices (
devMac,
devName,
devOwner,
devType,
devVendor,
devFavorite,
devGroup,
devComments,
devFirstConnection,
devLastConnection,
devLastIP,
devStaticIP,
devScan,
devLogEvents,
devAlertEvents,
devAlertDown,
devSkipRepeated,
devLastNotification,
devPresentLastScan,
devIsNew,
devLocation,
devIsArchived,
devParentMAC,
devParentPort,
devIcon,
devGUID,
devSite,
devSSID,
devSyncHubNode,
devSourcePlugin,
devCustomProps,
devFQDN,
devParentRelType,
devReqNicsOnline
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
"""
# List of device types, vendors, groups, locations
device_types = ['Phone', 'Laptop', 'Tablet', 'Other']
vendors = ['Vendor A', 'Vendor B', 'Vendor C']
groups = ['Group1', 'Group2']
locations = ['Location A', 'Location B']
# Insert the specified number of rows (default is 10,000)
for i in range(num_entries):
dev_mac = generate_mac()
dev_name = f'Device_{i:04d}'
dev_owner = f'Owner_{i % 100:03d}'
dev_type = random.choice(device_types)
dev_vendor = random.choice(vendors)
dev_favorite = random.choice([0, 1])
dev_group = random.choice(groups)
dev_comments = "" # Left as NULL
dev_first_connection = generate_random_date(365) # Within last 365 days
dev_last_connection = generate_random_date(30) # Within last 30 days
dev_last_ip = f'192.168.0.{random.randint(0, 255)}'
dev_static_ip = random.choice([0, 1])
dev_scan = random.randint(1, 10)
dev_log_events = random.choice([0, 1])
dev_alert_events = random.choice([0, 1])
dev_alert_down = random.choice([0, 1])
dev_skip_repeated = random.randint(0, 5)
dev_last_notification = "" # Left as NULL
dev_present_last_scan = random.choice([0, 1])
dev_is_new = random.choice([0, 1])
dev_location = random.choice(locations)
dev_is_archived = random.choice([0, 1])
dev_parent_mac = "" # Left as NULL
dev_parent_port = "" # Left as NULL
dev_icon = "" # Left as NULL
dev_guid = generate_guid() # Left as NULL
dev_site = "" # Left as NULL
dev_ssid = "" # Left as NULL
dev_sync_hub_node = "" # Left as NULL
dev_source_plugin = "" # Left as NULL
dev_devCustomProps = "" # Left as NULL
dev_devFQDN = "" # Left as NULL
# Execute the insert query
cursor.execute(insert_query, (
dev_mac,
dev_name,
dev_owner,
dev_type,
dev_vendor,
dev_favorite,
dev_group,
dev_comments,
dev_first_connection,
dev_last_connection,
dev_last_ip,
dev_static_ip,
dev_scan,
dev_log_events,
dev_alert_events,
dev_alert_down,
dev_skip_repeated,
dev_last_notification,
dev_present_last_scan,
dev_is_new,
dev_location,
dev_is_archived,
dev_parent_mac,
dev_parent_port,
dev_icon,
dev_guid,
dev_site,
dev_ssid,
dev_sync_hub_node,
dev_source_plugin,
dev_devCustomProps,
dev_devFQDN
))
# Commit after every 1000 rows to improve performance
if i % 1000 == 0:
conn.commit()
# Final commit to save all remaining data
conn.commit()
# Close the database connection
conn.close()
print(f"{num_entries} entries have been successfully inserted into the Devices table.")
# -------------------------------------------------------------------------------
if __name__ == "__main__":
# Call insert_devices with database path and number of entries as arguments
db_path = "/app/db/app.db"
num_entries = int(sys.argv[1]) if len(sys.argv) > 1 else 10000
insert_devices(db_path, num_entries)