mirror of
https://github.com/jokob-sk/NetAlertX.git
synced 2026-04-09 19:51:57 -07:00
api layer v0.2.4 - /nettools/traceroute endpoint
This commit is contained in:
@@ -7,7 +7,7 @@ from .devices_endpoint import get_all_devices, delete_unknown_devices, delete_al
|
|||||||
from .events_endpoint import delete_events, delete_events_30, get_events
|
from .events_endpoint import delete_events, delete_events_30, get_events
|
||||||
from .history_endpoint import delete_online_history
|
from .history_endpoint import delete_online_history
|
||||||
from .prometheus_endpoint import getMetricStats
|
from .prometheus_endpoint import getMetricStats
|
||||||
from .nettools_endpoint import wakeonlan
|
from .nettools_endpoint import wakeonlan, traceroute
|
||||||
from .sync_endpoint import handle_sync_post, handle_sync_get
|
from .sync_endpoint import handle_sync_post, handle_sync_get
|
||||||
import sys
|
import sys
|
||||||
|
|
||||||
@@ -200,6 +200,13 @@ def api_wakeonlan():
|
|||||||
mac = request.json.get("devMac")
|
mac = request.json.get("devMac")
|
||||||
return wakeonlan(mac)
|
return wakeonlan(mac)
|
||||||
|
|
||||||
|
@app.route("/nettools/traceroute", methods=["POST"])
|
||||||
|
def api_traceroute():
|
||||||
|
if not is_authorized():
|
||||||
|
return jsonify({"error": "Forbidden"}), 403
|
||||||
|
ip = request.json.get("devLastIP")
|
||||||
|
return traceroute(ip)
|
||||||
|
|
||||||
# --------------------------
|
# --------------------------
|
||||||
# Online history
|
# Online history
|
||||||
# --------------------------
|
# --------------------------
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
import subprocess
|
import subprocess
|
||||||
import re
|
import re
|
||||||
|
import ipaddress
|
||||||
from flask import jsonify
|
from flask import jsonify
|
||||||
|
|
||||||
def wakeonlan(mac):
|
def wakeonlan(mac):
|
||||||
@@ -19,3 +20,48 @@ def wakeonlan(mac):
|
|||||||
except subprocess.CalledProcessError as e:
|
except subprocess.CalledProcessError as e:
|
||||||
return jsonify({"success": False, "error": "Failed to send WOL packet", "details": e.stderr.strip()}), 500
|
return jsonify({"success": False, "error": "Failed to send WOL packet", "details": e.stderr.strip()}), 500
|
||||||
|
|
||||||
|
def traceroute(ip):
|
||||||
|
"""
|
||||||
|
Executes a traceroute to the given IP address.
|
||||||
|
|
||||||
|
Parameters:
|
||||||
|
ip (str): The target IP address to trace.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
JSON response with:
|
||||||
|
- success (bool)
|
||||||
|
- output (str) if successful
|
||||||
|
- error (str) and details (str) if failed
|
||||||
|
"""
|
||||||
|
# --------------------------
|
||||||
|
# Step 1: Validate IP address
|
||||||
|
# --------------------------
|
||||||
|
try:
|
||||||
|
ipaddress.ip_address(ip)
|
||||||
|
except ValueError:
|
||||||
|
# Return 400 if IP is invalid
|
||||||
|
return jsonify({"success": False, "error": f"Invalid IP: {ip}"}), 400
|
||||||
|
|
||||||
|
# --------------------------
|
||||||
|
# Step 2: Execute traceroute
|
||||||
|
# --------------------------
|
||||||
|
try:
|
||||||
|
result = subprocess.run(
|
||||||
|
["traceroute", ip], # Command and argument
|
||||||
|
capture_output=True, # Capture stdout/stderr
|
||||||
|
text=True, # Return output as string
|
||||||
|
check=True # Raise CalledProcessError on non-zero exit
|
||||||
|
)
|
||||||
|
# Return success response with traceroute output
|
||||||
|
return jsonify({"success": True, "output": result.stdout.strip()})
|
||||||
|
|
||||||
|
# --------------------------
|
||||||
|
# Step 3: Handle command errors
|
||||||
|
# --------------------------
|
||||||
|
except subprocess.CalledProcessError as e:
|
||||||
|
# Return 500 if traceroute fails
|
||||||
|
return jsonify({
|
||||||
|
"success": False,
|
||||||
|
"error": "Traceroute failed",
|
||||||
|
"details": e.stderr.strip()
|
||||||
|
}), 500
|
||||||
|
|||||||
@@ -72,5 +72,38 @@ def test_wakeonlan_device(client, api_token, test_mac):
|
|||||||
assert data.get("success") is True
|
assert data.get("success") is True
|
||||||
assert "WOL packet sent" in data.get("message", "")
|
assert "WOL packet sent" in data.get("message", "")
|
||||||
|
|
||||||
|
def test_traceroute_device(client, api_token, test_mac):
|
||||||
|
# 1. Ensure at least one device exists
|
||||||
|
create_dummy(client, api_token, test_mac)
|
||||||
|
|
||||||
|
# 2. Fetch all devices
|
||||||
|
resp = client.get("/devices", headers=auth_headers(api_token))
|
||||||
|
assert resp.status_code == 200
|
||||||
|
devices = resp.json.get("devices", [])
|
||||||
|
assert len(devices) > 0
|
||||||
|
|
||||||
|
# 3. Pick the first device
|
||||||
|
device_ip = devices[0].get("devLastIP", "192.168.1.1") # fallback if dummy has no IP
|
||||||
|
|
||||||
|
# 4. Call the traceroute endpoint
|
||||||
|
resp = client.post(
|
||||||
|
"/nettools/traceroute",
|
||||||
|
json={"devLastIP": device_ip},
|
||||||
|
headers=auth_headers(api_token)
|
||||||
|
)
|
||||||
|
|
||||||
|
# 5. Assertions
|
||||||
|
if not device_ip or device_ip.lower() == 'invalid':
|
||||||
|
# Expect 400 if IP is missing or invalid
|
||||||
|
assert resp.status_code == 400
|
||||||
|
data = resp.json
|
||||||
|
assert data.get("success") is False
|
||||||
|
else:
|
||||||
|
# Expect 200 and valid traceroute output
|
||||||
|
assert resp.status_code == 200
|
||||||
|
data = resp.json
|
||||||
|
assert data.get("success") is True
|
||||||
|
assert "output" in data
|
||||||
|
assert isinstance(data["output"], str)
|
||||||
|
|
||||||
|
|
||||||
Reference in New Issue
Block a user