Mirror of https://github.com/jokob-sk/NetAlertX.git (synced 2025-12-06 17:15:38 -08:00)

Compare commits: 30 commits, 00a47ab5d3...fix-pr-130
Commits (SHA1):
36e5751221, 5af760f5ee, dfd836527e, 8d5a663817, fbb4a2f8b4, 54bce6505b,
6da47cc830, 9cabbf3622, 6c28a08bee, 86e3decd4e, e14e0bb9e8, b6023d1373,
1812cc8ef8, e64c490c8a, 5df39f984a, d007ed711a, 61824abb9f, 33c5548fe1,
fd41c395ae, 1a980844f0, 82e018e284, e0e1233b1c, 74677f940e, 21a4d20579,
9634e4e0f7, 59b417705e, 531b66effe, 5e4ad10fe0, 541b932b6d, 2bf3ff9f00
.github/workflows/docker_dev.yml (vendored, 6 changes)
@@ -47,6 +47,12 @@ jobs:
         id: get_version
         run: echo "version=Dev" >> $GITHUB_OUTPUT
 
+      # --- debug output
+      - name: Debug version
+        run: |
+          echo "GITHUB_REF: $GITHUB_REF"
+          echo "Version: '${{ steps.get_version.outputs.version }}'"
+
       # --- Write the timestamped version to .VERSION file
       - name: Create .VERSION file
         run: echo "${{ steps.timestamp.outputs.version }}" > .VERSION
.github/workflows/docker_prod.yml (vendored, 22 changes)
@@ -32,14 +32,34 @@ jobs:
       - name: Set up Docker Buildx
         uses: docker/setup-buildx-action@v3
 
+      # --- Previous approach Get release version from tag
       - name: Set up dynamic build ARGs
         id: getargs
         run: echo "version=$(cat ./stable/VERSION)" >> $GITHUB_OUTPUT
 
+      - name: Get release version
+        id: get_version_prev
+        run: echo "::set-output name=version::${GITHUB_REF#refs/tags/}"
+
+      - name: Create .VERSION file
+        run: echo "${{ steps.get_version.outputs.version }}" >> .VERSION_PREV
+
       # --- Get release version from tag
       - name: Get release version
         id: get_version
         run: echo "version=${GITHUB_REF#refs/tags/}" >> $GITHUB_OUTPUT
 
+      # --- debug output
+      - name: Debug version
+        run: |
+          echo "GITHUB_REF: $GITHUB_REF"
+          echo "Version: '${{ steps.get_version.outputs.version }}'"
+          echo "Version prev: '${{ steps.get_version_prev.outputs.version }}'"
+
       # --- Write version to .VERSION file
       - name: Create .VERSION file
-        run: echo "${{ steps.get_version.outputs.version }}" > .VERSION
+        run: echo -n "${{ steps.get_version.outputs.version }}" > .VERSION
 
       # --- Generate Docker metadata and tags
       - name: Docker meta
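Two output styles appear side by side in this hunk: the `get_version_prev` step still uses the deprecated `::set-output` workflow command, while `get_version` appends to the `$GITHUB_OUTPUT` file, and `echo -n` keeps a trailing newline out of `.VERSION`. A minimal sketch of what the newer style does, assuming `GITHUB_OUTPUT` points at the runner's step-output file:

```python
import os

# GITHUB_OUTPUT is a file path provided by the Actions runner; appending
# "name=value" lines to it is the modern replacement for ::set-output.
with open(os.environ["GITHUB_OUTPUT"], "a") as f:
    f.write("version=v1.2.3\n")  # illustrative value, not the real tag
```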
.gitignore (vendored, 1 change)
@@ -11,6 +11,7 @@ nohup.out
 config/*
 .ash_history
 .VERSION
+.VERSION_PREV
 config/pialert.conf
 config/app.conf
 db/*
Dockerfile (15 changes)
@@ -26,7 +26,7 @@ ENV PATH="/opt/venv/bin:$PATH"
 
 # Install build dependencies
 COPY requirements.txt /tmp/requirements.txt
-RUN apk add --no-cache bash shadow python3 python3-dev gcc musl-dev libffi-dev openssl-dev git \
+RUN apk add --no-cache bash shadow python3 python3-dev gcc musl-dev libffi-dev openssl-dev git rust cargo \
     && python -m venv /opt/venv
 
 # Create virtual environment owned by root, but readable by everyone else. This makes it easy to copy

@@ -138,6 +138,7 @@ RUN install -d -o ${NETALERTX_USER} -g ${NETALERTX_GROUP} -m 700 ${READ_WRITE_FO
 
 # Copy version information into the image
 COPY --chown=${NETALERTX_USER}:${NETALERTX_GROUP} .[V]ERSION ${NETALERTX_APP}/.VERSION
+COPY --chown=${NETALERTX_USER}:${NETALERTX_GROUP} .[V]ERSION ${NETALERTX_APP}/.VERSION_PREV
 
 # Copy the virtualenv from the builder stage
 COPY --from=builder --chown=20212:20212 ${VIRTUAL_ENV} ${VIRTUAL_ENV}

@@ -147,12 +148,12 @@ COPY --from=builder --chown=20212:20212 ${VIRTUAL_ENV} ${VIRTUAL_ENV}
 # This is done after the copy of the venv to ensure the venv is in place
 # although it may be quicker to do it before the copy, it keeps the image
 # layers smaller to do it after.
-RUN if [ -f '.VERSION' ]; then \
-        cp '.VERSION' "${NETALERTX_APP}/.VERSION"; \
-    else \
-        echo "DEVELOPMENT 00000000" > "${NETALERTX_APP}/.VERSION"; \
-    fi && \
-    chown 20212:20212 "${NETALERTX_APP}/.VERSION" && \
+RUN for vfile in .VERSION .VERSION_PREV; do \
+        if [ ! -f "${NETALERTX_APP}/${vfile}" ]; then \
+            echo "DEVELOPMENT 00000000" > "${NETALERTX_APP}/${vfile}"; \
+        fi; \
+        chown 20212:20212 "${NETALERTX_APP}/${vfile}"; \
+    done && \
     apk add --no-cache libcap && \
     setcap cap_net_raw+ep /bin/busybox && \
     setcap cap_net_raw,cap_net_admin+eip /usr/bin/nmap && \
@@ -239,29 +239,7 @@ services:
 
 4. Start the container and verify everything works as expected.
 5. Stop the container.
-6. Perform a one-off migration to the latest `netalertx` image and `20211` user:
-
-> [!NOTE]
-> The example below assumes your `/config` and `/db` folders are stored in `local_data_dir`.
-> Replace this path with your actual configuration directory. `netalertx` is the container name, which might differ from your setup.
-
-```sh
-docker run -it --rm --name netalertx --user "0" \
-  -v /local_data_dir/config:/data/config \
-  -v /local_data_dir/db:/data/db \
-  --tmpfs /tmp:uid=20211,gid=20211,mode=1700 \
-  ghcr.io/jokob-sk/netalertx:latest
-```
-
-..or alternatively execute:
-
-```bash
-sudo chown -R 20211:20211 /local_data_dir
-sudo chmod -R a+rwx /local_data_dir/
-```
-
-7. Stop the container
-8. Update the `docker-compose.yml` as per example below.
+6. Update the `docker-compose.yml` as per the example below.
 
 ```yaml
 services:

@@ -288,5 +266,33 @@ services:
   - "/tmp:uid=20211,gid=20211,mode=1700,rw,noexec,nosuid,nodev,async,noatime,nodiratime"
   # 🆕 New "tmpfs" section END 🔼
 ```
-9. Start the container and verify everything works as expected.
+7. Perform a one-off migration to the latest `netalertx` image and `20211` user.
+
+> [!NOTE]
+> The examples below assume your `/config` and `/db` folders are stored in `local_data_dir`.
+> Replace this path with your actual configuration directory. `netalertx` is the container name, which might differ from your setup.
+
+**Automated approach**:
+
+Run the container with the `--user "0"` parameter. Please note, some systems will require the manual approach below.
+
+```sh
+docker run -it --rm --name netalertx --user "0" \
+  -v /local_data_dir/config:/data/config \
+  -v /local_data_dir/db:/data/db \
+  --tmpfs /tmp:uid=20211,gid=20211,mode=1700 \
+  ghcr.io/jokob-sk/netalertx:latest
+```
+
+Stop the container and run it as you would normally.
+
+**Manual approach**:
+
+Use the manual approach if the automated approach fails. Execute the commands below:
+
+```bash
+sudo chown -R 20211:20211 /local_data_dir
+sudo chmod -R a+rwx /local_data_dir
+```
+
+8. Start the container and verify everything works as expected.
@@ -63,7 +63,6 @@ There is also an in-app Help / FAQ section that should be answering frequently a
 
 #### ♻ Misc
 
 - [Version history (legacy)](./VERSIONS_HISTORY.md)
 - [Reverse proxy (Nginx, Apache, SWAG)](./REVERSE_PROXY.md)
 - [Installing Updates](./UPDATES.md)
 - [Setting up Authelia](./AUTHELIA.md) (DRAFT)
@@ -378,7 +378,7 @@ function localizeTimestamp(input) {
   let tz = getSetting("TIMEZONE") || 'Europe/Berlin';
   input = String(input || '').trim();
 
-  // ✅ 1. Unix timestamps (10 or 13 digits)
+  // 1. Unix timestamps (10 or 13 digits)
   if (/^\d+$/.test(input)) {
     const ms = input.length === 10 ? parseInt(input, 10) * 1000 : parseInt(input, 10);
     return new Intl.DateTimeFormat('default', {

@@ -389,39 +389,59 @@ function localizeTimestamp(input) {
     }).format(new Date(ms));
   }
 
-  // ✅ 2. European DD/MM/YYYY
-  let match = input.match(/^(\d{1,2})\/(\d{1,2})\/(\d{4})(?:[ ,]+(\d{1,2}:\d{2}(?::\d{2})?))?(.*)$/);
+  // 2. European DD/MM/YYYY
+  let match = input.match(/^(\d{1,2})\/(\d{1,2})\/(\d{4})(?:[ ,]+(\d{1,2}:\d{2}(?::\d{2})?))?$/);
   if (match) {
-    let [ , d, m, y, t = "00:00:00", tzPart = "" ] = match;
-    const iso = `${y}-${m.padStart(2,'0')}-${d.padStart(2,'0')}T${t.length===5?t+":00":t}${tzPart}`;
-    return formatSafe(iso, tz);
+    let [, d, m, y, t = "00:00:00", tzPart = ""] = match;
+    const dNum = parseInt(d, 10);
+    const mNum = parseInt(m, 10);
+
+    if (dNum <= 12 && mNum > 12) {
+      // second field cannot be a month, so this must be US MM/DD; fall through to branch 3
+    } else {
+      const iso = `${y}-${m.padStart(2,'0')}-${d.padStart(2,'0')}T${t.length===5 ? t + ":00" : t}${tzPart}`;
+      return formatSafe(iso, tz);
+    }
   }
 
-  // ✅ 3. US MM/DD/YYYY
+  // 3. US MM/DD/YYYY
   match = input.match(/^(\d{1,2})\/(\d{1,2})\/(\d{4})(?:[ ,]+(\d{1,2}:\d{2}(?::\d{2})?))?(.*)$/);
   if (match) {
-    let [ , m, d, y, t = "00:00:00", tzPart = "" ] = match;
+    let [, m, d, y, t = "00:00:00", tzPart = ""] = match;
     const iso = `${y}-${m.padStart(2,'0')}-${d.padStart(2,'0')}T${t.length===5?t+":00":t}${tzPart}`;
     return formatSafe(iso, tz);
   }
 
-  // ✅ 4. ISO-style (with T, Z, offsets)
-  match = input.match(/^(\d{4}-\d{1,2}-\d{1,2})[ T](\d{1,2}:\d{2}(?::\d{2})?)(Z|[+-]\d{2}:?\d{2})?$/);
+  // 4. ISO YYYY-MM-DD with optional Z/+offset
+  match = input.match(/^(\d{4})-(0[1-9]|1[0-2])-(0[1-9]|[12]\d|3[01])[ T](\d{1,2}:\d{2}(?::\d{2})?)(Z|[+-]\d{2}:?\d{2})?$/);
   if (match) {
-    let [ , ymd, time, offset = "" ] = match;
-    // normalize to YYYY-MM-DD
-    let [y, m, d] = ymd.split('-').map(x => x.padStart(2,'0'));
+    let [, y, m, d, time, offset = ""] = match;
     const iso = `${y}-${m}-${d}T${time.length===5?time+":00":time}${offset}`;
     return formatSafe(iso, tz);
   }
 
-  // ✅ 5. RFC2822 / "25 Aug 2025 13:45:22 +0200"
+  // 5. RFC2822 / "25 Aug 2025 13:45:22 +0200"
   match = input.match(/^\d{1,2} [A-Za-z]{3,} \d{4}/);
   if (match) {
    return formatSafe(input, tz);
   }
 
-  // ✅ 6. Fallback (whatever Date() can parse)
+  // 6. DD-MM-YYYY with optional time
+  match = input.match(/^(\d{1,2})-(\d{1,2})-(\d{4})(?:[ T](\d{1,2}:\d{2}(?::\d{2})?))?$/);
+  if (match) {
+    let [, d, m, y, time = "00:00:00"] = match;
+    const iso = `${y}-${m.padStart(2,'0')}-${d.padStart(2,'0')}T${time.length===5?time+":00":time}`;
+    return formatSafe(iso, tz);
+  }
+
+  // 7. Strict YYYY-DD-MM with optional time
+  match = input.match(/^(\d{4})-(0[1-9]|[12]\d|3[01])-(0[1-9]|1[0-2])(?:[ T](\d{1,2}:\d{2}(?::\d{2})?))?$/);
+  if (match) {
+    let [, y, d, m, time = "00:00:00"] = match;
+    const iso = `${y}-${m}-${d}T${time.length === 5 ? time + ":00" : time}`;
+    return formatSafe(iso, tz);
+  }
+
+  // 8. Fallback
   return formatSafe(input, tz);
 
   function formatSafe(str, tz) {
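The new branch order can be summarized: a slash date is treated as European DD/MM/YYYY unless the first field could be a month while the second cannot, in which case the US branch takes it. A minimal Python sketch of that dispatch rule (illustrative only, not the app's code):

```python
import re

def classify_slash_date(s):
    """Return 'DD/MM/YYYY' or 'MM/DD/YYYY' for a d/m/y string, mirroring
    the JS rule above: prefer European order unless the second field
    cannot be a month while the first one can."""
    m = re.match(r"^(\d{1,2})/(\d{1,2})/(\d{4})$", s)
    if not m:
        return None
    first, second = int(m.group(1)), int(m.group(2))
    if first <= 12 and second > 12:
        return "MM/DD/YYYY"  # e.g. 08/25/2025 -> must be US order
    return "DD/MM/YYYY"      # e.g. 25/08/2025, or the ambiguous 05/06/2025

assert classify_slash_date("08/25/2025") == "MM/DD/YYYY"
assert classify_slash_date("25/08/2025") == "DD/MM/YYYY"
```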
@@ -440,6 +460,7 @@ function localizeTimestamp(input) {
   }
 
 
+
 // ----------------------------------------------------
 /**
  * Replaces double quotes within single-quoted strings, then converts all single quotes to double quotes,

@@ -1629,7 +1650,7 @@ async function executeOnce() {
     await cacheSettings();
     await cacheStrings();
 
-    console.log("✅ All AJAX callbacks have completed");
+    console.log("All AJAX callbacks have completed");
     onAllCallsComplete();
   } catch (error) {
     console.error("Error:", error);
@@ -521,13 +521,17 @@ function getChildren(node, list, path, visited = [])
 
   // Loop through all items to find children of the current node
   for (var i in list) {
-    if (list[i].devParentMAC.toLowerCase() == node.devMac.toLowerCase() && !hiddenMacs.includes(list[i].devParentMAC)) {
+    const item = list[i];
+    const parentMac = item.devParentMAC || ""; // null-safe
+    const nodeMac = node.devMac || ""; // null-safe
 
-      visibleNodesCount++;
+    if (parentMac != "" && parentMac.toLowerCase() == nodeMac.toLowerCase() && !hiddenMacs.includes(parentMac)) {
 
-      // Process children recursively, passing a copy of the visited list
-      children.push(getChildren(list[i], list, path + ((path == "") ? "" : '|') + list[i].devParentMAC, visited));
-    }
+      visibleNodesCount++;
+
+      // Process children recursively, passing a copy of the visited list
+      children.push(getChildren(list[i], list, path + ((path == "") ? "" : '|') + parentMac, visited));
+    }
   }
 
   // Track leaf and parent node counts

@@ -565,14 +569,27 @@ function getChildren(node, list, path, visited = [])
 // ---------------------------------------------------------------------------
 function getHierarchy()
 {
+  let internetNode = null;
+
   for(i in deviceListGlobal)
   {
     if(deviceListGlobal[i].devMac == 'Internet')
     {
-      return (getChildren(deviceListGlobal[i], deviceListGlobal, ''))
+      internetNode = deviceListGlobal[i];
+
+      return (getChildren(internetNode, deviceListGlobal, ''))
+      break;
     }
   }
 
+  if (!internetNode) {
+    showModalOk(
+      getString('Network_Configuration_Error'),
+      getString('Network_Root_Not_Configured')
+    );
+    console.error("getHierarchy(): Internet node not found");
+    return null;
+  }
 }
 
 //---------------------------------------------------------------------------
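The null-safe guard matters because `devParentMAC` can be missing on orphaned rows, and lowering `null` would throw. A small Python sketch of the same null-safe parent matching and per-branch copy of the visited list (field names mirror the JS; the data is hypothetical):

```python
def children_of(node, devices, visited=None):
    """Recursively collect children of `node` from a flat device list,
    skipping records whose parent MAC is missing. Sketch of the JS above."""
    visited = list(visited or [])        # copy per branch, as in the JS
    visited.append(node.get("devMac"))
    out = []
    for item in devices:
        parent = (item.get("devParentMAC") or "").lower()  # null-safe
        mac = (node.get("devMac") or "").lower()           # null-safe
        if parent and parent == mac and item.get("devMac") not in visited:
            out.append(children_of(item, devices, visited))
    return {"node": node, "children": out}

devices = [
    {"devMac": "Internet", "devParentMAC": None},
    {"devMac": "aa:bb", "devParentMAC": "Internet"},
    {"devMac": "cc:dd", "devParentMAC": "aa:bb"},
]
root = children_of(devices[0], devices)  # Internet -> aa:bb -> cc:dd
```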
@@ -671,8 +688,6 @@ function handleNodeClick(el)
 
 // ---------------------------------------------------------------------------
 var myTree;
-
-
 var emSize;
 var nodeHeight;
 // var sizeCoefficient = 1.4

@@ -689,140 +704,139 @@ function emToPx(em, element) {
 
 function initTree(myHierarchy)
 {
-  // calculate the drawing area based on teh tree width and available screen size
-  let baseFontSize = parseFloat($('html').css('font-size'));
-  let treeAreaHeight = ($(window).height() - 155); ;
-
-  // calculate the font size of the leaf nodes to fit everything into the tree area
-  leafNodesCount == 0 ? 1 : leafNodesCount;
-
-  emSize = pxToEm((treeAreaHeight/(leafNodesCount)).toFixed(2));
-
-  let screenWidthEm = pxToEm($('.networkTable').width()-15);
-
-  // init the drawing area size
-  $("#networkTree").attr('style', `height:${treeAreaHeight}px; width:${emToPx(screenWidthEm)}px`)
-
-  if(myHierarchy.type == "")
-  {
-    showModalOk(getString('Network_Configuration_Error'), getString('Network_Root_Not_Configured'))
-    return;
-  }
+  if(myHierarchy && myHierarchy.type !== "")
+  {
+    // calculate the drawing area based on the tree width and available screen size
+    let baseFontSize = parseFloat($('html').css('font-size'));
+    let treeAreaHeight = ($(window).height() - 155); ;
+
+    // calculate the font size of the leaf nodes to fit everything into the tree area
+    leafNodesCount == 0 ? 1 : leafNodesCount;
+
+    emSize = pxToEm((treeAreaHeight/(leafNodesCount)).toFixed(2));
+
+    let screenWidthEm = pxToEm($('.networkTable').width()-15);
+
+    // init the drawing area size
+    $("#networkTree").attr('style', `height:${treeAreaHeight}px; width:${emToPx(screenWidthEm)}px`)
+
+    // handle canvas and node size if only a few nodes
+    emSize > 1 ? emSize = 1 : emSize = emSize;
+
+    let nodeHeightPx = emToPx(emSize*1);
+    let nodeWidthPx = emToPx(screenWidthEm / (parentNodesCount));
+
+    // handle if only a few nodes
+    nodeWidthPx > 160 ? nodeWidthPx = 160 : nodeWidthPx = nodeWidthPx;
+
+    console.log(Treeviz);
+
+    myTree = Treeviz.create({
+      htmlId: "networkTree",
+      renderNode: nodeData => {
+
+        (!emptyArr.includes(nodeData.data.port )) ? port = nodeData.data.port : port = "";
+
+        (port == "" || port == 0 || port == 'None' ) ? portBckgIcon = `<i class="fa fa-wifi"></i>` : portBckgIcon = `<i class="fa fa-ethernet"></i>`;
+
+        portHtml = (port == "" || port == 0 || port == 'None' ) ? "   " : port;
+
+        // Build HTML for individual nodes in the network diagram
+        deviceIcon = (!emptyArr.includes(nodeData.data.icon )) ?
+          `<div class="netIcon">
+            ${atob(nodeData.data.icon)}
+          </div>` : "";
+        devicePort = `<div class="netPort"
+                        style="width:${emSize}em;height:${emSize}em">
+                        ${portHtml}</div>
+                      <div class="portBckgIcon"
+                        style="margin-left:-${emSize*0.7}em;">
+                        ${portBckgIcon}
+                      </div>`;
+        collapseExpandIcon = nodeData.data.hiddenChildren ?
+          "square-plus" : "square-minus";
+
+        // generate +/- icon if node has children nodes
+        collapseExpandHtml = nodeData.data.hasChildren ?
+          `<div class="netCollapse"
+            style="font-size:${nodeHeightPx/2}px;top:${Math.floor(nodeHeightPx / 4)}px"
+            data-mytreepath="${nodeData.data.path}"
+            data-mytreemac="${nodeData.data.mac}">
+            <i class="fa fa-${collapseExpandIcon} pointer"></i>
+          </div>` : "";
+
+        selectedNodeMac = $(".nav-tabs-custom .active a").attr('data-mytabmac')
+
+        highlightedCss = nodeData.data.mac == selectedNodeMac ?
+          " highlightedNode " : "";
+        cssNodeType = nodeData.data.devIsNetworkNodeDynamic ?
+          " node-network-device " : " node-standard-device ";
+
+        networkHardwareIcon = nodeData.data.devIsNetworkNodeDynamic ? `<span class="network-hw-icon">
+            <i class="fa-solid fa-hard-drive"></i>
+          </span>` : "";
+
+        const badgeConf = getStatusBadgeParts(nodeData.data.presentLastScan, nodeData.data.alertDown, nodeData.data.mac, statusText = '')
+
+        return result = `<div
+          class="node-inner hover-node-info box pointer ${highlightedCss} ${cssNodeType}"
+          style="height:${nodeHeightPx}px;font-size:${nodeHeightPx-5}px;"
+          onclick="handleNodeClick(this)"
+          data-mac="${nodeData.data.mac}"
+          data-parentMac="${nodeData.data.parentMac}"
+          data-name="${nodeData.data.name}"
+          data-ip="${nodeData.data.ip}"
+          data-mac="${nodeData.data.mac}"
+          data-vendor="${nodeData.data.vendor}"
+          data-type="${nodeData.data.type}"
+          data-devIsNetworkNodeDynamic="${nodeData.data.devIsNetworkNodeDynamic}"
+          data-lastseen="${nodeData.data.lastseen}"
+          data-firstseen="${nodeData.data.firstseen}"
+          data-relationship="${nodeData.data.relType}"
+          data-status="${nodeData.data.status}"
+          data-present="${nodeData.data.presentLastScan}"
+          data-alert="${nodeData.data.alertDown}"
+          data-icon="${nodeData.data.icon}"
+          >
+          <div class="netNodeText">
+            <strong><span>${devicePort} <span class="${badgeConf.cssText}">${deviceIcon}</span></span>
+            <span class="spanNetworkTree anonymizeDev" style="width:${nodeWidthPx-50}px">${nodeData.data.name}</span>
+            ${networkHardwareIcon}
+            </strong>
+          </div>
+        </div>
+        ${collapseExpandHtml}`;
+      },
+      mainAxisNodeSpacing: 'auto',
+      // secondaryAxisNodeSpacing: 0.3,
+      nodeHeight: nodeHeightPx,
+      nodeWidth: nodeWidthPx,
+      marginTop: '5',
+      isHorizontal : true,
+      hasZoom: true,
+      hasPan: true,
+      marginLeft: '10',
+      marginRight: '10',
+      idKey: "mac",
+      hasFlatData: false,
+      relationnalField: "children",
+      linkWidth: (nodeData) => 2,
+      linkColor: (nodeData) => {
+        relConf = getRelationshipConf(nodeData.data.relType)
+        return relConf.color;
+      }
+      // onNodeClick: (nodeData) => handleNodeClick(nodeData),
+    });
+
+    console.log(deviceListGlobal);
+    myTree.refresh(myHierarchy);
+
+    // hide spinning icon
+    hideSpinner()
+  } else
+  {
+    console.error("getHierarchy() not returning expected result");
+  }
 }
front/php/templates/language/fr_fr.json (2 changes, Executable file → Normal file)
@@ -311,7 +311,7 @@
   "Gen_Filter": "Filtrer",
   "Gen_Generate": "Générer",
   "Gen_InvalidMac": "Adresse MAC invalide.",
-  "Gen_Invalid_Value": "",
+  "Gen_Invalid_Value": "Une valeur invalide a été renseignée",
   "Gen_LockedDB": "Erreur - La base de données est peut-être verrouillée - Vérifier avec les outils de dév via F12 -> Console ou essayer plus tard.",
   "Gen_NetworkMask": "Masque réseau",
   "Gen_Offline": "Hors ligne",
File diff suppressed because it is too large
@@ -311,7 +311,7 @@
   "Gen_Filter": "Фильтр",
   "Gen_Generate": "Генерировать",
   "Gen_InvalidMac": "Неверный Mac-адрес.",
-  "Gen_Invalid_Value": "",
+  "Gen_Invalid_Value": "Введено некорректное значение",
   "Gen_LockedDB": "ОШИБКА - Возможно, база данных заблокирована. Проверьте инструменты разработчика F12 -> Консоль или повторите попытку позже.",
   "Gen_NetworkMask": "Маска сети",
   "Gen_Offline": "Оффлайн",
front/php/templates/language/uk_ua.json (2 changes, Executable file → Normal file)
@@ -311,7 +311,7 @@
   "Gen_Filter": "Фільтр",
   "Gen_Generate": "Генерувати",
   "Gen_InvalidMac": "Недійсна Mac-адреса.",
-  "Gen_Invalid_Value": "",
+  "Gen_Invalid_Value": "Введено недійсне значення",
   "Gen_LockedDB": "ПОМИЛКА – БД може бути заблоковано – перевірте F12 Інструменти розробника -> Консоль або спробуйте пізніше.",
   "Gen_NetworkMask": "Маска мережі",
   "Gen_Offline": "Офлайн",
@@ -14,6 +14,14 @@ Specify the following settings in the Settings section of NetAlertX:
 
 If unsure, please check [snmpwalk examples](https://www.comparitech.com/net-admin/snmpwalk-examples-windows-linux/).
 
+Supported output formats:
+
+```
+ipNetToMediaPhysAddress[3][192.168.1.9] 6C:6C:6C:6C:6C:b6C1
+IP-MIB::ipNetToMediaPhysAddress.17.10.10.3.202 = STRING: f8:81:1a:ef:ef:ef
+mib-2.3.1.1.2.15.1.192.168.1.14 "2C F4 32 18 61 43 "
+```
+
 ### Setup Cisco IOS
 
 Enable IOS SNMP service and restrict to selected (internal) IP/Subnet.
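A minimal sketch of how these three output formats reduce to (IP, MAC) pairs, mirroring the parsing logic in the SNMPDSC hunks below; this is illustrative, not the plugin itself (the plugin additionally normalizes MACs via its `normalize_mac` helper):

```python
def parse_snmp_line(line):
    """Best-effort extraction of (ip, mac) from one snmpwalk output line."""
    # Format: IP-MIB::ipNetToMediaPhysAddress.17.10.10.3.202 = STRING: f8:81:1a:ef:ef:ef
    if "ipNetToMediaPhysAddress" in line and "= STRING:" in line:
        left, right = line.split("=", 1)
        mac = right.split("STRING:")[-1].strip().lower()
        ip = ".".join(left.strip().split(".")[-4:])  # last 4 OID ints = IPv4
        return ip, mac
    # Format: ipNetToMediaPhysAddress[3][192.168.1.9] aa:bb:cc:dd:ee:ff
    if line.startswith("ipNetToMediaPhysAddress"):
        parts = line.split()
        if len(parts) >= 2:
            ip = parts[0].split("[")[-1][:-1]
            return ip, parts[1].lower()
    # Format: mib-2.3.1.1.2.15.1.192.168.1.14 "2C F4 32 18 61 43 "
    quoted = line.split('"')
    if len(quoted) == 3:
        ip = ".".join(quoted[0].strip().split(".")[-4:])
        mac = ":".join(quoted[1].split()).lower()
        return ip, mac
    return None

print(parse_snmp_line('IP-MIB::ipNetToMediaPhysAddress.17.10.10.3.202 = STRING: f8:81:1a:ef:ef:ef'))
# -> ('10.10.3.202', 'f8:81:1a:ef:ef:ef')
```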
@@ -30,7 +30,7 @@ RESULT_FILE = os.path.join(LOG_PATH, f'last_result.{pluginName}.log')
 
 
 def main():
-    mylog('verbose', ['[SNMPDSC] In script '])
+    mylog('verbose', f"[{pluginName}] In script ")
 
     # init global variables
     global snmpWalkCmds

@@ -57,7 +57,7 @@ def main():
     commands = [snmpWalkCmds]
 
     for cmd in commands:
-        mylog('verbose', ['[SNMPDSC] Router snmpwalk command: ', cmd])
+        mylog('verbose', [f"[{pluginName}] Router snmpwalk command: ", cmd])
         # split the string, remove white spaces around each item, and exclude any empty strings
         snmpwalkArgs = [arg.strip() for arg in cmd.split(' ') if arg.strip()]

@@ -72,7 +72,7 @@ def main():
             timeout=(timeoutSetting)
         )
 
-        mylog('verbose', ['[SNMPDSC] output: ', output])
+        mylog('verbose', [f"[{pluginName}] output: ", output])
 
         lines = output.split('\n')

@@ -80,6 +80,8 @@ def main():
 
             tmpSplt = line.split('"')
 
+            # Expected Format:
+            # mib-2.3.1.1.2.15.1.192.168.1.14 "2C F4 32 18 61 43 "
             if len(tmpSplt) == 3:
 
                 ipStr = tmpSplt[0].split('.')[-4:]  # Get the last 4 elements to extract the IP

@@ -89,7 +91,7 @@ def main():
                     macAddress = ':'.join(macStr)
                     ipAddress = '.'.join(ipStr)
 
-                    mylog('verbose', [f'[SNMPDSC] IP: {ipAddress} MAC: {macAddress}'])
+                    mylog('verbose', [f"[{pluginName}] IP: {ipAddress} MAC: {macAddress}"])
 
                     plugin_objects.add_object(
                         primaryId = handleEmpty(macAddress),

@@ -100,8 +102,40 @@ def main():
                         foreignKey = handleEmpty(macAddress)  # Use the primary ID as the foreign key
                     )
                 else:
-                    mylog('verbose', ['[SNMPDSC] ipStr does not seem to contain a valid IP:', ipStr])
+                    mylog('verbose', [f"[{pluginName}] ipStr does not seem to contain a valid IP:", ipStr])
 
+            # Expected Format:
+            # IP-MIB::ipNetToMediaPhysAddress.17.10.10.3.202 = STRING: f8:81:1a:ef:ef:ef
+            elif "ipNetToMediaPhysAddress" in line and "=" in line and "STRING:" in line:
+
+                # Split on "=" → ["IP-MIB::ipNetToMediaPhysAddress.xxx.xxx.xxx.xxx ", " STRING: aa:bb:cc:dd:ee:ff"]
+                left, right = line.split("=", 1)
+
+                # Extract the MAC (right side)
+                macAddress = right.split("STRING:")[-1].strip()
+                macAddress = normalize_mac(macAddress)
+
+                # Extract IP address from the left side
+                # tail of the OID: last 4 integers = IPv4 address
+                oid_parts = left.strip().split('.')
+                ip_parts = oid_parts[-4:]
+                ipAddress = ".".join(ip_parts)
+
+                mylog('verbose', [f"[{pluginName}] (fallback) IP: {ipAddress} MAC: {macAddress}"])
+
+                plugin_objects.add_object(
+                    primaryId = handleEmpty(macAddress),
+                    secondaryId = handleEmpty(ipAddress),
+                    watched1 = '(unknown)',
+                    watched2 = handleEmpty(snmpwalkArgs[6]),
+                    extra = handleEmpty(line),
+                    foreignKey = handleEmpty(macAddress)
+                )
+
+                continue
+
+            # Expected Format:
+            # ipNetToMediaPhysAddress[3][192.168.1.9] 6C:6C:6C:6C:6C:b6C1
             elif line.startswith('ipNetToMediaPhysAddress'):
                 # Format: snmpwalk -OXsq output
                 parts = line.split()

@@ -110,7 +144,7 @@ def main():
                     ipAddress = parts[0].split('[')[-1][:-1]
                     macAddress = normalize_mac(parts[1])
 
-                    mylog('verbose', [f'[SNMPDSC] IP: {ipAddress} MAC: {macAddress}'])
+                    mylog('verbose', [f"[{pluginName}] IP: {ipAddress} MAC: {macAddress}"])
 
                     plugin_objects.add_object(
                         primaryId = handleEmpty(macAddress),

@@ -121,7 +155,7 @@ def main():
                         foreignKey = handleEmpty(macAddress)
                     )
 
-    mylog('verbose', ['[SNMPDSC] Entries found: ', len(plugin_objects)])
+    mylog('verbose', [f"[{pluginName}] Entries found: ", len(plugin_objects)])
 
     plugin_objects.write_result_file()
@@ -14,7 +14,7 @@ if ! awk '$2 == "/" && $4 ~ /ro/ {found=1} END {exit !found}' /proc/mounts; then
 ══════════════════════════════════════════════════════════════════════════════
 ⚠️ Warning: Container is running as read-write, not in read-only mode.
 
-Please mount the root filesystem as --read-only or use read-only: true
+Please mount the root filesystem as --read-only or use read_only: true
 https://github.com/jokob-sk/NetAlertX/blob/main/docs/docker-troubleshooting/read-only-filesystem.md
 ══════════════════════════════════════════════════════════════════════════════
 EOF
@@ -30,3 +30,4 @@ urllib3
 httplib2
 gunicorn
 git+https://github.com/foreign-sub/aiofreepybox.git
+mcp
@@ -2,12 +2,19 @@ import threading
 import sys
 import os
 
-from flask import Flask, request, jsonify, Response
+from flask import Flask, request, jsonify, Response, stream_with_context
+import json
+import uuid
+import queue
+import requests
+import logging
+from datetime import datetime, timedelta
+from models.device_instance import DeviceInstance  # noqa: E402
 from flask_cors import CORS
 
 # Register NetAlertX directories
 INSTALL_PATH = os.getenv("NETALERTX_APP", "/app")
-sys.path.extend([f"{INSTALL_PATH}/server"])
+sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
 
 from logger import mylog  # noqa: E402 [flake8 lint suppression]
 from helper import get_setting_value  # noqa: E402 [flake8 lint suppression]
@@ -63,6 +70,9 @@ from .dbquery_endpoint import read_query, write_query, update_query, delete_quer
 from .sync_endpoint import handle_sync_post, handle_sync_get  # noqa: E402 [flake8 lint suppression]
 from .logs_endpoint import clean_log  # noqa: E402 [flake8 lint suppression]
 from models.user_events_queue_instance import UserEventsQueueInstance  # noqa: E402 [flake8 lint suppression]
+from database import DB  # noqa: E402 [flake8 lint suppression]
+from models.plugin_object_instance import PluginObjectInstance  # noqa: E402 [flake8 lint suppression]
+from plugin_helper import is_mac  # noqa: E402 [flake8 lint suppression]
 from messaging.in_app import (  # noqa: E402 [flake8 lint suppression]
     write_notification,
     mark_all_notifications_read,

@@ -71,9 +81,14 @@ from messaging.in_app import (  # noqa: E402 [flake8 lint suppression]
     delete_notification,
     mark_notification_as_read
 )
-from .tools_routes import openapi_spec as tools_openapi_spec  # noqa: E402 [flake8 lint suppression]
+# tools and mcp routes have been moved into this module (api_server_start)
 
 # Flask application
 app = Flask(__name__)
 
 # Register Blueprints
+# No separate blueprints for tools or mcp - routes are registered below
 CORS(
     app,
     resources={
@@ -87,16 +102,221 @@ CORS(
         r"/dbquery/*": {"origins": "*"},
         r"/messaging/*": {"origins": "*"},
         r"/events/*": {"origins": "*"},
-        r"/logs/*": {"origins": "*"}
+        r"/logs/*": {"origins": "*"},
+        r"/api/tools/*": {"origins": "*"},
+        r"/auth/*": {"origins": "*"}
     },
     supports_credentials=True,
     allow_headers=["Authorization", "Content-Type"],
 )

# -----------------------------------------------
# DB model instances for helper usage
# -----------------------------------------------
db_helper = DB()
db_helper.open()
device_handler = DeviceInstance(db_helper)
plugin_object_handler = PluginObjectInstance(db_helper)

# -------------------------------------------------------------------------------
# MCP bridge variables + helpers (moved from mcp_routes)
# -------------------------------------------------------------------------------
mcp_sessions = {}
mcp_sessions_lock = threading.Lock()
mcp_openapi_spec_cache = None

BACKEND_PORT = get_setting_value("GRAPHQL_PORT")
API_BASE_URL = f"http://localhost:{BACKEND_PORT}/api/tools"
def get_openapi_spec_local():
    global mcp_openapi_spec_cache
    if mcp_openapi_spec_cache:
        return mcp_openapi_spec_cache
    try:
        resp = requests.get(f"{API_BASE_URL}/openapi.json", timeout=10)
        resp.raise_for_status()
        mcp_openapi_spec_cache = resp.json()
        return mcp_openapi_spec_cache
    except Exception as e:
        mylog('minimal', [f"Error fetching OpenAPI spec: {e}"])
        return None


def map_openapi_to_mcp_tools(spec):
    tools = []
    if not spec or 'paths' not in spec:
        return tools
    for path, methods in spec['paths'].items():
        for method, details in methods.items():
            if 'operationId' in details:
                tool = {
                    'name': details['operationId'],
                    'description': details.get('description', details.get('summary', '')),
                    'inputSchema': {'type': 'object', 'properties': {}, 'required': []},
                }
                if 'requestBody' in details:
                    content = details['requestBody'].get('content', {})
                    if 'application/json' in content:
                        schema = content['application/json'].get('schema', {})
                        tool['inputSchema'] = schema.copy()
                        if 'properties' not in tool['inputSchema']:
                            tool['inputSchema']['properties'] = {}
                if 'parameters' in details:
                    for param in details['parameters']:
                        if param.get('in') == 'query':
                            tool['inputSchema']['properties'][param['name']] = {
                                'type': param.get('schema', {}).get('type', 'string'),
                                'description': param.get('description', ''),
                            }
                            if param.get('required'):
                                tool['inputSchema'].setdefault('required', []).append(param['name'])
                tools.append(tool)
    return tools
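A worked example of the mapping above: one OpenAPI path with an `operationId` becomes one MCP tool descriptor. The spec fragment is illustrative only:

```python
spec = {
    "paths": {
        "/get_device_info": {
            "post": {
                "operationId": "get_device_info",
                "summary": "Look up a device",
                "requestBody": {
                    "content": {
                        "application/json": {
                            "schema": {
                                "type": "object",
                                "properties": {"query": {"type": "string"}},
                                "required": ["query"],
                            }
                        }
                    }
                },
            }
        }
    }
}

tools = map_openapi_to_mcp_tools(spec)
# -> [{'name': 'get_device_info', 'description': 'Look up a device',
#      'inputSchema': {'type': 'object',
#                      'properties': {'query': {'type': 'string'}},
#                      'required': ['query']}}]
```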
def process_mcp_request(data):
    method = data.get('method')
    msg_id = data.get('id')
    response = None
    if method == 'initialize':
        response = {
            'jsonrpc': '2.0',
            'id': msg_id,
            'result': {
                'protocolVersion': '2024-11-05',
                'capabilities': {'tools': {}},
                'serverInfo': {'name': 'NetAlertX', 'version': '1.0.0'},
            },
        }
    elif method == 'notifications/initialized':
        pass
    elif method == 'tools/list':
        spec = get_openapi_spec_local()
        tools = map_openapi_to_mcp_tools(spec)
        response = {'jsonrpc': '2.0', 'id': msg_id, 'result': {'tools': tools}}
    elif method == 'tools/call':
        params = data.get('params', {})
        tool_name = params.get('name')
        tool_args = params.get('arguments', {})
        spec = get_openapi_spec_local()
        target_path = None
        target_method = None
        if spec and 'paths' in spec:
            for path, methods in spec['paths'].items():
                for m, details in methods.items():
                    if details.get('operationId') == tool_name:
                        target_path = path
                        target_method = m.upper()
                        break
                if target_path:
                    break
        if target_path:
            try:
                headers = {'Content-Type': 'application/json'}
                if 'Authorization' in request.headers:
                    headers['Authorization'] = request.headers['Authorization']
                url = f"{API_BASE_URL}{target_path}"
                if target_method == 'POST':
                    api_res = requests.post(url, json=tool_args, headers=headers, timeout=30)
                elif target_method == 'GET':
                    api_res = requests.get(url, params=tool_args, headers=headers, timeout=30)
                else:
                    api_res = None
                if api_res:
                    content = []
                    try:
                        json_content = api_res.json()
                        content.append({'type': 'text', 'text': json.dumps(json_content, indent=2)})
                    except Exception:
                        content.append({'type': 'text', 'text': api_res.text})
                    is_error = api_res.status_code >= 400
                    response = {'jsonrpc': '2.0', 'id': msg_id, 'result': {'content': content, 'isError': is_error}}
                else:
                    response = {'jsonrpc': '2.0', 'id': msg_id, 'error': {'code': -32601, 'message': f"Method {target_method} not supported"}}
            except Exception as e:
                response = {'jsonrpc': '2.0', 'id': msg_id, 'result': {'content': [{'type': 'text', 'text': f"Error calling tool: {str(e)}"}], 'isError': True}}
        else:
            response = {'jsonrpc': '2.0', 'id': msg_id, 'error': {'code': -32601, 'message': f"Tool {tool_name} not found"}}
    elif method == 'ping':
        response = {'jsonrpc': '2.0', 'id': msg_id, 'result': {}}
    else:
        if msg_id:
            response = {'jsonrpc': '2.0', 'id': msg_id, 'error': {'code': -32601, 'message': 'Method not found'}}
    return response
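For orientation, a request/response pair the dispatcher above handles; the payloads are illustrative:

```python
# What a client sends to list tools (JSON-RPC 2.0 over the MCP bridge):
list_request = {"jsonrpc": "2.0", "id": 1, "method": "tools/list"}

# Shape of the reply produced by process_mcp_request (the tool list comes
# from the cached OpenAPI spec; the entry shown here is illustrative):
list_response = {
    "jsonrpc": "2.0",
    "id": 1,
    "result": {"tools": [{"name": "get_device_info",
                          "description": "Look up a device",
                          "inputSchema": {"type": "object", "properties": {}}}]},
}

# Calling a tool routes the arguments to the matching /api/tools endpoint:
call_request = {"jsonrpc": "2.0", "id": 2, "method": "tools/call",
                "params": {"name": "get_device_info",
                           "arguments": {"query": "192.168.1.9"}}}
```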
@app.route('/api/mcp/sse', methods=['GET', 'POST'])
def api_mcp_sse():
    if request.method == 'POST':
        try:
            data = request.get_json(silent=True)
            if data and 'method' in data and 'jsonrpc' in data:
                response = process_mcp_request(data)
                if response:
                    return jsonify(response)
                else:
                    return '', 202
        except Exception as e:
            logging.getLogger(__name__).debug(f'SSE POST processing error: {e}')
        return jsonify({'status': 'ok', 'message': 'MCP SSE endpoint active'}), 200

    session_id = uuid.uuid4().hex
    q = queue.Queue()
    with mcp_sessions_lock:
        mcp_sessions[session_id] = q

    def stream():
        yield f"event: endpoint\ndata: /api/mcp/messages?session_id={session_id}\n\n"
        try:
            while True:
                try:
                    message = q.get(timeout=20)
                    yield f"event: message\ndata: {json.dumps(message)}\n\n"
                except queue.Empty:
                    yield ": keep-alive\n\n"
        except GeneratorExit:
            with mcp_sessions_lock:
                if session_id in mcp_sessions:
                    del mcp_sessions[session_id]
    return Response(stream_with_context(stream()), mimetype='text/event-stream')


@app.route('/api/mcp/messages', methods=['POST'])
def api_mcp_messages():
    session_id = request.args.get('session_id')
    if not session_id:
        return jsonify({"error": "Missing session_id"}), 400
    with mcp_sessions_lock:
        if session_id not in mcp_sessions:
            return jsonify({"error": "Session not found"}), 404
        q = mcp_sessions[session_id]
    data = request.json
    if not data:
        return jsonify({"error": "Invalid JSON"}), 400
    response = process_mcp_request(data)
    if response:
        q.put(response)
    return jsonify({"status": "accepted"}), 202
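A minimal client sketch for the SSE flow above: open the stream, read the per-session endpoint from the first event, post a JSON-RPC message to it, and read the reply off the stream. The base URL and port are assumptions to adapt to your deployment:

```python
import json
import requests

BASE = "http://localhost:20212"  # assumed GRAPHQL_PORT; adjust to your setup

with requests.get(f"{BASE}/api/mcp/sse", stream=True, timeout=60) as sse:
    lines = sse.iter_lines(decode_unicode=True)

    # First event announces the per-session message URL.
    endpoint = None
    for raw in lines:
        if raw and raw.startswith("data: "):
            endpoint = raw[len("data: "):]
            break

    # Post a JSON-RPC message to the session; the reply arrives on the stream.
    requests.post(f"{BASE}{endpoint}",
                  json={"jsonrpc": "2.0", "id": 1, "method": "tools/list"},
                  timeout=10)

    for raw in lines:
        if raw and raw.startswith("data: "):  # skips ": keep-alive" comments
            print(json.dumps(json.loads(raw[len("data: "):]), indent=2))
            break
```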
# -------------------------------------------------------------------
# Custom handler for 404 - Route not found
# -------------------------------------------------------------------
@app.before_request
def log_request_info():
    """Log details of every incoming request."""
    # Filter out noisy requests if needed, but user asked for drastic logging
    mylog("verbose", [f"[HTTP] {request.method} {request.path} from {request.remote_addr}"])
    # Filter sensitive headers before logging
    safe_headers = {k: v for k, v in request.headers if k.lower() not in ('authorization', 'cookie', 'x-api-key')}
    mylog("debug", [f"[HTTP] Headers: {safe_headers}"])
    if request.method == "POST":
        # Be careful with large bodies, but log first 1000 chars
        data = request.get_data(as_text=True)
        mylog("debug", [f"[HTTP] Body length: {len(data)} chars"])


@app.errorhandler(404)
def not_found(error):
    response = {
@@ -145,6 +365,183 @@ def graphql_endpoint():
     return jsonify(response)
 

# --------------------------
# Tools endpoints (moved from tools_routes)
# --------------------------

@app.route('/api/tools/trigger_scan', methods=['POST'])
def api_trigger_scan():
    if not is_authorized():
        return jsonify({"error": "Unauthorized"}), 401

    data = request.get_json() or {}
    scan_type = data.get('scan_type', 'nmap_fast')
    # Map requested scan type to plugin prefix
    plugin_prefix = None
    if scan_type in ['nmap_fast', 'nmap_deep']:
        plugin_prefix = 'NMAPDEV'
    elif scan_type == 'arp':
        plugin_prefix = 'ARPSCAN'
    else:
        return jsonify({"error": "Invalid scan_type. Must be 'arp', 'nmap_fast', or 'nmap_deep'"}), 400

    queue_instance = UserEventsQueueInstance()
    action = f"run|{plugin_prefix}"
    success, message = queue_instance.add_event(action)
    if success:
        return jsonify({"success": True, "message": f"Triggered plugin {plugin_prefix} via ad-hoc queue."})
    else:
        return jsonify({"success": False, "error": message}), 500


@app.route('/api/tools/list_devices', methods=['POST'])
def api_tools_list_devices():
    if not is_authorized():
        return jsonify({"error": "Unauthorized"}), 401
    return get_all_devices()


@app.route('/api/tools/get_device_info', methods=['POST'])
def api_tools_get_device_info():
    if not is_authorized():
        return jsonify({"error": "Unauthorized"}), 401
    data = request.get_json(silent=True) or {}
    query = data.get('query')
    if not query:
        return jsonify({"error": "Missing 'query' parameter"}), 400
    # if MAC -> device endpoint
    if is_mac(query):
        return get_device_data(query)
    # search by name or IP
    matches = device_handler.search(query)
    if not matches:
        return jsonify({"message": "No devices found"}), 404
    return jsonify(matches)


@app.route('/api/tools/get_latest_device', methods=['POST'])
def api_tools_get_latest_device():
    if not is_authorized():
        return jsonify({"error": "Unauthorized"}), 401
    latest = device_handler.getLatest()
    if not latest:
        return jsonify({"message": "No devices found"}), 404
    return jsonify([latest])


@app.route('/api/tools/get_open_ports', methods=['POST'])
def api_tools_get_open_ports():
    if not is_authorized():
        return jsonify({"error": "Unauthorized"}), 401
    data = request.get_json(silent=True) or {}
    target = data.get('target')
    if not target:
        return jsonify({"error": "Target is required"}), 400

    # If MAC is provided, use plugin objects to get port entries
    if is_mac(target):
        entries = plugin_object_handler.getByPrimary('NMAP', target.lower())
        open_ports = []
        for e in entries:
            try:
                port = int(e.get('Object_SecondaryID', 0))
            except (ValueError, TypeError):
                continue
            service = e.get('Watched_Value2', 'unknown')
            open_ports.append({"port": port, "service": service})
        return jsonify({"success": True, "target": target, "open_ports": open_ports, "raw": entries})

    # If IP provided, try to resolve to MAC and proceed
    # Use device handler to resolve IP
    device = device_handler.getByIP(target)
    if device and device.get('devMac'):
        mac = device.get('devMac')
        entries = plugin_object_handler.getByPrimary('NMAP', mac.lower())
        open_ports = []
        for e in entries:
            try:
                port = int(e.get('Object_SecondaryID', 0))
            except (ValueError, TypeError):
                continue
            service = e.get('Watched_Value2', 'unknown')
            open_ports.append({"port": port, "service": service})
        return jsonify({"success": True, "target": target, "open_ports": open_ports, "raw": entries})

    # No plugin data found; as fallback use nettools nmap_scan (may run subprocess)
    # Note: Prefer plugin data (NMAP) when available
    res = nmap_scan(target, 'fast')
    return res


@app.route('/api/tools/get_network_topology', methods=['GET'])
def api_tools_get_network_topology():
    if not is_authorized():
        return jsonify({"error": "Unauthorized"}), 401
    topo = device_handler.getNetworkTopology()
    return jsonify(topo)


@app.route('/api/tools/get_recent_alerts', methods=['POST'])
def api_tools_get_recent_alerts():
    if not is_authorized():
        return jsonify({"error": "Unauthorized"}), 401
    data = request.get_json(silent=True) or {}
    hours = int(data.get('hours', 24))
    # Reuse get_events() - which returns a Flask response with JSON containing 'events'
    res = get_events()
    events_json = res.get_json() if hasattr(res, 'get_json') else None
    events = events_json.get('events', []) if events_json else []
    cutoff = datetime.now() - timedelta(hours=hours)
    filtered = [e for e in events if 'eve_DateTime' in e and datetime.strptime(e['eve_DateTime'], '%Y-%m-%d %H:%M:%S') > cutoff]
    return jsonify(filtered)


@app.route('/api/tools/set_device_alias', methods=['POST'])
def api_tools_set_device_alias():
    if not is_authorized():
        return jsonify({"error": "Unauthorized"}), 401
    data = request.get_json(silent=True) or {}
    mac = data.get('mac')
    alias = data.get('alias')
    if not mac or not alias:
        return jsonify({"error": "MAC and Alias are required"}), 400
    return update_device_column(mac, 'devName', alias)


@app.route('/api/tools/wol_wake_device', methods=['POST'])
def api_tools_wol_wake_device():
    if not is_authorized():
        return jsonify({"error": "Unauthorized"}), 401
    data = request.get_json(silent=True) or {}
    mac = data.get('mac')
    ip = data.get('ip')
    if not mac and not ip:
        return jsonify({"error": "MAC or IP is required"}), 400
    # Resolve IP to MAC if needed
    if not mac and ip:
        device = device_handler.getByIP(ip)
        if not device or not device.get('devMac'):
            return jsonify({"error": f"Could not resolve MAC for IP {ip}"}), 404
        mac = device.get('devMac')
    # Validate mac using is_mac helper
    if not is_mac(mac):
        return jsonify({"success": False, "error": f"Invalid MAC: {mac}"}), 400
    return wakeonlan(mac)


@app.route('/api/tools/openapi.json', methods=['GET'])
def api_tools_openapi_spec():
    # Minimal OpenAPI spec for tools
    spec = {
        "openapi": "3.0.0",
        "info": {"title": "NetAlertX Tools", "version": "1.1.0"},
        "servers": [{"url": "/api/tools"}],
        "paths": {}
    }
    return jsonify(spec)


# --------------------------
# Settings Endpoints
# --------------------------
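A quick client-side sketch of the tools API above; the base URL and the Authorization scheme (`Bearer <API token>`) are assumptions to adapt to your deployment:

```python
import requests

BASE = "http://localhost:20212"                        # assumed API port
HEADERS = {"Authorization": "Bearer YOUR_API_TOKEN"}   # auth scheme assumed

# Queue an ARP scan via the ad-hoc event queue
r = requests.post(f"{BASE}/api/tools/trigger_scan",
                  json={"scan_type": "arp"}, headers=HEADERS, timeout=10)
print(r.json())

# Look up a device by MAC, name, or IP
r = requests.post(f"{BASE}/api/tools/get_device_info",
                  json={"query": "192.168.1.9"}, headers=HEADERS, timeout=10)
print(r.status_code, r.json())
```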
@@ -744,6 +1141,23 @@ def sync_endpoint():
         return jsonify({"success": False, "message": "ERROR: No allowed", "error": "Method Not Allowed"}), 405
 
 
+# --------------------------
+# Auth endpoint
+# --------------------------
+@app.route("/auth", methods=["GET"])
+def check_auth():
+    if not is_authorized():
+        return jsonify({"success": False, "message": "ERROR: Not authorized", "error": "Forbidden"}), 403
+
+    elif request.method == "GET":
+        return jsonify({"success": True, "message": "Authentication check successful"}), 200
+    else:
+        msg = "[sync endpoint] Method Not Allowed"
+        write_notification(msg, "alert")
+        mylog("verbose", [msg])
+        return jsonify({"success": False, "message": "ERROR: No allowed", "error": "Method Not Allowed"}), 405
+
+
 # --------------------------
 # Background Server Start
 # --------------------------

@@ -775,3 +1189,9 @@ def start_server(graphql_port, app_state):
 
     # Update the state to indicate the server has started
     app_state = updateState("Process: Idle", None, None, None, 1)
+
+
+if __name__ == "__main__":
+    # This block is for running the server directly for testing purposes
+    # In production, start_server is called from api.py
+    pass
server/api_server/mcp_routes.py (new file, 304 lines)
@@ -0,0 +1,304 @@
|
||||
"""MCP bridge routes exposing NetAlertX tool endpoints via JSON-RPC."""
|
||||
|
||||
import json
|
||||
import uuid
|
||||
import queue
|
||||
import requests
|
||||
import threading
|
||||
import logging
|
||||
from flask import Blueprint, request, Response, stream_with_context, jsonify
|
||||
from helper import get_setting_value
|
||||
|
||||
mcp_bp = Blueprint('mcp', __name__)
|
||||
|
||||
# Store active sessions: session_id -> Queue
|
||||
sessions = {}
|
||||
sessions_lock = threading.Lock()
|
||||
|
||||
# Cache for OpenAPI spec to avoid fetching on every request
|
||||
openapi_spec_cache = None
|
||||
|
||||
BACKEND_PORT = get_setting_value("GRAPHQL_PORT")
|
||||
|
||||
API_BASE_URL = f"http://localhost:{BACKEND_PORT}/api/tools"
|
||||
|
||||
|
||||
def get_openapi_spec():
|
||||
"""Fetch and cache the tools OpenAPI specification from the local API server."""
|
||||
global openapi_spec_cache
|
||||
if openapi_spec_cache:
|
||||
return openapi_spec_cache
|
||||
|
||||
try:
|
||||
# Fetch from local server
|
||||
# We use localhost because this code runs on the server
|
||||
response = requests.get(f"{API_BASE_URL}/openapi.json", timeout=10)
|
||||
response.raise_for_status()
|
||||
openapi_spec_cache = response.json()
|
||||
return openapi_spec_cache
|
||||
except Exception as e:
|
||||
print(f"Error fetching OpenAPI spec: {e}")
|
||||
return None
|
||||
|
||||
|
||||
def map_openapi_to_mcp_tools(spec):
|
||||
"""Convert OpenAPI paths into MCP tool descriptors."""
|
||||
tools = []
|
||||
if not spec or "paths" not in spec:
|
||||
return tools
|
||||
|
||||
for path, methods in spec["paths"].items():
|
||||
for method, details in methods.items():
|
||||
if "operationId" in details:
|
||||
tool = {
|
||||
"name": details["operationId"],
|
||||
"description": details.get("description", details.get("summary", "")),
|
||||
"inputSchema": {
|
||||
"type": "object",
|
||||
"properties": {},
|
||||
"required": []
|
||||
}
|
||||
}
|
||||
|
||||
# Extract parameters from requestBody if present
|
||||
if "requestBody" in details:
|
||||
content = details["requestBody"].get("content", {})
|
||||
if "application/json" in content:
|
||||
schema = content["application/json"].get("schema", {})
|
||||
tool["inputSchema"] = schema.copy()
|
||||
if "properties" not in tool["inputSchema"]:
|
||||
tool["inputSchema"]["properties"] = {}
|
||||
if "required" not in tool["inputSchema"]:
|
||||
tool["inputSchema"]["required"] = []
|
||||
|
||||
# Extract parameters from 'parameters' list (query/path params) - simplistic support
|
||||
if "parameters" in details:
|
||||
for param in details["parameters"]:
|
||||
if param.get("in") == "query":
|
||||
tool["inputSchema"]["properties"][param["name"]] = {
|
||||
"type": param.get("schema", {}).get("type", "string"),
|
||||
"description": param.get("description", "")
|
||||
}
|
||||
if param.get("required"):
|
||||
if "required" not in tool["inputSchema"]:
|
||||
tool["inputSchema"]["required"] = []
|
||||
tool["inputSchema"]["required"].append(param["name"])
|
||||
|
||||
tools.append(tool)
|
||||
return tools
|
||||
|
||||
|
||||
def process_mcp_request(data):
    """Handle incoming MCP JSON-RPC requests and route them to tools."""
    method = data.get("method")
    msg_id = data.get("id")

    response = None

    if method == "initialize":
        response = {
            "jsonrpc": "2.0",
            "id": msg_id,
            "result": {
                "protocolVersion": "2024-11-05",
                "capabilities": {
                    "tools": {}
                },
                "serverInfo": {
                    "name": "NetAlertX",
                    "version": "1.0.0"
                }
            }
        }

    elif method == "notifications/initialized":
        # No response needed for a notification
        pass

    elif method == "tools/list":
        spec = get_openapi_spec()
        tools = map_openapi_to_mcp_tools(spec)
        response = {
            "jsonrpc": "2.0",
            "id": msg_id,
            "result": {
                "tools": tools
            }
        }

    elif method == "tools/call":
        params = data.get("params", {})
        tool_name = params.get("name")
        tool_args = params.get("arguments", {})

        # Find the endpoint for this tool
        spec = get_openapi_spec()
        target_path = None
        target_method = None

        if spec and "paths" in spec:
            for path, methods in spec["paths"].items():
                for m, details in methods.items():
                    if details.get("operationId") == tool_name:
                        target_path = path
                        target_method = m.upper()
                        break
                if target_path:
                    break

        if target_path:
            try:
                # Make the request to the local API.
                # Forward the Authorization header from the incoming request if present.
                headers = {
                    "Content-Type": "application/json"
                }

                if "Authorization" in request.headers:
                    headers["Authorization"] = request.headers["Authorization"]

                url = f"{API_BASE_URL}{target_path}"

                if target_method == "POST":
                    api_res = requests.post(url, json=tool_args, headers=headers, timeout=30)
                elif target_method == "GET":
                    api_res = requests.get(url, params=tool_args, headers=headers, timeout=30)
                else:
                    api_res = None

                # Compare against None explicitly: a requests.Response is falsy
                # for 4xx/5xx status codes, which we still want to relay.
                if api_res is not None:
                    content = []
                    try:
                        json_content = api_res.json()
                        content.append({
                            "type": "text",
                            "text": json.dumps(json_content, indent=2)
                        })
                    except (ValueError, json.JSONDecodeError):
                        content.append({
                            "type": "text",
                            "text": api_res.text
                        })

                    is_error = api_res.status_code >= 400
                    response = {
                        "jsonrpc": "2.0",
                        "id": msg_id,
                        "result": {
                            "content": content,
                            "isError": is_error
                        }
                    }
                else:
                    response = {
                        "jsonrpc": "2.0",
                        "id": msg_id,
                        "error": {"code": -32601, "message": f"Method {target_method} not supported"}
                    }

            except Exception as e:
                response = {
                    "jsonrpc": "2.0",
                    "id": msg_id,
                    "result": {
                        "content": [{"type": "text", "text": f"Error calling tool: {str(e)}"}],
                        "isError": True
                    }
                }
        else:
            response = {
                "jsonrpc": "2.0",
                "id": msg_id,
                "error": {"code": -32601, "message": f"Tool {tool_name} not found"}
            }

    elif method == "ping":
        response = {
            "jsonrpc": "2.0",
            "id": msg_id,
            "result": {}
        }

    else:
        # Unknown method
        if msg_id is not None:  # Only respond if it's a request (has an id)
            response = {
                "jsonrpc": "2.0",
                "id": msg_id,
                "error": {"code": -32601, "message": "Method not found"}
            }

    return response

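For reference, a hedged sketch of the simplest JSON-RPC round trips this handler supports (shapes inferred from the code above):

    process_mcp_request({"jsonrpc": "2.0", "id": 1, "method": "ping"})
    # -> {"jsonrpc": "2.0", "id": 1, "result": {}}

    process_mcp_request({"jsonrpc": "2.0", "id": 2, "method": "tools/list"})
    # -> {"jsonrpc": "2.0", "id": 2, "result": {"tools": [...]}}

    process_mcp_request({"jsonrpc": "2.0", "method": "notifications/initialized"})
    # -> None (notifications get no response)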
@mcp_bp.route('/sse', methods=['GET', 'POST'])
def handle_sse():
    """Expose an SSE endpoint that streams MCP responses to connected clients."""
    if request.method == 'POST':
        # Handle verification or keep-alive pings
        try:
            data = request.get_json(silent=True)
            if data and "method" in data and "jsonrpc" in data:
                response = process_mcp_request(data)
                if response:
                    return jsonify(response)
                else:
                    # Notification - no response needed
                    return "", 202
        except Exception as e:
            # Log but don't fail - malformed requests shouldn't crash the endpoint
            logging.getLogger(__name__).debug(f"SSE POST processing error: {e}")

        return jsonify({"status": "ok", "message": "MCP SSE endpoint active"}), 200

    session_id = uuid.uuid4().hex
    q = queue.Queue()

    with sessions_lock:
        sessions[session_id] = q

    def stream():
        """Yield SSE messages for queued MCP responses until the client disconnects."""
        # Send the endpoint event first.
        # The client should POST to /api/mcp/messages?session_id=<session_id>
        yield f"event: endpoint\ndata: /api/mcp/messages?session_id={session_id}\n\n"

        try:
            while True:
                try:
                    # Wait for messages
                    message = q.get(timeout=20)  # Keep-alive timeout
                    yield f"event: message\ndata: {json.dumps(message)}\n\n"
                except queue.Empty:
                    # Send a keep-alive comment
                    yield ": keep-alive\n\n"
        except GeneratorExit:
            with sessions_lock:
                if session_id in sessions:
                    del sessions[session_id]

    return Response(stream_with_context(stream()), mimetype='text/event-stream')

@mcp_bp.route('/messages', methods=['POST'])
def handle_messages():
    """Receive MCP JSON-RPC messages and enqueue responses for an SSE session."""
    session_id = request.args.get('session_id')
    if not session_id:
        return jsonify({"error": "Missing session_id"}), 400

    with sessions_lock:
        if session_id not in sessions:
            return jsonify({"error": "Session not found"}), 404
        q = sessions[session_id]

    # silent=True returns None on malformed JSON instead of raising,
    # so the explicit 400 below is actually reachable
    data = request.get_json(silent=True)
    if not data:
        return jsonify({"error": "Invalid JSON"}), 400

    response = process_mcp_request(data)

    if response:
        q.put(response)

    return jsonify({"status": "accepted"}), 202

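Taken together, the two routes implement the MCP SSE transport: the client opens the stream, reads the per-session endpoint event, then POSTs JSON-RPC messages to it while responses arrive back on the stream. A minimal client sketch (the host and port are placeholders, not values defined in this PR):

    import json
    import requests

    BASE = "http://localhost:20211"  # assumption - adjust to your deployment

    with requests.get(f"{BASE}/api/mcp/sse", stream=True) as sse:
        endpoint = None
        for raw in sse.iter_lines(decode_unicode=True):
            # The first data line carries "/api/mcp/messages?session_id=<hex>"
            if raw and raw.startswith("data: "):
                endpoint = raw[len("data: "):]
                break

        # The response to this POST arrives on the SSE stream as a "message" event
        requests.post(f"{BASE}{endpoint}",
                      json={"jsonrpc": "2.0", "id": 1, "method": "ping"})

        for raw in sse.iter_lines(decode_unicode=True):
            if raw and raw.startswith("data: "):
                print(json.loads(raw[len("data: "):]))
                break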
686
server/api_server/tools_routes.py
Normal file
@@ -0,0 +1,686 @@
import subprocess
import re
from datetime import datetime, timedelta
from flask import Blueprint, request, jsonify
import sqlite3

from helper import get_setting_value
from database import get_temp_db_connection

tools_bp = Blueprint('tools', __name__)


def check_auth():
    """Check API_TOKEN authorization."""
    token = request.headers.get("Authorization")
    expected_token = f"Bearer {get_setting_value('API_TOKEN')}"
    return token == expected_token

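Every tool route below gates on check_auth, which compares the incoming Authorization header against the configured API_TOKEN. A client therefore sends a header built like this (sketch; the token value comes from your settings):

    headers = {"Authorization": f"Bearer {get_setting_value('API_TOKEN')}"}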
@tools_bp.route('/trigger_scan', methods=['POST'])
def trigger_scan():
    """
    Forces NetAlertX to run a specific scan type immediately.
    Arguments: scan_type (Enum: arp, nmap_fast, nmap_deep), target (optional IP/CIDR)
    """
    if not check_auth():
        return jsonify({"error": "Unauthorized"}), 401

    data = request.get_json(silent=True) or {}
    scan_type = data.get('scan_type', 'nmap_fast')
    target = data.get('target')

    # Validate scan_type
    if scan_type not in ['arp', 'nmap_fast', 'nmap_deep']:
        return jsonify({"error": "Invalid scan_type. Must be 'arp', 'nmap_fast', or 'nmap_deep'"}), 400

    # Determine the command to run
    cmd = []
    if scan_type == 'arp':
        # ARP scans usually require root or raw-socket capabilities; assume the container provides them
        cmd = ["arp-scan", "--localnet", "--interface=eth0"]  # Defaults to eth0, might need detection
        if target:
            cmd = ["arp-scan", target]
    elif scan_type == 'nmap_fast':
        cmd = ["nmap", "-F"]
        if target:
            cmd.append(target)
        else:
            # No explicit target - fall back to the configured scan subnets.
            # NetAlertX usually knows its subnets, so use the first SCAN_SUBNETS entry.
            scan_subnets = get_setting_value("SCAN_SUBNETS")
            if scan_subnets:
                cmd.append(scan_subnets.split(',')[0].strip())
            else:
                return jsonify({"error": "Target is required and no default SCAN_SUBNETS found"}), 400
    elif scan_type == 'nmap_deep':
        cmd = ["nmap", "-A", "-T4"]
        if target:
            cmd.append(target)
        else:
            scan_subnets = get_setting_value("SCAN_SUBNETS")
            if scan_subnets:
                cmd.append(scan_subnets.split(',')[0].strip())
            else:
                return jsonify({"error": "Target is required and no default SCAN_SUBNETS found"}), 400

    try:
        # Run the command
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            check=True
        )
        return jsonify({
            "success": True,
            "scan_type": scan_type,
            "command": " ".join(cmd),
            "output": result.stdout.strip().split('\n')
        })
    except subprocess.CalledProcessError as e:
        return jsonify({
            "success": False,
            "error": "Scan failed",
            "details": e.stderr.strip()
        }), 500
    except Exception as e:
        return jsonify({"error": str(e)}), 500

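A hedged usage sketch for this endpoint (host, port, and token are placeholders, not values defined in this PR):

    import requests

    resp = requests.post(
        "http://localhost:20211/api/tools/trigger_scan",  # assumed host/port
        headers={"Authorization": "Bearer <API_TOKEN>"},
        json={"scan_type": "nmap_fast", "target": "192.168.1.0/24"},
        timeout=120,
    )
    print(resp.json())  # e.g. {"success": true, "command": "nmap -F 192.168.1.0/24", ...}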
@tools_bp.route('/list_devices', methods=['POST'])
def list_devices():
    """List all devices."""
    if not check_auth():
        return jsonify({"error": "Unauthorized"}), 401

    conn = get_temp_db_connection()
    conn.row_factory = sqlite3.Row
    cur = conn.cursor()

    try:
        cur.execute("SELECT devName, devMac, devLastIP as devIP, devVendor, devFirstConnection, devLastConnection FROM Devices ORDER BY devFirstConnection DESC")
        rows = cur.fetchall()
        devices = [dict(row) for row in rows]
        return jsonify(devices)
    except Exception as e:
        return jsonify({"error": str(e)}), 500
    finally:
        conn.close()

@tools_bp.route('/get_device_info', methods=['POST'])
def get_device_info():
    """Get detailed info for a specific device."""
    if not check_auth():
        return jsonify({"error": "Unauthorized"}), 401

    data = request.get_json(silent=True)
    if not data or 'query' not in data:
        return jsonify({"error": "Missing 'query' parameter"}), 400

    query = data['query']

    conn = get_temp_db_connection()
    conn.row_factory = sqlite3.Row
    cur = conn.cursor()

    try:
        # Search by MAC, name, or partial IP
        sql = "SELECT * FROM Devices WHERE devMac LIKE ? OR devName LIKE ? OR devLastIP LIKE ?"
        cur.execute(sql, (f"%{query}%", f"%{query}%", f"%{query}%"))
        rows = cur.fetchall()

        if not rows:
            return jsonify({"message": "No devices found"}), 404

        devices = [dict(row) for row in rows]
        return jsonify(devices)
    except Exception as e:
        return jsonify({"error": str(e)}), 500
    finally:
        conn.close()

@tools_bp.route('/get_latest_device', methods=['POST'])
def get_latest_device():
    """Get full details of the most recently discovered device."""
    if not check_auth():
        return jsonify({"error": "Unauthorized"}), 401

    conn = get_temp_db_connection()
    conn.row_factory = sqlite3.Row
    cur = conn.cursor()

    try:
        # Get the device with the most recent devFirstConnection
        cur.execute("SELECT * FROM Devices ORDER BY devFirstConnection DESC LIMIT 1")
        row = cur.fetchone()

        if not row:
            return jsonify({"message": "No devices found"}), 404

        # Return as a list to be consistent with the other endpoints
        return jsonify([dict(row)])
    except Exception as e:
        return jsonify({"error": str(e)}), 500
    finally:
        conn.close()

@tools_bp.route('/get_open_ports', methods=['POST'])
def get_open_ports():
    """
    Specific query for the port-scan results of a target.
    Arguments: target (IP or MAC)
    """
    if not check_auth():
        return jsonify({"error": "Unauthorized"}), 401

    data = request.get_json(silent=True) or {}
    target = data.get('target')

    if not target:
        return jsonify({"error": "Target is required"}), 400

    # If a MAC is provided, try to resolve it to an IP
    if re.match(r"^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})$", target):
        conn = get_temp_db_connection()
        conn.row_factory = sqlite3.Row
        cur = conn.cursor()
        try:
            cur.execute("SELECT devLastIP FROM Devices WHERE devMac = ?", (target,))
            row = cur.fetchone()
            if row and row['devLastIP']:
                target = row['devLastIP']
            else:
                return jsonify({"error": f"Could not resolve IP for MAC {target}"}), 404
        finally:
            conn.close()

    try:
        # Run nmap -F for a fast port scan
        cmd = ["nmap", "-F", target]
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            check=True,
            timeout=120
        )

        # Parse the output for open ports
        open_ports = []
        for line in result.stdout.split('\n'):
            if '/tcp' in line and 'open' in line:
                parts = line.split('/')
                port = parts[0].strip()
                service = line.split()[2] if len(line.split()) > 2 else "unknown"
                open_ports.append({"port": int(port), "service": service})

        return jsonify({
            "success": True,
            "target": target,
            "open_ports": open_ports,
            "raw_output": result.stdout.strip().split('\n')
        })

    except subprocess.CalledProcessError as e:
        return jsonify({"success": False, "error": "Port scan failed", "details": e.stderr.strip()}), 500
    except Exception as e:
        return jsonify({"error": str(e)}), 500

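The parser above relies on nmap's fixed line layout: for a line such as "22/tcp open ssh", splitting on "/" yields the port and splitting on whitespace yields the service name.

    line = "22/tcp open  ssh"
    port = int(line.split('/')[0].strip())  # 22
    service = line.split()[2]               # "ssh"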
@tools_bp.route('/get_network_topology', methods=['GET'])
def get_network_topology():
    """
    Returns the "Parent/Child" relationships.
    """
    if not check_auth():
        return jsonify({"error": "Unauthorized"}), 401

    conn = get_temp_db_connection()
    conn.row_factory = sqlite3.Row
    cur = conn.cursor()

    try:
        cur.execute("SELECT devName, devMac, devParentMAC, devParentPort, devVendor FROM Devices")
        rows = cur.fetchall()

        nodes = []
        links = []

        for row in rows:
            nodes.append({
                "id": row['devMac'],
                "name": row['devName'],
                "vendor": row['devVendor']
            })
            if row['devParentMAC']:
                links.append({
                    "source": row['devParentMAC'],
                    "target": row['devMac'],
                    "port": row['devParentPort']
                })

        return jsonify({
            "nodes": nodes,
            "links": links
        })
    except Exception as e:
        return jsonify({"error": str(e)}), 500
    finally:
        conn.close()

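The resulting payload is a flat node/edge list suitable for graph rendering. For a router with one child it would look like this (illustrative values):

    {
        "nodes": [
            {"id": "AA:AA:AA:AA:AA:AA", "name": "Router", "vendor": "VendorA"},
            {"id": "BB:BB:BB:BB:BB:BB", "name": "Device1", "vendor": "VendorB"},
        ],
        "links": [
            {"source": "AA:AA:AA:AA:AA:AA", "target": "BB:BB:BB:BB:BB:BB", "port": "eth1"},
        ],
    }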
@tools_bp.route('/get_recent_alerts', methods=['POST'])
def get_recent_alerts():
    """
    Fetches recent system alerts within a lookback window.
    Arguments: hours (lookback period, default 24)
    """
    if not check_auth():
        return jsonify({"error": "Unauthorized"}), 401

    data = request.get_json(silent=True) or {}
    hours = data.get('hours', 24)

    conn = get_temp_db_connection()
    conn.row_factory = sqlite3.Row
    cur = conn.cursor()

    try:
        # Calculate the cutoff time
        cutoff = datetime.now() - timedelta(hours=int(hours))
        cutoff_str = cutoff.strftime('%Y-%m-%d %H:%M:%S')

        cur.execute("""
            SELECT eve_DateTime, eve_EventType, eve_MAC, eve_IP, devName
            FROM Events
            LEFT JOIN Devices ON Events.eve_MAC = Devices.devMac
            WHERE eve_DateTime > ?
            ORDER BY eve_DateTime DESC
        """, (cutoff_str,))

        rows = cur.fetchall()
        alerts = [dict(row) for row in rows]

        return jsonify(alerts)
    except Exception as e:
        return jsonify({"error": str(e)}), 500
    finally:
        conn.close()

@tools_bp.route('/set_device_alias', methods=['POST'])
def set_device_alias():
    """
    Updates the name (alias) of a device.
    Arguments: mac, alias
    """
    if not check_auth():
        return jsonify({"error": "Unauthorized"}), 401

    data = request.get_json(silent=True) or {}
    mac = data.get('mac')
    alias = data.get('alias')

    if not mac or not alias:
        return jsonify({"error": "MAC and Alias are required"}), 400

    conn = get_temp_db_connection()
    cur = conn.cursor()

    try:
        cur.execute("UPDATE Devices SET devName = ? WHERE devMac = ?", (alias, mac))
        conn.commit()

        if cur.rowcount == 0:
            return jsonify({"error": "Device not found"}), 404

        return jsonify({"success": True, "message": f"Device {mac} renamed to {alias}"})
    except Exception as e:
        return jsonify({"error": str(e)}), 500
    finally:
        conn.close()

@tools_bp.route('/wol_wake_device', methods=['POST'])
def wol_wake_device():
    """
    Sends a Wake-on-LAN magic packet.
    Arguments: mac OR ip
    """
    if not check_auth():
        return jsonify({"error": "Unauthorized"}), 401

    data = request.get_json(silent=True) or {}
    mac = data.get('mac')
    ip = data.get('ip')

    if not mac and not ip:
        return jsonify({"error": "MAC address or IP address is required"}), 400

    # Resolve the IP to a MAC if the MAC is missing
    if not mac and ip:
        conn = get_temp_db_connection()
        conn.row_factory = sqlite3.Row
        cur = conn.cursor()
        try:
            # Try to find the device by IP (devLastIP)
            cur.execute("SELECT devMac FROM Devices WHERE devLastIP = ?", (ip,))
            row = cur.fetchone()
            if row and row['devMac']:
                mac = row['devMac']
            else:
                return jsonify({"error": f"Could not resolve MAC for IP {ip}"}), 404
        except Exception as e:
            return jsonify({"error": f"Database error: {str(e)}"}), 500
        finally:
            conn.close()

    # Validate the MAC
    if not re.match(r"^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})$", mac):
        return jsonify({"success": False, "error": f"Invalid MAC: {mac}"}), 400

    try:
        # Use the wakeonlan command
        result = subprocess.run(
            ["wakeonlan", mac], capture_output=True, text=True, check=True, timeout=10
        )
        return jsonify(
            {
                "success": True,
                "message": f"WOL packet sent to {mac}",
                "output": result.stdout.strip(),
            }
        )
    except subprocess.CalledProcessError as e:
        return jsonify(
            {
                "success": False,
                "error": "Failed to send WOL packet",
                "details": e.stderr.strip(),
            }
        ), 500

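Usage sketch (placeholders as above): waking a device by IP lets the endpoint resolve the MAC from the Devices table first.

    import requests

    resp = requests.post(
        "http://localhost:20211/api/tools/wol_wake_device",  # assumed host/port
        headers={"Authorization": "Bearer <API_TOKEN>"},
        json={"ip": "192.168.1.50"},
    )
    print(resp.json())  # {"success": true, "message": "WOL packet sent to AA:BB:..."}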
@tools_bp.route('/openapi.json', methods=['GET'])
def openapi_spec():
    """Return OpenAPI specification for tools."""
    # The spec is served without auth so clients such as Open WebUI can import
    # it easily; the tool endpoints themselves still require the bearer token.

    spec = {
        "openapi": "3.0.0",
        "info": {
            "title": "NetAlertX Tools",
            "description": "API for NetAlertX device management tools",
            "version": "1.1.0"
        },
        "servers": [
            {"url": "/api/tools"}
        ],
        "paths": {
            "/list_devices": {
                "post": {
                    "summary": "List all devices (Summary)",
                    "description": (
                        "Retrieve a SUMMARY list of all devices, sorted by newest first. "
                        "IMPORTANT: This only provides basic info (Name, IP, Vendor). "
                        "For FULL details (like custom props, alerts, etc.), you MUST use 'get_device_info' or 'get_latest_device'."
                    ),
                    "operationId": "list_devices",
                    "responses": {
                        "200": {
                            "description": "List of devices (Summary)",
                            "content": {
                                "application/json": {
                                    "schema": {
                                        "type": "array",
                                        "items": {
                                            "type": "object",
                                            "properties": {
                                                "devName": {"type": "string"},
                                                "devMac": {"type": "string"},
                                                "devIP": {"type": "string"},
                                                "devVendor": {"type": "string"},
                                                "devStatus": {"type": "string"},
                                                "devFirstConnection": {"type": "string"},
                                                "devLastConnection": {"type": "string"}
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            },
            "/get_device_info": {
                "post": {
                    "summary": "Get device info (Full Details)",
                    "description": (
                        "Get COMPREHENSIVE information about a specific device by MAC, Name, or partial IP. "
                        "Use this to see all available properties, alerts, and metadata not shown in the list."
                    ),
                    "operationId": "get_device_info",
                    "requestBody": {
                        "required": True,
                        "content": {
                            "application/json": {
                                "schema": {
                                    "type": "object",
                                    "properties": {
                                        "query": {
                                            "type": "string",
                                            "description": "MAC address, Device Name, or partial IP to search for"
                                        }
                                    },
                                    "required": ["query"]
                                }
                            }
                        }
                    },
                    "responses": {
                        "200": {
                            "description": "Device details (Full)",
                            "content": {
                                "application/json": {
                                    "schema": {
                                        "type": "array",
                                        "items": {"type": "object"}
                                    }
                                }
                            }
                        },
                        "404": {"description": "Device not found"}
                    }
                }
            },
            "/get_latest_device": {
                "post": {
                    "summary": "Get latest device (Full Details)",
                    "description": "Get COMPREHENSIVE information about the most recently discovered device (latest devFirstConnection).",
                    "operationId": "get_latest_device",
                    "responses": {
                        "200": {
                            "description": "Latest device details (Full)",
                            "content": {
                                "application/json": {
                                    "schema": {
                                        "type": "array",
                                        "items": {"type": "object"}
                                    }
                                }
                            }
                        },
                        "404": {"description": "No devices found"}
                    }
                }
            },
            "/trigger_scan": {
                "post": {
                    "summary": "Trigger Active Scan",
                    "description": "Forces NetAlertX to run a specific scan type immediately.",
                    "operationId": "trigger_scan",
                    "requestBody": {
                        "required": True,
                        "content": {
                            "application/json": {
                                "schema": {
                                    "type": "object",
                                    "properties": {
                                        "scan_type": {
                                            "type": "string",
                                            "enum": ["arp", "nmap_fast", "nmap_deep"],
                                            "default": "nmap_fast"
                                        },
                                        "target": {
                                            "type": "string",
                                            "description": "IP address or CIDR to scan"
                                        }
                                    }
                                }
                            }
                        }
                    },
                    "responses": {
                        "200": {"description": "Scan started/completed successfully"},
                        "400": {"description": "Invalid input"}
                    }
                }
            },
            "/get_open_ports": {
                "post": {
                    "summary": "Get Open Ports",
                    "description": "Specific query for the port-scan results of a target.",
                    "operationId": "get_open_ports",
                    "requestBody": {
                        "required": True,
                        "content": {
                            "application/json": {
                                "schema": {
                                    "type": "object",
                                    "properties": {
                                        "target": {
                                            "type": "string",
                                            "description": "IP or MAC address"
                                        }
                                    },
                                    "required": ["target"]
                                }
                            }
                        }
                    },
                    "responses": {
                        "200": {"description": "List of open ports"},
                        "404": {"description": "Target not found"}
                    }
                }
            },
            "/get_network_topology": {
                "get": {
                    "summary": "Get Network Topology",
                    "description": "Returns the Parent/Child relationships for network visualization.",
                    "operationId": "get_network_topology",
                    "responses": {
                        "200": {"description": "Graph data (nodes and links)"}
                    }
                }
            },
            "/get_recent_alerts": {
                "post": {
                    "summary": "Get Recent Alerts",
                    "description": "Fetches recent system alerts within a lookback window.",
                    "operationId": "get_recent_alerts",
                    "requestBody": {
                        "content": {
                            "application/json": {
                                "schema": {
                                    "type": "object",
                                    "properties": {
                                        "hours": {
                                            "type": "integer",
                                            "default": 24
                                        }
                                    }
                                }
                            }
                        }
                    },
                    "responses": {
                        "200": {"description": "List of alerts"}
                    }
                }
            },
            "/set_device_alias": {
                "post": {
                    "summary": "Set Device Alias",
                    "description": "Updates the name (alias) of a device.",
                    "operationId": "set_device_alias",
                    "requestBody": {
                        "required": True,
                        "content": {
                            "application/json": {
                                "schema": {
                                    "type": "object",
                                    "properties": {
                                        "mac": {"type": "string"},
                                        "alias": {"type": "string"}
                                    },
                                    "required": ["mac", "alias"]
                                }
                            }
                        }
                    },
                    "responses": {
                        "200": {"description": "Alias updated"},
                        "404": {"description": "Device not found"}
                    }
                }
            },
            "/wol_wake_device": {
                "post": {
                    "summary": "Wake on LAN",
                    "description": "Sends a Wake-on-LAN magic packet to the target MAC or IP. If IP is provided, it resolves to MAC first.",
                    "operationId": "wol_wake_device",
                    "requestBody": {
                        "required": True,
                        "content": {
                            "application/json": {
                                "schema": {
                                    "type": "object",
                                    "properties": {
                                        "mac": {"type": "string", "description": "Target MAC address"},
                                        "ip": {"type": "string", "description": "Target IP address (resolves to MAC)"}
                                    }
                                }
                            }
                        }
                    },
                    "responses": {
                        "200": {"description": "WOL packet sent"},
                        "404": {"description": "IP not found"}
                    }
                }
            }
        },
        "components": {
            "securitySchemes": {
                "bearerAuth": {
                    "type": "http",
                    "scheme": "bearer",
                    "bearerFormat": "JWT"
                }
            }
        },
        "security": [
            {"bearerAuth": []}
        ]
    }
    return jsonify(spec)
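Because the spec is public, a client (or the MCP bridge above via get_openapi_spec) can enumerate the available tools without credentials; only invoking them requires the bearer token. Sketch:

    import requests

    spec = requests.get("http://localhost:20211/api/tools/openapi.json").json()  # assumed host/port
    print(sorted(spec["paths"]))
    # ['/get_device_info', '/get_latest_device', '/get_network_topology', ...]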
@@ -57,6 +57,44 @@ class DeviceInstance:
        result = self.db.sql.fetchone()
        return result["count"] > 0

    # Get a device by its last IP address
    def getByIP(self, ip):
        self.db.sql.execute("SELECT * FROM Devices WHERE devLastIP = ?", (ip,))
        row = self.db.sql.fetchone()
        return dict(row) if row else None

    # Search devices by partial MAC, name, or IP
    def search(self, query):
        like = f"%{query}%"
        self.db.sql.execute(
            "SELECT * FROM Devices WHERE devMac LIKE ? OR devName LIKE ? OR devLastIP LIKE ?",
            (like, like, like),
        )
        rows = self.db.sql.fetchall()
        return [dict(r) for r in rows]

    # Get the most recently discovered device
    def getLatest(self):
        self.db.sql.execute("SELECT * FROM Devices ORDER BY devFirstConnection DESC LIMIT 1")
        row = self.db.sql.fetchone()
        return dict(row) if row else None

    def getNetworkTopology(self):
        """Returns nodes and links for the current Devices table.

        Nodes: {id, name, vendor}
        Links: {source, target, port}
        """
        self.db.sql.execute("SELECT devName, devMac, devParentMAC, devParentPort, devVendor FROM Devices")
        rows = self.db.sql.fetchall()
        nodes = []
        links = []
        for row in rows:
            nodes.append({"id": row['devMac'], "name": row['devName'], "vendor": row['devVendor']})
            if row['devParentMAC']:
                links.append({"source": row['devParentMAC'], "target": row['devMac'], "port": row['devParentPort']})
        return {"nodes": nodes, "links": links}

    # Update a specific field for a device
    def updateField(self, devGUID, field, value):
        if not self.exists(devGUID):
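A usage sketch for the new lookup helpers (assuming an initialized DeviceInstance, here called `devices`):

    devices.getByIP("192.168.1.50")     # dict or None
    devices.search("raspberry")         # list of matching device dicts
    devices.getLatest()                 # most recently discovered device
    devices.getNetworkTopology()        # {"nodes": [...], "links": [...]}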
@@ -37,6 +37,15 @@ class PluginObjectInstance:
        self.db.sql.execute("SELECT * FROM Plugins_Objects WHERE Plugin = ?", (plugin,))
        return self.db.sql.fetchall()

    # Get plugin objects by primary ID and plugin name
    def getByPrimary(self, plugin, primary_id):
        self.db.sql.execute(
            "SELECT * FROM Plugins_Objects WHERE Plugin = ? AND Object_PrimaryID = ?",
            (plugin, primary_id),
        )
        rows = self.db.sql.fetchall()
        return [dict(r) for r in rows]

    # Get objects by status
    def getByStatus(self, status):
        self.db.sql.execute("SELECT * FROM Plugins_Objects WHERE Status = ?", (status,))
66
test/api_endpoints/test_auth_endpoints.py
Normal file
@@ -0,0 +1,66 @@
# test/api_endpoints/test_auth_endpoints.py

import sys
import os
import pytest

# Register NetAlertX directories
INSTALL_PATH = os.getenv("NETALERTX_APP", "/app")
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])

from helper import get_setting_value  # noqa: E402
from api_server.api_server_start import app  # noqa: E402


@pytest.fixture(scope="session")
def api_token():
    """Load API token from system settings (same as other tests)."""
    return get_setting_value("API_TOKEN")


@pytest.fixture
def client():
    """Flask test client."""
    with app.test_client() as client:
        yield client


def auth_headers(token):
    return {"Authorization": f"Bearer {token}"}


# -------------------------
# AUTH ENDPOINT TESTS
# -------------------------

def test_auth_ok(client, api_token):
    """Valid token should allow access."""
    resp = client.get("/auth", headers=auth_headers(api_token))
    assert resp.status_code == 200

    data = resp.get_json()
    assert data is not None
    assert data.get("success") is True
    assert "successful" in data.get("message", "").lower()


def test_auth_missing_token(client):
    """Missing token should be forbidden."""
    resp = client.get("/auth")
    assert resp.status_code == 403

    data = resp.get_json()
    assert data is not None
    assert data.get("success") is False
    assert "not authorized" in data.get("message", "").lower()


def test_auth_invalid_token(client):
    """Invalid bearer token should be forbidden."""
    resp = client.get("/auth", headers=auth_headers("INVALID-TOKEN"))
    assert resp.status_code == 403

    data = resp.get_json()
    assert data is not None
    assert data.get("success") is False
    assert "not authorized" in data.get("message", "").lower()
287
test/api_endpoints/test_mcp_tools_endpoints.py
Normal file
@@ -0,0 +1,287 @@
import sys
import os
import pytest
from unittest.mock import patch, MagicMock

INSTALL_PATH = os.getenv('NETALERTX_APP', '/app')
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])

from helper import get_setting_value  # noqa: E402
from api_server.api_server_start import app  # noqa: E402


@pytest.fixture(scope="session")
def api_token():
    return get_setting_value("API_TOKEN")


@pytest.fixture
def client():
    with app.test_client() as client:
        yield client


def auth_headers(token):
    return {"Authorization": f"Bearer {token}"}


# --- get_device_info Tests ---
@patch('api_server.tools_routes.get_temp_db_connection')
def test_get_device_info_ip_partial(mock_db_conn, client, api_token):
    """Test get_device_info with a partial IP search."""
    mock_cursor = MagicMock()
    # Mock return of a device with an IP ending in .50
    mock_cursor.fetchall.return_value = [
        {"devName": "Test Device", "devMac": "AA:BB:CC:DD:EE:FF", "devLastIP": "192.168.1.50"}
    ]
    mock_db_conn.return_value.cursor.return_value = mock_cursor

    payload = {"query": ".50"}
    response = client.post('/api/tools/get_device_info',
                           json=payload,
                           headers=auth_headers(api_token))

    assert response.status_code == 200
    devices = response.get_json()
    assert len(devices) == 1
    assert devices[0]["devLastIP"] == "192.168.1.50"

    # Verify the SQL query included 3 params (MAC, Name, IP)
    args, _ = mock_cursor.execute.call_args
    assert args[0].count("?") == 3
    assert len(args[1]) == 3


# --- trigger_scan Tests ---
@patch('subprocess.run')
def test_trigger_scan_nmap_fast(mock_run, client, api_token):
    """Test trigger_scan with nmap_fast."""
    mock_run.return_value = MagicMock(stdout="Scan completed", returncode=0)

    payload = {"scan_type": "nmap_fast", "target": "192.168.1.1"}
    response = client.post('/api/tools/trigger_scan',
                           json=payload,
                           headers=auth_headers(api_token))

    assert response.status_code == 200
    data = response.get_json()
    assert data["success"] is True
    assert "nmap -F 192.168.1.1" in data["command"]
    mock_run.assert_called_once()


@patch('subprocess.run')
def test_trigger_scan_invalid_type(mock_run, client, api_token):
    """Test trigger_scan with an invalid scan_type."""
    payload = {"scan_type": "invalid_type", "target": "192.168.1.1"}
    response = client.post('/api/tools/trigger_scan',
                           json=payload,
                           headers=auth_headers(api_token))

    assert response.status_code == 400
    mock_run.assert_not_called()


# --- get_open_ports Tests ---
@patch('subprocess.run')
def test_get_open_ports_ip(mock_run, client, api_token):
    """Test get_open_ports with an IP address."""
    mock_output = """
Starting Nmap 7.80 ( https://nmap.org ) at 2023-10-27 10:00 UTC
Nmap scan report for 192.168.1.1
Host is up (0.0010s latency).
Not shown: 98 closed ports
PORT STATE SERVICE
22/tcp open ssh
80/tcp open http
Nmap done: 1 IP address (1 host up) scanned in 0.10 seconds
"""
    mock_run.return_value = MagicMock(stdout=mock_output, returncode=0)

    payload = {"target": "192.168.1.1"}
    response = client.post('/api/tools/get_open_ports',
                           json=payload,
                           headers=auth_headers(api_token))

    assert response.status_code == 200
    data = response.get_json()
    assert data["success"] is True
    assert len(data["open_ports"]) == 2
    assert data["open_ports"][0]["port"] == 22
    assert data["open_ports"][1]["service"] == "http"


@patch('api_server.tools_routes.get_temp_db_connection')
@patch('subprocess.run')
def test_get_open_ports_mac_resolve(mock_run, mock_db_conn, client, api_token):
    """Test get_open_ports with a MAC address that resolves to an IP."""
    # Mock the DB to resolve the MAC to an IP
    mock_cursor = MagicMock()
    mock_cursor.fetchone.return_value = {"devLastIP": "192.168.1.50"}
    mock_db_conn.return_value.cursor.return_value = mock_cursor

    # Mock nmap output
    mock_run.return_value = MagicMock(stdout="80/tcp open http", returncode=0)

    payload = {"target": "AA:BB:CC:DD:EE:FF"}
    response = client.post('/api/tools/get_open_ports',
                           json=payload,
                           headers=auth_headers(api_token))

    assert response.status_code == 200
    data = response.get_json()
    assert data["target"] == "192.168.1.50"  # Should be the resolved IP
    mock_run.assert_called_once()
    args, _ = mock_run.call_args
    assert "192.168.1.50" in args[0]


# --- get_network_topology Tests ---
@patch('api_server.tools_routes.get_temp_db_connection')
def test_get_network_topology(mock_db_conn, client, api_token):
    """Test get_network_topology."""
    mock_cursor = MagicMock()
    mock_cursor.fetchall.return_value = [
        {"devName": "Router", "devMac": "AA:AA:AA:AA:AA:AA", "devParentMAC": None, "devParentPort": None, "devVendor": "VendorA"},
        {"devName": "Device1", "devMac": "BB:BB:BB:BB:BB:BB", "devParentMAC": "AA:AA:AA:AA:AA:AA", "devParentPort": "eth1", "devVendor": "VendorB"}
    ]
    mock_db_conn.return_value.cursor.return_value = mock_cursor

    response = client.get('/api/tools/get_network_topology',
                          headers=auth_headers(api_token))

    assert response.status_code == 200
    data = response.get_json()
    assert len(data["nodes"]) == 2
    assert len(data["links"]) == 1
    assert data["links"][0]["source"] == "AA:AA:AA:AA:AA:AA"
    assert data["links"][0]["target"] == "BB:BB:BB:BB:BB:BB"


# --- get_recent_alerts Tests ---
@patch('api_server.tools_routes.get_temp_db_connection')
def test_get_recent_alerts(mock_db_conn, client, api_token):
    """Test get_recent_alerts."""
    mock_cursor = MagicMock()
    mock_cursor.fetchall.return_value = [
        {"eve_DateTime": "2023-10-27 10:00:00", "eve_EventType": "New Device", "eve_MAC": "CC:CC:CC:CC:CC:CC", "eve_IP": "192.168.1.100", "devName": "Unknown"}
    ]
    mock_db_conn.return_value.cursor.return_value = mock_cursor

    payload = {"hours": 24}
    response = client.post('/api/tools/get_recent_alerts',
                           json=payload,
                           headers=auth_headers(api_token))

    assert response.status_code == 200
    data = response.get_json()
    assert len(data) == 1
    assert data[0]["eve_EventType"] == "New Device"


# --- set_device_alias Tests ---
@patch('api_server.tools_routes.get_temp_db_connection')
def test_set_device_alias(mock_db_conn, client, api_token):
    """Test set_device_alias."""
    mock_cursor = MagicMock()
    mock_cursor.rowcount = 1  # Simulate a successful update
    mock_db_conn.return_value.cursor.return_value = mock_cursor

    payload = {"mac": "AA:BB:CC:DD:EE:FF", "alias": "New Name"}
    response = client.post('/api/tools/set_device_alias',
                           json=payload,
                           headers=auth_headers(api_token))

    assert response.status_code == 200
    data = response.get_json()
    assert data["success"] is True


@patch('api_server.tools_routes.get_temp_db_connection')
def test_set_device_alias_not_found(mock_db_conn, client, api_token):
    """Test set_device_alias when the device is not found."""
    mock_cursor = MagicMock()
    mock_cursor.rowcount = 0  # Simulate no rows updated
    mock_db_conn.return_value.cursor.return_value = mock_cursor

    payload = {"mac": "AA:BB:CC:DD:EE:FF", "alias": "New Name"}
    response = client.post('/api/tools/set_device_alias',
                           json=payload,
                           headers=auth_headers(api_token))

    assert response.status_code == 404


# --- wol_wake_device Tests ---
@patch('subprocess.run')
def test_wol_wake_device(mock_subprocess, client, api_token):
    """Test wol_wake_device."""
    mock_subprocess.return_value.stdout = "Sending magic packet to 255.255.255.255:9 with AA:BB:CC:DD:EE:FF"
    mock_subprocess.return_value.returncode = 0

    payload = {"mac": "AA:BB:CC:DD:EE:FF"}
    response = client.post('/api/tools/wol_wake_device',
                           json=payload,
                           headers=auth_headers(api_token))

    assert response.status_code == 200
    data = response.get_json()
    assert data["success"] is True
    # The route passes timeout=10, so the assertion must include it
    mock_subprocess.assert_called_with(["wakeonlan", "AA:BB:CC:DD:EE:FF"], capture_output=True, text=True, check=True, timeout=10)


@patch('api_server.tools_routes.get_temp_db_connection')
@patch('subprocess.run')
def test_wol_wake_device_by_ip(mock_subprocess, mock_db_conn, client, api_token):
    """Test wol_wake_device with an IP address."""
    # Mock the DB for IP resolution
    mock_cursor = MagicMock()
    mock_cursor.fetchone.return_value = {"devMac": "AA:BB:CC:DD:EE:FF"}
    mock_db_conn.return_value.cursor.return_value = mock_cursor

    # Mock the subprocess call
    mock_subprocess.return_value.stdout = "Sending magic packet to 255.255.255.255:9 with AA:BB:CC:DD:EE:FF"
    mock_subprocess.return_value.returncode = 0

    payload = {"ip": "192.168.1.50"}
    response = client.post('/api/tools/wol_wake_device',
                           json=payload,
                           headers=auth_headers(api_token))

    assert response.status_code == 200
    data = response.get_json()
    assert data["success"] is True
    assert "AA:BB:CC:DD:EE:FF" in data["message"]

    # Verify the DB lookup
    mock_cursor.execute.assert_called_with("SELECT devMac FROM Devices WHERE devLastIP = ?", ("192.168.1.50",))

    # Verify the subprocess call (including timeout=10)
    mock_subprocess.assert_called_with(["wakeonlan", "AA:BB:CC:DD:EE:FF"], capture_output=True, text=True, check=True, timeout=10)


def test_wol_wake_device_invalid_mac(client, api_token):
    """Test wol_wake_device with an invalid MAC."""
    payload = {"mac": "invalid-mac"}
    response = client.post('/api/tools/wol_wake_device',
                           json=payload,
                           headers=auth_headers(api_token))

    assert response.status_code == 400


# --- openapi_spec Tests ---
def test_openapi_spec(client):
    """Test that the openapi_spec endpoint contains the new paths."""
    response = client.get('/api/tools/openapi.json')
    assert response.status_code == 200
    spec = response.get_json()

    # Check for the new endpoints
    assert "/trigger_scan" in spec["paths"]
    assert "/get_open_ports" in spec["paths"]
    assert "/get_network_topology" in spec["paths"]
    assert "/get_recent_alerts" in spec["paths"]
    assert "/set_device_alias" in spec["paths"]
    assert "/wol_wake_device" in spec["paths"]
79
test/api_endpoints/test_tools_endpoints.py
Normal file
@@ -0,0 +1,79 @@
import sys
import os
import pytest

INSTALL_PATH = os.getenv('NETALERTX_APP', '/app')
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])

from helper import get_setting_value  # noqa: E402
from api_server.api_server_start import app  # noqa: E402


@pytest.fixture(scope="session")
def api_token():
    return get_setting_value("API_TOKEN")


@pytest.fixture
def client():
    with app.test_client() as client:
        yield client


def auth_headers(token):
    return {"Authorization": f"Bearer {token}"}


def test_openapi_spec(client):
    """Test the OpenAPI spec endpoint."""
    response = client.get('/api/tools/openapi.json')
    assert response.status_code == 200
    spec = response.get_json()
    assert "openapi" in spec
    assert "info" in spec
    assert "paths" in spec
    assert "/list_devices" in spec["paths"]
    assert "/get_device_info" in spec["paths"]


def test_list_devices(client, api_token):
    """Test the list_devices endpoint."""
    response = client.post('/api/tools/list_devices', headers=auth_headers(api_token))
    assert response.status_code == 200
    devices = response.get_json()
    assert isinstance(devices, list)
    # If there are devices, check the structure
    if devices:
        device = devices[0]
        assert "devName" in device
        assert "devMac" in device


def test_get_device_info(client, api_token):
    """Test the get_device_info endpoint."""
    # Query a device that might not exist
    payload = {"query": "nonexistent_device"}
    response = client.post('/api/tools/get_device_info',
                           json=payload,
                           headers=auth_headers(api_token))
    # Should return 404 if there is no match, or 200 with results
    assert response.status_code in [200, 404]
    if response.status_code == 200:
        devices = response.get_json()
        assert isinstance(devices, list)


def test_list_devices_unauthorized(client):
    """Test list_devices without authorization."""
    response = client.post('/api/tools/list_devices')
    assert response.status_code == 401


def test_get_device_info_unauthorized(client):
    """Test get_device_info without authorization."""
    payload = {"query": "test"}
    response = client.post('/api/tools/get_device_info', json=payload)
    assert response.status_code == 401