mirror of
https://github.com/jokob-sk/NetAlertX.git
synced 2026-03-31 07:12:23 -07:00
BE: /nettoos/interfaces endpoint
Signed-off-by: jokob-sk <jokob.sk@gmail.com>
This commit is contained in:
@@ -58,7 +58,8 @@ from .nettools_endpoint import ( # noqa: E402 [flake8 lint suppression]
|
||||
speedtest,
|
||||
nslookup,
|
||||
nmap_scan,
|
||||
internet_info
|
||||
internet_info,
|
||||
network_interfaces
|
||||
)
|
||||
from .dbquery_endpoint import read_query, write_query, update_query, delete_query # noqa: E402 [flake8 lint suppression]
|
||||
from .sync_endpoint import handle_sync_post, handle_sync_get # noqa: E402 [flake8 lint suppression]
|
||||
@@ -535,6 +536,13 @@ def api_internet_info():
|
||||
return internet_info()
|
||||
|
||||
|
||||
@app.route("/nettools/interfaces", methods=["GET"])
|
||||
def api_network_interfaces():
|
||||
if not is_authorized():
|
||||
return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
|
||||
return network_interfaces()
|
||||
|
||||
|
||||
@app.route('/mcp/sse/nettools/trigger-scan', methods=['POST'])
|
||||
@app.route("/nettools/trigger-scan", methods=["GET"])
|
||||
def api_trigger_scan():
|
||||
|
||||
@@ -277,3 +277,90 @@ def internet_info():
|
||||
"details": str(e),
|
||||
}
|
||||
), 500
|
||||
|
||||
|
||||
def network_interfaces():
|
||||
"""
|
||||
API endpoint to fetch network interface info using `nmap --iflist`.
|
||||
Returns JSON with interface info and RX/TX bytes.
|
||||
"""
|
||||
try:
|
||||
# Run Nmap
|
||||
nmap_output = subprocess.run(
|
||||
["nmap", "--iflist"],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
check=True,
|
||||
).stdout.strip()
|
||||
|
||||
# Read /proc/net/dev for RX/TX
|
||||
rx_tx = {}
|
||||
with open("/proc/net/dev") as f:
|
||||
for line in f.readlines()[2:]:
|
||||
if ":" not in line:
|
||||
continue
|
||||
iface, data = line.split(":")
|
||||
iface = iface.strip()
|
||||
cols = data.split()
|
||||
rx_bytes = int(cols[0])
|
||||
tx_bytes = int(cols[8])
|
||||
rx_tx[iface] = {"rx": rx_bytes, "tx": tx_bytes}
|
||||
|
||||
interfaces = {}
|
||||
|
||||
for line in nmap_output.splitlines():
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
|
||||
# Skip header line
|
||||
if line.startswith("DEV") or line.startswith("----"):
|
||||
continue
|
||||
|
||||
# Regex to parse: DEV (SHORT) IP/MASK TYPE UP MTU MAC
|
||||
match = re.match(
|
||||
r"^(\S+)\s+\(([^)]*)\)\s+(\S+)\s+(\S+)\s+(\S+)\s+(\S+)\s*(\S*)",
|
||||
line
|
||||
)
|
||||
if not match:
|
||||
continue
|
||||
|
||||
dev, short, ipmask, type_, state, mtu_str, mac = match.groups()
|
||||
|
||||
# Only parse MTU if it's a number
|
||||
try:
|
||||
mtu = int(mtu_str)
|
||||
except ValueError:
|
||||
mtu = None
|
||||
|
||||
if dev not in interfaces:
|
||||
interfaces[dev] = {
|
||||
"name": dev,
|
||||
"short": short,
|
||||
"type": type_,
|
||||
"state": state.lower(),
|
||||
"mtu": mtu,
|
||||
"mac": mac if mac else None,
|
||||
"ipv4": [],
|
||||
"ipv6": [],
|
||||
"rx_bytes": rx_tx.get(dev, {}).get("rx", 0),
|
||||
"tx_bytes": rx_tx.get(dev, {}).get("tx", 0),
|
||||
}
|
||||
|
||||
# Parse IP/MASK
|
||||
if ipmask != "(none)/0":
|
||||
if ":" in ipmask:
|
||||
interfaces[dev]["ipv6"].append(ipmask)
|
||||
else:
|
||||
interfaces[dev]["ipv4"].append(ipmask)
|
||||
|
||||
return jsonify({"success": True, "interfaces": interfaces}), 200
|
||||
|
||||
except (subprocess.CalledProcessError, ValueError, FileNotFoundError) as e:
|
||||
return jsonify(
|
||||
{
|
||||
"success": False,
|
||||
"error": "Failed to fetch network interface info",
|
||||
"details": str(e),
|
||||
}
|
||||
), 500
|
||||
|
||||
Reference in New Issue
Block a user