Files
NetAlertX/server/api_server/devices_endpoint.py
jokob-sk 962bbaa5a1
Some checks failed
Code checks / check-url-paths (push) Has been cancelled
docker / docker_dev (push) Has been cancelled
Deploy MkDocs / deploy (push) Has been cancelled
api layer v0.2.2 - CSV import/export, refactor
2025-08-19 07:56:54 +10:00

195 lines
5.6 KiB
Python
Executable File

#!/usr/bin/env python
import json
import subprocess
import argparse
import os
import pathlib
import base64
import re
import sys
from datetime import datetime
from flask import jsonify, request, Response
import csv
import io
from io import StringIO
# Register NetAlertX directories
INSTALL_PATH="/app"
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
from database import get_temp_db_connection
from helper import is_random_mac, format_date, get_setting_value
from db.db_helper import get_table_json
# --------------------------
# Device Endpoints Functions
# --------------------------
def delete_devices(macs):
"""
Delete devices from the Devices table.
- If `macs` is None → delete ALL devices.
- If `macs` is a list → delete only matching MACs (supports wildcard '*').
"""
conn = get_temp_db_connection()
cur = conn.cursor()
if not macs:
# No MACs provided → delete all
cur.execute("DELETE FROM Devices")
conn.commit()
conn.close()
return jsonify({"success": True, "deleted": "all"})
deleted_count = 0
for mac in macs:
if "*" in mac:
# Wildcard matching
sql_pattern = mac.replace("*", "%")
cur.execute("DELETE FROM Devices WHERE devMAC LIKE ?", (sql_pattern,))
else:
# Exact match
cur.execute("DELETE FROM Devices WHERE devMAC = ?", (mac,))
deleted_count += cur.rowcount
conn.commit()
conn.close()
return jsonify({"success": True, "deleted_count": deleted_count})
def delete_all_with_empty_macs():
"""Delete devices with empty MAC addresses."""
conn = get_temp_db_connection()
cur = conn.cursor()
cur.execute("DELETE FROM Devices WHERE devMAC IS NULL OR devMAC = ''")
deleted = cur.rowcount
conn.commit()
conn.close()
return jsonify({"success": True, "deleted": deleted})
def delete_unknown_devices():
"""Delete devices marked as unknown."""
conn = get_temp_db_connection()
cur = conn.cursor()
cur.execute("""DELETE FROM Devices WHERE devName='(unknown)' OR devName='(name not found)'""")
conn.commit()
conn.close()
return jsonify({"success": True, "deleted": cur.rowcount})
def export_devices(export_format):
"""
Export devices from the Devices table in teh desired format.
- If `macs` is None → delete ALL devices.
- If `macs` is a list → delete only matching MACs (supports wildcard '*').
"""
conn = get_temp_db_connection()
cur = conn.cursor()
# Fetch all devices
devices_json = get_table_json(cur, "SELECT * FROM Devices")
conn.close()
# Ensure columns exist
columns = devices_json.columnNames or (
list(devices_json["data"][0].keys()) if devices_json["data"] else []
)
if export_format == "json":
# Convert to standard dict for Flask JSON
return jsonify({
"data": [row for row in devices_json["data"]],
"columns": list(columns)
})
elif export_format == "csv":
si = StringIO()
writer = csv.DictWriter(si, fieldnames=columns, quoting=csv.QUOTE_ALL)
writer.writeheader()
for row in devices_json.json["data"]:
writer.writerow(row)
return Response(
si.getvalue(),
mimetype="text/csv",
headers={"Content-Disposition": "attachment; filename=devices.csv"},
)
else:
return jsonify({"error": f"Unsupported format '{export_format}'"}), 400
def import_csv(file_storage=None):
data = ""
skipped = []
error = None
# 1. Try JSON `content` (base64-encoded CSV)
if request.is_json and request.json.get("content"):
try:
data = base64.b64decode(request.json["content"], validate=True).decode("utf-8")
except Exception as e:
return jsonify({"error": f"Base64 decode failed: {e}"}), 400
# 2. Otherwise, try uploaded file
elif file_storage:
data = file_storage.read().decode("utf-8")
# 3. Fallback: try local file (same as PHP `$file = '../../../config/devices.csv';`)
else:
local_file = "/app/config/devices.csv"
try:
with open(local_file, "r", encoding="utf-8") as f:
data = f.read()
except FileNotFoundError:
return jsonify({"error": "CSV file missing"}), 404
if not data:
return jsonify({"error": "No CSV data found"}), 400
# --- Clean up newlines inside quoted fields ---
data = re.sub(
r'"([^"]*)"',
lambda m: m.group(0).replace("\n", " "),
data
)
# --- Parse CSV ---
lines = data.splitlines()
reader = csv.reader(lines)
try:
header = [h.strip() for h in next(reader)]
except StopIteration:
return jsonify({"error": "CSV missing header"}), 400
# --- Wipe Devices table ---
conn = get_temp_db_connection()
sql = conn.cursor()
sql.execute("DELETE FROM Devices")
# --- Prepare insert ---
placeholders = ",".join(["?"] * len(header))
insert_sql = f"INSERT INTO Devices ({', '.join(header)}) VALUES ({placeholders})"
row_count = 0
for idx, row in enumerate(reader, start=1):
if len(row) != len(header):
skipped.append(idx)
continue
try:
sql.execute(insert_sql, [col.strip() for col in row])
row_count += 1
except sqlite3.Error as e:
mylog("error", [f"[ImportCSV] SQL ERROR row {idx}: {e}"])
skipped.append(idx)
conn.commit()
conn.close()
return jsonify({
"success": True,
"inserted": row_count,
"skipped_lines": skipped
})