Merge pull request #1235 from adamoutler/hardening-fixes
Some checks failed
Code checks / check-url-paths (push) Has been cancelled
docker / docker_dev (push) Has been cancelled
Deploy MkDocs / deploy (push) Has been cancelled

Hardening fixes
This commit is contained in:
Jokob @NetAlertX
2025-10-28 08:31:30 +11:00
committed by GitHub
36 changed files with 2009 additions and 371 deletions

BIN
.coverage

Binary file not shown.

View File

@@ -1,4 +1,4 @@
# DO NOT MODIFY THIS FILE DIRECTLY. IT IS AUTO-GENERATED BY .devcontainer/scripts/generate-dockerfile.sh
# DO NOT MODIFY THIS FILE DIRECTLY. IT IS AUTO-GENERATED BY .devcontainer/scripts/generate-configs.sh
# ---/Dockerfile---
# The NetAlertX Dockerfile has 3 stages:
@@ -103,7 +103,6 @@ ENV PORT=20211
ENV NETALERTX_DEBUG=0
ENV VENDORSPATH=/app/back/ieee-oui.txt
ENV VENDORSPATH_NEWEST=/services/run/tmp/ieee-oui.txt
ENV PYTHONPATHPATH="${NETALERTX_APP}:${VIRTUAL_ENV}/bin:${PATH}"
ENV ENVIRONMENT=alpine
ENV READ_ONLY_USER=readonly READ_ONLY_GROUP=readonly
ENV NETALERTX_USER=netalertx NETALERTX_GROUP=netalertx
@@ -146,13 +145,14 @@ RUN apk add libcap && \
setcap cap_net_raw,cap_net_admin+eip /usr/bin/arp-scan && \
setcap cap_net_raw,cap_net_admin,cap_net_bind_service+eip /usr/bin/nbtscan && \
setcap cap_net_raw,cap_net_admin+eip /usr/bin/traceroute && \
setcap cap_net_raw,cap_net_admin+eip ${VIRTUAL_ENV_BIN}/scapy && \
setcap cap_net_raw,cap_net_admin+eip $(readlink -f ${VIRTUAL_ENV_BIN}/python) && \
/bin/sh /build/init-nginx.sh && \
/bin/sh /build/init-php-fpm.sh && \
/bin/sh /build/init-crond.sh && \
/bin/sh /build/init-backend.sh && \
rm -rf /build && \
apk del libcap
apk del libcap && \
date +%s > ${NETALERTX_FRONT}/buildtimestamp.txt
ENTRYPOINT ["/bin/sh","/entrypoint.sh"]
@@ -185,6 +185,9 @@ RUN chown -R ${READ_ONLY_USER}:${READ_ONLY_GROUP} ${READ_ONLY_FOLDERS} && \
find ${READ_WRITE_FOLDERS} -type d -exec chmod 700 {} + && \
chown ${READ_ONLY_USER}:${READ_ONLY_GROUP} /entrypoint.sh /opt /opt/venv && \
chmod 005 /entrypoint.sh ${SYSTEM_SERVICES}/*.sh /app /opt /opt/venv && \
for dir in ${READ_WRITE_FOLDERS}; do \
install -d -o ${NETALERTX_USER} -g ${NETALERTX_GROUP} -m 700 "$dir"; \
done && \
apk del apk-tools && \
rm -Rf /var /etc/sudoers.d/* /etc/shadow /etc/gshadow /etc/sudoers \
/lib/apk /lib/firmware /lib/modules-load.d /lib/sysctl.d /mnt /home/ /root \
@@ -210,7 +213,7 @@ HEALTHCHECK --interval=30s --timeout=10s --start-period=60s --retries=3 \
FROM runner AS netalertx-devcontainer
ENV INSTALL_DIR=/app
ENV PYTHONPATH=/workspaces/NetAlertX/test:/workspaces/NetAlertX/server:/app:/app/server:/opt/venv/lib/python3.12/site-packages
ENV PYTHONPATH=/workspaces/NetAlertX/test:/workspaces/NetAlertX/server:/app:/app/server:/opt/venv/lib/python3.12/site-packages:/usr/lib/python3.12/site-packages
ENV PATH=/services:${PATH}
ENV PHP_INI_SCAN_DIR=/services/config/php/conf.d:/etc/php83/conf.d
ENV LISTEN_ADDR=0.0.0.0
@@ -218,19 +221,20 @@ ENV PORT=20211
ENV NETALERTX_DEBUG=1
ENV PYDEVD_DISABLE_FILE_VALIDATION=1
COPY .devcontainer/resources/devcontainer-overlay/ /
USER root
# Install common tools, create user, and set up sudo
RUN apk add --no-cache git nano vim jq php83-pecl-xdebug py3-pip nodejs sudo gpgconf pytest pytest-cov fish shfmt sudo
RUN apk add --no-cache git nano vim jq php83-pecl-xdebug py3-pip nodejs sudo gpgconf pytest \
pytest-cov fish shfmt github-cli py3-yaml py3-docker-py docker-cli docker-cli-buildx
RUN install -d -o netalertx -g netalertx -m 755 /services/php/modules && \
cp -a /usr/lib/php83/modules/. /services/php/modules/ && \
echo "${NETALERTX_USER} ALL=(ALL) NOPASSWD: ALL" >> /etc/sudoers
# Install debugpy in the virtualenv if present, otherwise into system python3
RUN /bin/sh -c '(/opt/venv/bin/python3 -m pip install --no-cache-dir debugpy) || (python3 -m pip install --no-cache-dir debugpy) || true' && \
mkdir /workspaces && \
RUN mkdir /workspaces && \
install -d -o netalertx -g netalertx -m 777 /services/run/logs && \
install -d -o netalertx -g netalertx -m 777 /app/run/tmp/client_body && \
sed -i -e 's|:/app:|:/workspaces:|' /etc/passwd && \
python -m pip install -U pytest pytest-cov
find /opt/venv -type d -exec chmod o+rwx {} \;
USER netalertx
ENTRYPOINT ["/bin/sh","-c","sleep infinity"]

View File

@@ -23,6 +23,9 @@
// even within this container and connect to them as needed.
// "--network=host",
],
"mounts": [
"source=/var/run/docker.sock,target=/var/run/docker.sock,type=bind" //used for testing various conditions in docker
],
// ATTENTION: If running with --network=host, COMMENT `forwardPorts` OR ELSE THERE WILL BE NO WEBUI!
"forwardPorts": [20211, 20212, 5678],
"portsAttributes": { // the ports we care about
@@ -40,8 +43,13 @@
}
},
"postStartCommand": "${containerWorkspaceFolder}/.devcontainer/scripts/setup.sh",
"postCreateCommand": {
"Install Pip Requirements": "/opt/venv/bin/pip3 install pytest docker debugpy"
},
"postStartCommand": {
"Start Environment":"${containerWorkspaceFolder}/.devcontainer/scripts/setup.sh",
"Build test-container":"echo building netalertx-test container in background. check /tmp/build.log for progress. && setsid docker buildx build -t netalertx-test . > /tmp/build.log 2>&1 &"
},
"customizations": {
"vscode": {
"extensions": [

View File

@@ -7,7 +7,7 @@
FROM runner AS netalertx-devcontainer
ENV INSTALL_DIR=/app
ENV PYTHONPATH=/workspaces/NetAlertX/test:/workspaces/NetAlertX/server:/app:/app/server:/opt/venv/lib/python3.12/site-packages
ENV PYTHONPATH=/workspaces/NetAlertX/test:/workspaces/NetAlertX/server:/app:/app/server:/opt/venv/lib/python3.12/site-packages:/usr/lib/python3.12/site-packages
ENV PATH=/services:${PATH}
ENV PHP_INI_SCAN_DIR=/services/config/php/conf.d:/etc/php83/conf.d
ENV LISTEN_ADDR=0.0.0.0
@@ -15,19 +15,20 @@ ENV PORT=20211
ENV NETALERTX_DEBUG=1
ENV PYDEVD_DISABLE_FILE_VALIDATION=1
COPY .devcontainer/resources/devcontainer-overlay/ /
USER root
# Install common tools, create user, and set up sudo
RUN apk add --no-cache git nano vim jq php83-pecl-xdebug py3-pip nodejs sudo gpgconf pytest pytest-cov fish shfmt github-cli
RUN apk add --no-cache git nano vim jq php83-pecl-xdebug py3-pip nodejs sudo gpgconf pytest \
pytest-cov fish shfmt github-cli py3-yaml py3-docker-py docker-cli docker-cli-buildx
RUN install -d -o netalertx -g netalertx -m 755 /services/php/modules && \
cp -a /usr/lib/php83/modules/. /services/php/modules/ && \
echo "${NETALERTX_USER} ALL=(ALL) NOPASSWD: ALL" >> /etc/sudoers
# Install debugpy in the virtualenv if present, otherwise into system python3
RUN /bin/sh -c '(/opt/venv/bin/python3 -m pip install --no-cache-dir debugpy) || (python3 -m pip install --no-cache-dir debugpy) || true' && \
mkdir /workspaces && \
RUN mkdir /workspaces && \
install -d -o netalertx -g netalertx -m 777 /services/run/logs && \
install -d -o netalertx -g netalertx -m 777 /app/run/tmp/client_body && \
sed -i -e 's|:/app:|:/workspaces:|' /etc/passwd && \
python -m pip install -U pytest pytest-cov
find /opt/venv -type d -exec chmod o+rwx {} \;
USER netalertx
ENTRYPOINT ["/bin/sh","-c","sleep infinity"]

View File

@@ -25,11 +25,52 @@ export PORT=20211
export SOURCE_DIR="/workspaces/NetAlertX"
# ensure_docker_socket_access - let the 'netalertx' user talk to a mounted
# Docker socket by adding it to whatever group owns /var/run/docker.sock.
# Best-effort: every failure path logs a message and returns without error.
ensure_docker_socket_access() {
local socket="/var/run/docker.sock"
# Nothing to do when the host socket is not mounted into the container.
if [ ! -S "${socket}" ]; then
echo "docker socket not present; skipping docker group configuration"
return
fi
# Numeric gid of the group that owns the socket.
local sock_gid
sock_gid=$(stat -c '%g' "${socket}" 2>/dev/null || true)
if [ -z "${sock_gid}" ]; then
echo "unable to determine docker socket gid; skipping docker group configuration"
return
fi
# Resolve that gid to a group name, preferring getent when available.
local group_entry=""
if command -v getent >/dev/null 2>&1; then
group_entry=$(getent group "${sock_gid}" 2>/dev/null || true)
else
# NOTE(review): substring match ":gid:" could in theory hit another field
# of /etc/group — confirm gids never collide with member-list text.
group_entry=$(grep -E ":${sock_gid}:" /etc/group 2>/dev/null || true)
fi
local group_name=""
if [ -n "${group_entry}" ]; then
group_name=$(echo "${group_entry}" | cut -d: -f1)
else
# No group with that gid exists yet: create "docker-host", or if addgroup
# fails (e.g. raced by another process), re-read whichever name now owns it.
group_name="docker-host"
sudo addgroup -g "${sock_gid}" "${group_name}" 2>/dev/null || group_name=$(grep -E ":${sock_gid}:" /etc/group | head -n1 | cut -d: -f1)
fi
if [ -z "${group_name}" ]; then
echo "failed to resolve group for docker socket gid ${sock_gid}; skipping docker group configuration"
return
fi
# Join netalertx to the group only if it is not already a member.
if ! id -nG netalertx | tr ' ' '\n' | grep -qx "${group_name}"; then
sudo addgroup netalertx "${group_name}" 2>/dev/null || true
fi
}
main() {
echo "=== NetAlertX Development Container Setup ==="
killall php-fpm83 nginx crond python3 2>/dev/null
sleep 1
echo "Setting up ${SOURCE_DIR}..."
ensure_docker_socket_access
sudo chown $(id -u):$(id -g) /workspaces
sudo chmod 755 /workspaces
configure_source
@@ -102,6 +143,12 @@ configure_source() {
killall python3 &>/dev/null
sleep 0.2
done
sudo chmod 777 /opt/venv/lib/python3.12/site-packages/ && \
sudo chmod 005 /opt/venv/lib/python3.12/site-packages/
sudo chmod 666 /var/run/docker.sock
echo " -> Updating build timestamp"
date +%s > ${NETALERTX_FRONT}/buildtimestamp.txt
}

1
.gitignore vendored
View File

@@ -1,3 +1,4 @@
.coverage
.vscode
.dotnet
.vscode-server

21
.vscode/tasks.json vendored
View File

@@ -160,5 +160,26 @@
"color": "terminal.ansiBlue"
}
}
,
{
"label": "[Any] Build Unit Test Docker image",
"type": "shell",
"command": "docker buildx build -t netalertx-test . && echo '🧪 Unit Test Docker image built: netalertx-test'",
"presentation": {
"echo": true,
"reveal": "always",
"panel": "shared",
"showReuseMessage": false
},
"problemMatcher": [],
"group": {
"kind": "build",
"isDefault": false
},
"icon": {
"id": "beaker",
"color": "terminal.ansiBlue"
}
}
]
}

View File

@@ -182,6 +182,9 @@ RUN chown -R ${READ_ONLY_USER}:${READ_ONLY_GROUP} ${READ_ONLY_FOLDERS} && \
find ${READ_WRITE_FOLDERS} -type d -exec chmod 700 {} + && \
chown ${READ_ONLY_USER}:${READ_ONLY_GROUP} /entrypoint.sh /opt /opt/venv && \
chmod 005 /entrypoint.sh ${SYSTEM_SERVICES}/*.sh /app /opt /opt/venv && \
for dir in ${READ_WRITE_FOLDERS}; do \
install -d -o ${NETALERTX_USER} -g ${NETALERTX_GROUP} -m 700 "$dir"; \
done && \
apk del apk-tools && \
rm -Rf /var /etc/sudoers.d/* /etc/shadow /etc/gshadow /etc/sudoers \
/lib/apk /lib/firmware /lib/modules-load.d /lib/sysctl.d /mnt /home/ /root \

View File

@@ -10,17 +10,25 @@ Get visibility of what's going on on your WIFI/LAN network and enable presence d
## 📋 Table of Contents
- [Features](#-features)
- [Documentation](#-documentation)
- [Quick Start](#-quick-start)
- [Alternative Apps](#-other-alternative-apps)
- [Security & Privacy](#-security--privacy)
- [FAQ](#-faq)
- [Known Issues](#-known-issues)
- [Donations](#-donations)
- [Contributors](#-contributors)
- [Translations](#-translations)
- [License](#license)
- [NetAlertX - Network, presence scanner and alert framework](#netalertx---network-presence-scanner-and-alert-framework)
- [📋 Table of Contents](#-table-of-contents)
- [🚀 Quick Start](#-quick-start)
- [📦 Features](#-features)
- [Scanners](#scanners)
- [Notification gateways](#notification-gateways)
- [Integrations and Plugins](#integrations-and-plugins)
- [Workflows](#workflows)
- [📚 Documentation](#-documentation)
- [🔐 Security \& Privacy](#-security--privacy)
- [❓ FAQ](#-faq)
- [🐞 Known Issues](#-known-issues)
- [📃 Everything else](#-everything-else)
- [📧 Get notified what's new](#-get-notified-whats-new)
- [🔀 Other Alternative Apps](#-other-alternative-apps)
- [💙 Donations](#-donations)
- [🏗 Contributors](#-contributors)
- [🌍 Translations](#-translations)
- [License](#license)
## 🚀 Quick Start
@@ -38,6 +46,14 @@ docker run -d --rm --network=host \
ghcr.io/jokob-sk/netalertx:latest
```
To deploy a containerized instance directly from the source repository, execute the following BASH sequence:
```bash
git clone https://github.com/jokob-sk/NetAlertX.git
cd NetAlertX
docker compose up --force-recreate --build
# To customize: edit docker-compose.yaml and run that last command again
```
Need help configuring it? Check the [usage guide](https://github.com/jokob-sk/NetAlertX/blob/main/docs/README.md) or [full documentation](https://jokob-sk.github.io/NetAlertX/).
For Home Assistant users: [Click here to add NetAlertX](https://my.home-assistant.io/redirect/supervisor_add_addon_repository/?repository_url=https%3A%2F%2Fgithub.com%2Falexbelgium%2Fhassio-addons)

View File

@@ -1,56 +1,59 @@
services:
netalertx:
network_mode: host # Use host networking for ARP scanning and other services
#use an environmental variable to set host networking mode if needed
network_mode: ${NETALERTX_NETWORK_MODE:-host} # Use host networking for ARP scanning and other services
build:
context: . # Build context is the current directory
dockerfile: Dockerfile # Specify the Dockerfile to use
context: . # Build context is the current directory
dockerfile: Dockerfile # Specify the Dockerfile to use
image: netalertx:latest
container_name: netalertx # The name when you docker contiainer ls
read_only: true # Make the container filesystem read-only
cap_drop: # Drop all capabilities for enhanced security
container_name: netalertx # The name shown when you run docker container ls
read_only: true # Make the container filesystem read-only
cap_drop: # Drop all capabilities for enhanced security
- ALL
cap_add: # Add only the necessary capabilities
- NET_ADMIN # Required for ARP scanning
- NET_RAW # Required for raw socket operations
- NET_BIND_SERVICE # Required to bind to privileged ports (nbtscan)
cap_add: # Add only the necessary capabilities
- NET_ADMIN # Required for ARP scanning
- NET_RAW # Required for raw socket operations
- NET_BIND_SERVICE # Required to bind to privileged ports (nbtscan)
volumes:
- type: bind
source: ${APP_DATA_LOCATION}/netalertx/config
target: /app/config
read_only: false
- type: bind
source: ${APP_DATA_LOCATION}/netalertx/db
- type: volume # Persistent Docker-managed Named Volume for storage of config files
source: netalertx_config # the default name of the volume is netalertx_config
target: /app/config # inside the container mounted to /app/config
read_only: false # writable volume
# Example custom local folder called /home/user/netalertx_config
# - type: bind
# source: /home/user/netalertx_config
# target: /app/config
# read_only: false
# ... or use the alternative format
# - /home/user/netalertx_config:/app/config:rw
- type: volume
source: netalertx_db
target: /app/db
read_only: false
- type: bind
- type: bind # Bind mount for timezone consistency
source: /etc/localtime
target: /etc/localtime
read_only: true
# Retain logs - comment out tmpfs /app/log if you want to retain logs between container restarts
# - /path/on/host/log:/app/log
# Optional logs
# - type: bind
# source: ${LOGS_LOCATION}
# target: /app/log
# read_only: false
# Optional development mounts
- type: bind
source: ${DEV_LOCATION}
target: /app/front/plugins/custom
read_only: false
# Use a custom Enterprise-configured nginx config for ldap or other settings
# - /custom-enterprise.conf:/services/config/nginx/conf.active/netalertx.conf:ro
# Test your plugin on the production container
# - /path/on/host:/app/front/plugins/custom
# Retain logs - comment out tmpfs /app/log if you want to retain logs between container restarts
# - /path/on/host/log:/app/log
# tmpfs mounts for writable directories in a read-only container, and to improve system performance
# All mounts have noexec,nosuid,nodev for security purposes no devices, no suid/sgid and no execution of binaries
# async where possible for performance, sync where required for correctness
# uid=20211 and gid=20211 is the netalertx user inside the container
# mode=1700 gives rwx------ permissions to the netalertx user only
tmpfs:
# Speed up logging. This can be commented out to retain logs between container restarts
- "/app/log:uid=20211,gid=20211,mode=1700,rw,noexec,nosuid,nodev,async,noatime,nodiratime"
@@ -63,27 +66,26 @@ services:
# /tmp is required by php for session save this should be reworked to /services/run/tmp
- "/tmp:uid=20211,gid=20211,mode=1700,rw,noexec,nosuid,nodev,async,noatime,nodiratime"
environment:
LISTEN_ADDR: 0.0.0.0 # Listen for connections on all interfaces
PORT: ${PORT} # Application port
ALWAYS_FRESH_INSTALL: ${ALWAYS_FRESH_INSTALL} # Set to true to reset your config and database on each container start
NETALERTX_DEBUG: 0 # 0=kill all services and restart if any dies. 1 keeps running dead services.
TZ: ${TZ} # Timezone, e.g. Europe/Paris
# APP_CONF_OVERRIDE={"SCAN_SUBNETS":"['192.168.1.0/24 --interface=eth1']","GRAPHQL_PORT":"20223","UI_theme":"Light"} # (optional) app.conf settings override
# LOADED_PLUGINS=["DHCPLSS","PIHOLE","ASUSWRT","FREEBOX"] # (optional) default plugins to load
LISTEN_ADDR: ${LISTEN_ADDR:-0.0.0.0} # Listen for connections on all interfaces
PORT: ${PORT:-20211} # Application port
GRAPHQL_PORT: ${GRAPHQL_PORT:-20212} # GraphQL API port (entrypoint builds APP_CONF_OVERRIDE JSON from this)
ALWAYS_FRESH_INSTALL: ${ALWAYS_FRESH_INSTALL:-false} # Set to true to reset your config and database on each container start
NETALERTX_DEBUG: ${NETALERTX_DEBUG:-0} # 0=kill all services and restart if any dies. 1 keeps running dead services.
# Resource limits to prevent resource exhaustion
mem_limit: 2048m
mem_reservation: 1024m
cpus: 4
pids_limit: 512
mem_limit: 2048m # Maximum memory usage
mem_reservation: 1024m # Soft memory limit
cpu_shares: 512 # Relative CPU weight for CPU contention scenarios
pids_limit: 512 # Limit the number of processes/threads to prevent fork bombs
logging:
driver: "json-file"
driver: "json-file" # Use JSON file logging driver
options:
max-size: "10m"
max-file: "3"
max-size: "10m" # Rotate log files after they reach 10MB
max-file: "3" # Keep a maximum of 3 log files
# Always restart the container unless explicitly stopped
restart: unless-stopped
# volumes:
# netalertx_config:
# netalertx_db:
volumes: # Persistent volumes for configuration and database storage
netalertx_config: # Configuration files
netalertx_db: # Database files

View File

@@ -49,25 +49,58 @@ printf '
\033[0m
Network intruder and presence detector.
https://netalertx.com
'
'
set -u
# Run all pre-startup checks to validate container environment and dependencies
FAILED_STATUS=""
echo "Startup pre-checks"
for script in ${SYSTEM_SERVICES_SCRIPTS}/check-*.sh; do
sh "$script"
if [ -n "${SKIP_TESTS:-}" ]; then
echo "Skipping startup checks as SKIP_TESTS is set."
break
fi
script_name=$(basename "$script" | sed 's/^check-//;s/\.sh$//;s/-/ /g')
echo " --> ${script_name}"
sh "$script"
NETALERTX_DOCKER_ERROR_CHECK=$?
if [ ${NETALERTX_DOCKER_ERROR_CHECK} -ne 0 ]; then
# fail but continue checks so user can see all issues
FAILED_STATUS="${NETALERTX_DOCKER_ERROR_CHECK}"
echo "${script_name}: FAILED with ${FAILED_STATUS}"
echo "Failure detected in: ${script}"
fi
done
if [ -n "${FAILED_STATUS}" ]; then
echo "Container startup checks failed with exit code ${FAILED_STATUS}."
exit ${FAILED_STATUS}
fi
# Set APP_CONF_OVERRIDE based on GRAPHQL_PORT if not already set
if [ -n "${GRAPHQL_PORT:-}" ] && [ -z "${APP_CONF_OVERRIDE:-}" ]; then
export APP_CONF_OVERRIDE='{"GRAPHQL_PORT":"'"${GRAPHQL_PORT}"'"}'
echo "Setting APP_CONF_OVERRIDE to $APP_CONF_OVERRIDE"
fi
# Exit after checks if in check-only mode (for testing)
if [ "${NETALERTX_CHECK_ONLY:-0}" -eq 1 ]; then
exit 0
fi
# Update vendor data (MAC address OUI database) in the background
# This happens concurrently with service startup to avoid blocking container readiness
${SYSTEM_SERVICES_SCRIPTS}/update_vendors.sh &
bash ${SYSTEM_SERVICES_SCRIPTS}/update_vendors.sh &
# Service management state variables
SERVICES="" # Space-separated list of active services in format "pid:name"
FAILED_NAME="" # Name of service that failed (used for error reporting)
FAILED_STATUS=0 # Exit status code from failed service or signal
################################################################################
# is_pid_active() - Check if a process is alive and not in zombie/dead state

View File

@@ -0,0 +1,110 @@
#!/bin/sh
# check-0-permissions.sh: Verify file system permissions for critical paths.
#
# This script ensures that the application has the necessary read and write
# permissions for its operational directories. It distinguishes between running
# as root (user 0) and a non-privileged user.
#
# As root, it will proactively fix ownership and permissions.
# As a non-root user, it will only warn about issues.

# --- Color Codes ---
RED='\033[1;31m'
YELLOW='\033[1;33m'
RESET='\033[0m'

# --- Main Logic ---
# Define paths that need read-only access
READ_ONLY_PATHS="
${NETALERTX_APP}
${NETALERTX_SERVER}
${NETALERTX_FRONT}
${SYSTEM_SERVICES_CONFIG}
${VIRTUAL_ENV}
"
# Define paths that need read-write access
READ_WRITE_PATHS="
${NETALERTX_API}
${NETALERTX_LOG}
${SYSTEM_SERVICES_RUN}
${NETALERTX_CONFIG}
$(dirname "${NETALERTX_DB_FILE}")
"

# If running as root, fix permissions first so the netalertx user can operate.
if [ "$(id -u)" -eq 0 ]; then
echo "Running as root. Ensuring correct ownership and permissions..."
# Set ownership to netalertx user and group for all read-write paths
chown -R netalertx:netalertx ${READ_WRITE_PATHS}
# Set directory and file permissions for all read-write paths
find ${READ_WRITE_PATHS} -type d -exec chmod 700 {} +
find ${READ_WRITE_PATHS} -type f -exec chmod 600 {} +
fi

# --- Permission Validation ---
failures=0

# Check read-only paths for existence and readability.
# BUGFIX: the previous 'echo ... | while read' pipeline ran the loop body in
# a subshell, so 'failures=1' assignments made there were lost and the script
# could exit 0 even with missing/unreadable paths. A plain for-loop (matching
# the read-write loop below) keeps the assignments in the current shell.
# The listed paths contain no whitespace, so word splitting is safe here.
for path in ${READ_ONLY_PATHS}; do
if [ ! -e "$path" ]; then
failures=1
>&2 printf "%s" "${RED}"
>&2 cat <<EOF
══════════════════════════════════════════════════════════════════════════════
❌ CRITICAL: Path does not exist.
The required path "${path}" could not be found. The application
cannot start without its complete directory structure.
══════════════════════════════════════════════════════════════════════════════
EOF
>&2 printf "%s" "${RESET}"
elif [ ! -r "$path" ]; then
failures=1
>&2 printf "%s" "${YELLOW}"
>&2 cat <<EOF
══════════════════════════════════════════════════════════════════════════════
⚠️ ATTENTION: Read permission denied.
The application cannot read from "${path}". This will cause
unpredictable errors. Please correct the file system permissions.
══════════════════════════════════════════════════════════════════════════════
EOF
>&2 printf "%s" "${RESET}"
fi
done

# Check read-write paths specifically for write access
for path in $READ_WRITE_PATHS; do
if [ -e "$path" ] && [ ! -w "$path" ]; then
failures=1
>&2 printf "%s" "${YELLOW}"
>&2 cat <<EOF
══════════════════════════════════════════════════════════════════════════════
⚠️ ATTENTION: Write permission denied.
The application cannot write to "${path}". This will prevent it from
saving data, logs, or configuration.
To fix this automatically, restart the container with root privileges
(e.g., remove the "user:" directive in your Docker Compose file).
══════════════════════════════════════════════════════════════════════════════
EOF
>&2 printf "%s" "${RESET}"
fi
done

# If there were any failures, exit non-zero so the startup sequence reports them.
if [ "$failures" -ne 0 ]; then
exit 1
fi
exit 0

View File

@@ -27,5 +27,5 @@ then
══════════════════════════════════════════════════════════════════════════════
EOF
>&2 printf "%s" "${RESET}"
exit 1
fi
fi
exit 0 # Always exit success even after warnings

View File

@@ -9,11 +9,9 @@ if [ ! -f ${NETALERTX_CONFIG}/app.conf ]; then
}
cp /app/back/app.conf "${NETALERTX_CONFIG}/app.conf" || {
>&2 echo "ERROR: Failed to copy default config to ${NETALERTX_CONFIG}/app.conf"
exit 1
exit 2
}
CYAN='\033[1;36m'
RESET='\033[0m'
>&2 printf "%s" "${CYAN}"
>&2 cat <<EOF
══════════════════════════════════════════════════════════════════════════════
🆕 First run detected. Default configuration written to ${NETALERTX_CONFIG}/app.conf.
@@ -22,6 +20,7 @@ if [ ! -f ${NETALERTX_CONFIG}/app.conf ]; then
this instance in production.
══════════════════════════════════════════════════════════════════════════════
EOF
>&2 printf "%s" "${RESET}"
>&2 printf "%s" "${RESET}"
fi

View File

@@ -2,8 +2,17 @@
# This script checks if the database file exists, and if not, creates it with the initial schema.
# It is intended to be run at the first start of the application.
# if the db exists, exit
test -f "${NETALERTX_DB_FILE}" && exit 0
# If ALWAYS_FRESH_INSTALL is true, remove the database to force a rebuild.
if [ "${ALWAYS_FRESH_INSTALL}" = "true" ]; then
if [ -f "${NETALERTX_DB_FILE}" ]; then
# Provide feedback to the user.
>&2 echo "INFO: ALWAYS_FRESH_INSTALL is true. Removing existing database to force a fresh installation."
rm -f "${NETALERTX_DB_FILE}" "${NETALERTX_DB_FILE}-shm" "${NETALERTX_DB_FILE}-wal"
fi
# Otherwise, if the db exists, exit.
elif [ -f "${NETALERTX_DB_FILE}" ]; then
exit 0
fi
CYAN='\033[1;36m'
RESET='\033[0m'
@@ -32,7 +41,6 @@ CREATE TABLE IF NOT EXISTS "Online_History" (
"Offline_Devices" INTEGER,
PRIMARY KEY("Index" AUTOINCREMENT)
);
CREATE TABLE sqlite_sequence(name,seq);
CREATE TABLE Devices (
devMac STRING (50) PRIMARY KEY NOT NULL COLLATE NOCASE,
devName STRING (50) NOT NULL DEFAULT "(unknown)",

View File

@@ -1,9 +1,53 @@
#!/bin/sh
# Initialize required directories and log files
# These must exist before services start to avoid permission/write errors
# TODO - improve with per-directory warning if creation fails
[ ! -d "${NETALERTX_PLUGINS_LOG}" ] && mkdir -p "${NETALERTX_PLUGINS_LOG}"
[ ! -d "${SYSTEM_SERVICES_RUN_LOG}" ] && mkdir -p "${SYSTEM_SERVICES_RUN_LOG}"
[ ! -d "${SYSTEM_SERVICES_RUN_TMP}" ] && mkdir -p "${SYSTEM_SERVICES_RUN_TMP}"
[ ! -f "${LOG_DB_IS_LOCKED}" ] && touch "${LOG_DB_IS_LOCKED}"
[ ! -f "${LOG_EXECUTION_QUEUE}" ] && touch "${LOG_EXECUTION_QUEUE}"
# check_mandatory_folders - create every log/tmp location the services expect
# to exist before they start; stops at the first creation failure and
# returns 1 so the caller can abort startup.
check_mandatory_folders() {
    # Plugins log directory
    if [ ! -d "${NETALERTX_PLUGINS_LOG}" ]; then
        echo " * Creating Plugins log."
        mkdir -p "${NETALERTX_PLUGINS_LOG}" || {
            echo "Error: Failed to create plugins log directory: ${NETALERTX_PLUGINS_LOG}"
            return 1
        }
    fi
    # System services run log directory
    if [ ! -d "${SYSTEM_SERVICES_RUN_LOG}" ]; then
        echo " * Creating System services run log."
        mkdir -p "${SYSTEM_SERVICES_RUN_LOG}" || {
            echo "Error: Failed to create system services run log directory: ${SYSTEM_SERVICES_RUN_LOG}"
            return 1
        }
    fi
    # System services run tmp directory
    if [ ! -d "${SYSTEM_SERVICES_RUN_TMP}" ]; then
        echo " * Creating System services run tmp."
        mkdir -p "${SYSTEM_SERVICES_RUN_TMP}" || {
            echo "Error: Failed to create system services run tmp directory: ${SYSTEM_SERVICES_RUN_TMP}"
            return 1
        }
    fi
    # DB locked marker file
    if [ ! -f "${LOG_DB_IS_LOCKED}" ]; then
        echo " * Creating DB locked log."
        touch "${LOG_DB_IS_LOCKED}" || {
            echo "Error: Failed to create DB locked log file: ${LOG_DB_IS_LOCKED}"
            return 1
        }
    fi
    # Execution queue log file
    if [ ! -f "${LOG_EXECUTION_QUEUE}" ]; then
        echo " * Creating Execution queue log."
        touch "${LOG_EXECUTION_QUEUE}" || {
            echo "Error: Failed to create execution queue log file: ${LOG_EXECUTION_QUEUE}"
            return 1
        }
    fi
}
# Run the function
check_mandatory_folders

View File

@@ -0,0 +1,64 @@
#!/bin/sh
# check-network-mode.sh - detect when the container is not using host networking.
# Heuristic only: prints a warning but ALWAYS exits 0, so it never blocks startup.
# Exit if NETALERTX_DEBUG=1
if [ "${NETALERTX_DEBUG}" = "1" ]; then
exit 0
fi
# Get the default network interface
DEFAULT_IF="$(ip route show default 0.0.0.0/0 2>/dev/null | awk 'NR==1 {print $5}')"
if [ -z "${DEFAULT_IF}" ]; then
# No default route; nothing to validate.
exit 0
fi
# Collect link details, first IPv4 address, and MAC of the default interface.
IF_LINK_INFO="$(ip link show "${DEFAULT_IF}" 2>/dev/null)"
IF_IP="$(ip -4 addr show "${DEFAULT_IF}" 2>/dev/null | awk '/inet / {print $2}' | head -n1)"
IF_MAC=""
if [ -r "/sys/class/net/${DEFAULT_IF}/address" ]; then
IF_MAC="$(cat "/sys/class/net/${DEFAULT_IF}/address")"
fi
looks_like_bridge="0"
# Check for common bridge MAC and IP patterns
# (02:42:* is the MAC prefix Docker typically assigns to bridge-attached
# interfaces — heuristic, not a guarantee)
case "${IF_MAC}" in
02:42:*) looks_like_bridge="1" ;;
00:00:00:00:00:00) looks_like_bridge="1" ;;
"") ;; # leave as is
esac
# Check for common bridge IP ranges
# (172.16/12 is a private range commonly used as Docker's default address
# pool; 192.168.65.* is presumably Docker Desktop's internal subnet — TODO confirm)
case "${IF_IP}" in
172.1[6-9].*|172.2[0-9].*|172.3[0-1].*) looks_like_bridge="1" ;;
192.168.65.*) looks_like_bridge="1" ;;
esac
# "name@ifN" in 'ip link' output usually marks a veth peer (bridge netns) —
# another heuristic signal.
if echo "${IF_LINK_INFO}" | grep -q "@if"; then
looks_like_bridge="1"
fi
# Nothing bridge-like detected: stay silent.
if [ "${looks_like_bridge}" -ne 1 ]; then
exit 0
fi
# Emit a prominent warning to stderr, but never fail the startup sequence.
YELLOW=$(printf '\033[1;33m')
RESET=$(printf '\033[0m')
>&2 printf "%s" "${YELLOW}"
>&2 cat <<EOF
══════════════════════════════════════════════════════════════════════════════
⚠️ ATTENTION: NetAlertX is not running with --network=host.
Bridge networking blocks passive discovery (ARP, NBNS, mDNS) and active
scanning accuracy. Most plugins expect raw access to the LAN through host
networking and CAP_NET_RAW capabilities.
Restart the container with:
docker run --network=host --cap-add=NET_RAW --cap-add=NET_ADMIN --cap-add=NET_BIND_SERVICE
or set "network_mode: host" in docker-compose.yml.
══════════════════════════════════════════════════════════════════════════════
EOF
>&2 printf "%s" "${RESET}"
exit 0

View File

@@ -0,0 +1,50 @@
#!/bin/sh
# check-nginx-config.sh - verify nginx conf.active mount is writable when startup needs to render config.

# Locations rendered/validated at startup.
CONF_ACTIVE_DIR="${SYSTEM_NGINX_CONFIG}/conf.active"
TARGET_FILE="${CONF_ACTIVE_DIR}/netalertx.conf"

# ANSI highlight colors for the warning banners.
YELLOW=$(printf '\033[1;33m')
RESET=$(printf '\033[0m')

# Directory missing entirely: warn and exit failure so the caller sees the message.
if [ ! -d "${CONF_ACTIVE_DIR}" ]; then
    {
        printf "%s" "${YELLOW}"
        cat <<EOF
══════════════════════════════════════════════════════════════════════════════
⚠️ ATTENTION: Nginx configuration mount ${CONF_ACTIVE_DIR} is missing.
Custom listen address or port changes require a writable nginx conf.active
directory. Without it, the container falls back to defaults and ignores
your overrides.
Create a bind mount:
--mount type=bind,src=/path/on/host,dst=${CONF_ACTIVE_DIR}
and ensure it is owned by the netalertx user (20211:20211) with 700 perms.
══════════════════════════════════════════════════════════════════════════════
EOF
        printf "%s" "${RESET}"
    } >&2
    exit 1
fi

# Probe writability by creating (then removing) a scratch file inside the mount.
TMP_FILE="${CONF_ACTIVE_DIR}/.netalertx-write-test"
if ! ( : >"${TMP_FILE}" ) 2>/dev/null; then
    {
        printf "%s" "${YELLOW}"
        cat <<EOF
══════════════════════════════════════════════════════════════════════════════
⚠️ ATTENTION: Unable to write to ${TARGET_FILE}.
Ensure the conf.active mount is writable by the netalertx user before
changing LISTEN_ADDR or PORT. Fix permissions:
chown -R 20211:20211 ${CONF_ACTIVE_DIR}
find ${CONF_ACTIVE_DIR} -type d -exec chmod 700 {} +
find ${CONF_ACTIVE_DIR} -type f -exec chmod 600 {} +
══════════════════════════════════════════════════════════════════════════════
EOF
        printf "%s" "${RESET}"
    } >&2
    exit 1
fi
rm -f "${TMP_FILE}"
exit 0

View File

@@ -0,0 +1,45 @@
#!/bin/sh
# check-storage-extra.sh - ensure additional NetAlertX directories are persistent mounts.
#
# Warns to stderr and exits 1 when the log, API cache, or runtime work
# directories are not backed by a bind mount or named volume.

# Skip the check in debug/devcontainer mode.
# BUGFIX: use the POSIX '=' comparison; '==' is a bashism that strict
# /bin/sh implementations (e.g. dash) reject with "unexpected operator",
# which would make this check fail on every start regardless of mounts.
if [ "${NETALERTX_DEBUG}" = "1" ]; then
exit 0
fi

# warn_if_not_persistent_mount <path> <label>
# Returns 0 when <path> appears as a mount point (field 5 of
# /proc/self/mountinfo); otherwise prints a highlighted warning on stderr
# and returns 1 so the caller can tally the failure.
warn_if_not_persistent_mount() {
path="$1"
label="$2"
if awk -v target="${path}" '$5 == target {found=1} END {exit found ? 0 : 1}' /proc/self/mountinfo; then
return 0
fi
# (BUGFIX: removed a stray 'failures=1' here — it clobbered the caller's
# running counter; the caller increments 'failures' from our return code.)
YELLOW=$(printf '\033[1;33m')
RESET=$(printf '\033[0m')
>&2 printf "%s" "${YELLOW}"
>&2 cat <<EOF
══════════════════════════════════════════════════════════════════════════════
⚠️ ATTENTION: ${path} is not a persistent mount.
${label} relies on host storage to persist data across container restarts.
Mount this directory from the host or a named volume before trusting the
container's output.
Example:
--mount type=bind,src=/path/on/host,dst=${path}
══════════════════════════════════════════════════════════════════════════════
EOF
>&2 printf "%s" "${RESET}"
return 1
}

# Tally non-persistent paths; any failure makes the script exit 1.
failures=0
warn_if_not_persistent_mount "${NETALERTX_LOG}" "Logs" || failures=$((failures + 1))
warn_if_not_persistent_mount "${NETALERTX_API}" "API JSON cache" || failures=$((failures + 1))
warn_if_not_persistent_mount "${SYSTEM_SERVICES_RUN}" "Runtime work directory" || failures=$((failures + 1))
if [ "${failures}" -ne 0 ]; then
exit 1
fi
exit 0

View File

@@ -1,14 +0,0 @@
#!/bin/sh
# TODO Add sanity checks here to ensure we can read from
# ${NETALERTX_APP}
# ${NETALERTX_SERVER}
# ${NETALERTX_FRONT}
# ${SYSTEM_SERVICES_CONFIG}
# ${VIRTUAL_ENV}
# And read/write tempdirs
# ${NETALERTX_API}
# ${NETALERTX_LOGS}
# ${SYSTEM_SERVICES_RUN}

View File

@@ -0,0 +1,84 @@
#!/bin/sh
# check-storage.sh - Verify critical paths are persistent mounts.
# Warn-only check: the container still starts, but the user is told when the
# database or config directory sits on a non-persistent filesystem.
# Define non-persistent filesystem types to check against
# NOTE: 'overlay' and 'aufs' are the primary non-persistent types for container roots.
# 'tmpfs' and 'ramfs' are for specific non-persistent mounts.
NON_PERSISTENT_FSTYPES="tmpfs|ramfs|overlay|aufs"
# Paths that must be explicit mounts; persistence inherited from the parent
# filesystem is not acceptable for these.
MANDATORY_PERSISTENT_PATHS="/app/db /app/config"
# is_persistent_mount <path>
# Returns 0 when the path is considered persistent, 1 otherwise.
# This function is now the robust persistence checker.
is_persistent_mount() {
target_path="$1"
# /proc/mounts fields: device mountpoint fstype options ... ($2 = mount point)
mount_entry=$(awk -v path="${target_path}" '$2 == path { print $0 }' /proc/mounts)
if [ -z "${mount_entry}" ]; then
# CRITICAL FIX: If the mount entry is empty, check if it's one of the mandatory paths.
if echo "${MANDATORY_PERSISTENT_PATHS}" | grep -w -q "${target_path}"; then
# The path is mandatory but not mounted: FAIL (Not persistent)
return 1
else
# Not mandatory and not a mount point: Assume persistence is inherited from parent (pass)
return 0
fi
fi
# ... (rest of the original logic remains the same for explicit mounts)
fs_type=$(echo "${mount_entry}" | awk '{print $3}')
# Check if the filesystem type matches any non-persistent types
if echo "${fs_type}" | grep -E -q "^(${NON_PERSISTENT_FSTYPES})$"; then
return 1 # Not persistent (matched a non-persistent type)
else
return 0 # Persistent
fi
}
# warn_if_not_persistent_mount <path>
# Prints a highlighted warning block to stderr when the persistence check
# fails; sets the global 'failures' flag but never exits.
warn_if_not_persistent_mount() {
path="$1"
if is_persistent_mount "${path}"; then
return 0
fi
failures=1
YELLOW=$(printf '\033[1;33m')
RESET=$(printf '\033[0m')
>&2 printf "%s" "${YELLOW}"
>&2 cat <<EOF
══════════════════════════════════════════════════════════════════════════════
⚠️ ATTENTION: ${path} is not a persistent mount.
Your data in this directory may not persist across container restarts or
upgrades. The filesystem type for this path is identified as non-persistent.
Fix: mount ${path} explicitly as a bind mount or a named volume:
# Bind mount
--mount type=bind,src=/path/on/host,dst=${path}
# Named volume
--mount type=volume,src=netalertx-data,dst=${path}
Apply one of these mount options and restart the container.
══════════════════════════════════════════════════════════════════════════════
EOF
>&2 printf "%s" "${RESET}"
}
# If NETALERTX_DEBUG=1 then we will exit
if [ "${NETALERTX_DEBUG}" = "1" ]; then
exit 0
fi
failures=0
# NETALERTX_DB is a file, so we check its directory
warn_if_not_persistent_mount "$(dirname "${NETALERTX_DB_FILE}")"
warn_if_not_persistent_mount "${NETALERTX_CONFIG}"
if [ "${failures}" -ne 0 ]; then
# We only warn, not exit, as this is not a critical failure
# but the user should be aware of the potential data loss.
sleep 1 # Give user time to read the message
fi

View File

@@ -41,10 +41,8 @@ failures=0
warn_if_not_dedicated_mount "${NETALERTX_API}"
warn_if_not_dedicated_mount "${NETALERTX_LOG}"
if [ "${failures}" -ne 0 ]; then
exit 1
fi
if [ ! -f "${SYSTEM_NGINX_CONFIG}/conf.active" ]; then
echo "Note: Using default listen address ${LISTEN_ADDR}:${PORT} (no ${SYSTEM_NGINX_CONFIG}/conf.active override)."
if [ ! -w "${SYSTEM_NGINX_CONFIG}/conf.active" ]; then
echo "Note: Using default listen address 0.0.0.0:20211 instead of ${LISTEN_ADDR}:${PORT} (no ${SYSTEM_NGINX_CONFIG}/conf.active override)."
fi
exit 0

View File

@@ -20,6 +20,10 @@ if [ "${CURRENT_UID}" -eq 0 ]; then
* Keep the default USER in the image (20211:20211), or
* In docker-compose.yml, remove any 'user:' override that sets UID 0.
Note: As a courtesy, this special mode is only used to set the permissions
of /app/db and /app/config to be owned by the netalertx user so future
runs work correctly.
Bottom line: never run security tooling as root unless you are actively
trying to get pwned.
══════════════════════════════════════════════════════════════════════════════

View File

@@ -1,7 +0,0 @@
#!/bin/sh
# TODO Sanity checks for storage paths
# Ensure we can read/write to
# ${NETALERTX_CONFIG}
# ${NETALERTX_DB}

View File

@@ -0,0 +1,41 @@
#!/bin/sh
# check-user-netalertx.sh - ensure the container is running as the hardened service user.
# Compares the current UID:GID against the dedicated service account and emits
# a warning block on mismatch. Warn-only: the script does not exit non-zero.
EXPECTED_USER="${NETALERTX_USER:-netalertx}"
# passwd fields: name:passwd:UID:GID:... -> field 3 is UID, field 4 is GID.
EXPECTED_UID="$(getent passwd "${EXPECTED_USER}" 2>/dev/null | cut -d: -f3)"
EXPECTED_GID="$(getent passwd "${EXPECTED_USER}" 2>/dev/null | cut -d: -f4)"
CURRENT_UID="$(id -u)"
CURRENT_GID="$(id -g)"
# Fallback to known defaults when lookups fail
if [ -z "${EXPECTED_UID}" ]; then
EXPECTED_UID="20211"
fi
if [ -z "${EXPECTED_GID}" ]; then
EXPECTED_GID="20211"
fi
# Silent success when both UID and GID match the hardened account.
if [ "${CURRENT_UID}" -eq "${EXPECTED_UID}" ] && [ "${CURRENT_GID}" -eq "${EXPECTED_GID}" ]; then
exit 0
fi
YELLOW=$(printf '\033[1;33m')
RESET=$(printf '\033[0m')
>&2 printf "%s" "${YELLOW}"
>&2 cat <<EOF
══════════════════════════════════════════════════════════════════════════════
⚠️ ATTENTION: NetAlertX is running as UID ${CURRENT_UID}:${CURRENT_GID}.
Hardened permissions, file ownership, and runtime isolation expect the
dedicated service account (${EXPECTED_USER} -> ${EXPECTED_UID}:${EXPECTED_GID}).
When you override the container user (for example, docker run --user 1000:1000
or a Compose "user:" directive), NetAlertX loses crucial safeguards and
future upgrades may silently fail.
Restore the container to the default user:
* Remove any custom --user flag
* Delete "user:" overrides in compose files
* Recreate the container so volume ownership is reset
══════════════════════════════════════════════════════════════════════════════
EOF
>&2 printf "%s" "${RESET}"

View File

@@ -19,7 +19,7 @@ TEMP_FILE="/services/run/tmp/ieee-oui.txt.tmp"
OUTPUT_FILE="/services/run/tmp/ieee-oui.txt"
# Download the file using wget to stdout and process it
if ! wget --timeout=30 --tries=3 "https://standards-oui.ieee.org/oui/oui.txt" -O /dev/stdout | \
if ! wget --timeout=30 --tries=3 "https://standards-oui.ieee.org/oui/oui.txt" -O /dev/stdout 2>/dev/null | \
sed -E 's/ *\(base 16\)//' | \
awk -F' ' '{printf "%s\t%s\n", $1, substr($0, index($0, $2))}' | \
sort | \

View File

@@ -11,5 +11,5 @@ done
# Force kill if graceful shutdown failed
killall -KILL python3 &>/dev/null
echo "python3 $(cat /services/config/python/backend-extra-launch-parameters 2>/dev/null) -m server > >(tee /app/log/stdout.log) 2> >(tee /app/log/stderr.log >&2)"
exec python3 $(cat /services/config/python/backend-extra-launch-parameters 2>/dev/null) -m server > >(tee /app/log/stdout.log) 2> >(tee /app/log/stderr.log >&2)
echo "Starting python3 $(cat /services/config/python/backend-extra-launch-parameters 2>/dev/null) -m server > /app/log/stdout.log 2> >(tee /app/log/stderr.log >&2)"
exec python3 $(cat /services/config/python/backend-extra-launch-parameters 2>/dev/null) -m server > /app/log/stdout.log 2> >(tee /app/log/stderr.log >&2)

View File

@@ -1,7 +1,6 @@
#!/bin/bash
set -euo pipefail
echo "Starting crond..."
crond_pid=""
@@ -24,7 +23,7 @@ done
trap cleanup EXIT
trap forward_signal INT TERM
echo "/usr/sbin/crond -c \"${SYSTEM_SERVICES_CROND}\" -f -L \"${LOG_CROND}\" >>\"${LOG_CROND}\" 2>&1 &"
echo "Starting /usr/sbin/crond -c \"${SYSTEM_SERVICES_CROND}\" -f -L \"${LOG_CROND}\" >>\"${LOG_CROND}\" 2>&1 &"
/usr/sbin/crond -c "${SYSTEM_SERVICES_CROND}" -f -L "${LOG_CROND}" >>"${LOG_CROND}" 2>&1 &
crond_pid=$!

View File

@@ -11,7 +11,6 @@ SYSTEM_NGINX_CONFIG_FILE="/services/config/nginx/conf.active/netalertx.conf"
# Create directories if they don't exist
mkdir -p "${LOG_DIR}" "${RUN_DIR}" "${TMP_DIR}"
echo "Starting nginx..."
nginx_pid=""
@@ -48,11 +47,11 @@ trap forward_signal INT TERM
# Execute nginx with overrides
# echo the full nginx command then run it
echo "nginx -p \"${RUN_DIR}/\" -c \"${SYSTEM_NGINX_CONFIG_FILE}\" -g \"error_log ${NETALERTX_LOG}/nginx-error.log; pid ${RUN_DIR}/nginx.pid; daemon off;\" &"
nginx \
echo "Starting /usr/sbin/nginx -p \"${RUN_DIR}/\" -c \"${SYSTEM_NGINX_CONFIG_FILE}\" -g \"error_log /dev/stderr; error_log ${NETALERTX_LOG}/nginx-error.log; pid ${RUN_DIR}/nginx.pid; daemon off;\" &"
/usr/sbin/nginx \
-p "${RUN_DIR}/" \
-c "${SYSTEM_NGINX_CONFIG_FILE}" \
-g "error_log ${NETALERTX_LOG}/nginx-error.log; pid ${RUN_DIR}/nginx.pid; daemon off;" &
-g "error_log /dev/stderr; error_log ${NETALERTX_LOG}/nginx-error.log; pid ${RUN_DIR}/nginx.pid; daemon off;" &
nginx_pid=$!
wait "${nginx_pid}"

View File

@@ -1,8 +1,6 @@
#!/bin/bash
set -euo pipefail
echo "Starting php-fpm..."
php_fpm_pid=""
cleanup() {
@@ -24,8 +22,8 @@ done
trap cleanup EXIT
trap forward_signal INT TERM
echo "/usr/sbin/php-fpm83 -y \"${PHP_FPM_CONFIG_FILE}\" -F >>\"${LOG_APP_PHP_ERRORS}\" 2>&1 &"
/usr/sbin/php-fpm83 -y "${PHP_FPM_CONFIG_FILE}" -F >>"${LOG_APP_PHP_ERRORS}" 2>&1 &
echo "Starting /usr/sbin/php-fpm83 -y \"${PHP_FPM_CONFIG_FILE}\" -F >>\"${LOG_APP_PHP_ERRORS}\" 2>/dev/stderr &"
/usr/sbin/php-fpm83 -y "${PHP_FPM_CONFIG_FILE}" -F >>"${LOG_APP_PHP_ERRORS}" 2> /dev/stderr &
php_fpm_pid=$!
wait "${php_fpm_pid}"

View File

@@ -2,4 +2,8 @@
python_classes = ["Test", "Describe"]
python_functions = ["test_", "it_", "and_", "but_", "they_"]
python_files = ["test_*.py",]
testpaths = ["test",]
testpaths = ["test", "tests/docker_tests"]
markers = [
"docker: requires docker socket and elevated container permissions",
"feature_complete: extended coverage suite not run by default",
]

View File

@@ -21,7 +21,6 @@ CREATE TABLE IF NOT EXISTS "Online_History" (
"Offline_Devices" INTEGER,
PRIMARY KEY("Index" AUTOINCREMENT)
);
CREATE TABLE sqlite_sequence(name,seq);
CREATE TABLE Devices (
devMac STRING (50) PRIMARY KEY NOT NULL COLLATE NOCASE,
devName STRING (50) NOT NULL DEFAULT "(unknown)",

View File

@@ -2,12 +2,37 @@ import subprocess
import re
import sys
import ipaddress
import shutil
import os
from flask import jsonify
# Register NetAlertX directories
INSTALL_PATH = "/app"
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
# Resolve speedtest-cli path once at module load and validate it.
# We do this once to avoid repeated PATH lookups and to fail fast when
# the binary isn't available or executable.
SPEEDTEST_CLI_PATH = None
def _get_speedtest_cli_path():
"""Resolve and validate the speedtest-cli executable path."""
path = shutil.which("speedtest-cli")
if path is None:
raise RuntimeError(
"speedtest-cli not found in PATH. Please install it: pip install speedtest-cli"
)
if not os.access(path, os.X_OK):
raise RuntimeError(f"speedtest-cli found at {path} but is not executable")
return path
try:
SPEEDTEST_CLI_PATH = _get_speedtest_cli_path()
except Exception as e:
# Warn but don't crash import — the endpoint will return 503 when called.
print(f"Warning: {e}", file=sys.stderr)
SPEEDTEST_CLI_PATH = None
def wakeonlan(mac):
# Validate MAC
@@ -77,10 +102,18 @@ def speedtest():
API endpoint to run a speedtest using speedtest-cli.
Returns JSON with the test output or error.
"""
# If the CLI wasn't found at module load, return a 503 so the caller
# knows the service is unavailable rather than failing unpredictably.
if SPEEDTEST_CLI_PATH is None:
return jsonify({
"success": False,
"error": "speedtest-cli is not installed or not found in PATH"
}), 503
try:
# Run speedtest-cli command
# Run speedtest-cli command using the resolved absolute path
result = subprocess.run(
[f"{INSTALL_PATH}/back/speedtest-cli", "--secure", "--simple"],
[SPEEDTEST_CLI_PATH, "--secure", "--simple"],
capture_output=True,
text=True,
check=True
@@ -97,6 +130,13 @@ def speedtest():
"details": e.stderr.strip()
}), 500
except Exception as e:
return jsonify({
"success": False,
"error": "Failed to run speedtest",
"details": str(e)
}), 500
def nslookup(ip):
"""

View File

@@ -0,0 +1,950 @@
'''
This set of tests requires netalertx-test image built. Ensure netalertx-test image is built prior
to starting these tests or they will fail. netalertx-test image is generally rebuilt using the
Build Unit Test Docker Image task. but can be created manually with the following command executed
in the workspace:
docker buildx build -t netalertx-test .
'''
import os
import pathlib
import shutil
import subprocess
import uuid
import re
import pytest
#TODO: test ALWAYS_FRESH_INSTALL
#TODO: test new named volume mount
IMAGE = os.environ.get("NETALERTX_TEST_IMAGE", "netalertx-test")
GRACE_SECONDS = float(os.environ.get("NETALERTX_TEST_GRACE", "2"))
DEFAULT_CAPS = ["NET_RAW", "NET_ADMIN", "NET_BIND_SERVICE"]
VOLUME_MAP = {
"app_db": "/app/db",
"app_config": "/app/config",
"app_log": "/app/log",
"app_api": "/app/api",
"nginx_conf": "/services/config/nginx/conf.active",
"services_run": "/services/run",
}
pytestmark = [pytest.mark.docker, pytest.mark.feature_complete]
def _unique_label(prefix: str) -> str:
return f"{prefix.upper()}__NETALERTX_INTENTIONAL__{uuid.uuid4().hex[:6]}"
def _create_docker_volume(prefix: str) -> str:
    """Create a throwaway docker volume and return its (lowercased) name."""
    volume_name = f"netalertx-test-{prefix}-{uuid.uuid4().hex[:8]}".lower()
    create_cmd = ["docker", "volume", "create", volume_name]
    # Quiet creation; a failure here should abort the test (check=True).
    subprocess.run(
        create_cmd,
        check=True,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    return volume_name
def _remove_docker_volume(name: str) -> None:
    """Force-remove a docker volume; best-effort cleanup, failures ignored."""
    removal_cmd = ["docker", "volume", "rm", "-f", name]
    subprocess.run(
        removal_cmd,
        check=False,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
def _chown_path(host_path: pathlib.Path, uid: int, gid: int) -> None:
    """Chown a host path using the test image with host user namespace."""
    # Fail fast: docker would otherwise create the missing path as root.
    if not host_path.exists():
        raise RuntimeError(f"Cannot chown missing path {host_path}")
    # Run /bin/chown from the test image as root (0:0) inside the host user
    # namespace so the recursive chown applies to the real host files.
    cmd = [
        "docker",
        "run",
        "--rm",
        "--userns",
        "host",
        "--user",
        "0:0",
        "--entrypoint",
        "/bin/chown",
        "-v",
        f"{host_path}:/mnt",  # target path is mounted at /mnt in the helper
        IMAGE,
        "-R",
        f"{uid}:{gid}",
        "/mnt",
    ]
    try:
        subprocess.run(
            cmd,
            check=True,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    except subprocess.CalledProcessError as exc:
        # Re-raise as a readable error; the CalledProcessError stays chained.
        raise RuntimeError(f"Failed to chown {host_path} to {uid}:{gid}") from exc
def _setup_mount_tree(tmp_path: pathlib.Path, prefix: str, seed_config: bool = True, seed_db: bool = True) -> dict[str, pathlib.Path]:
    """Create one host directory per VOLUME_MAP entry under a unique root.

    Optionally seeds app.conf / app.db from the repo checkout so the container
    starts with valid data, then chowns the whole tree to the service user.

    Returns:
        Mapping of VOLUME_MAP key -> host directory path.
    """
    label = _unique_label(prefix)
    base = tmp_path / f"{label}_MOUNT_ROOT"
    base.mkdir()
    paths: dict[str, pathlib.Path] = {}
    for key, target in VOLUME_MAP.items():
        folder_name = f"{label}_{key.upper()}_INTENTIONAL_NETALERTX_TEST"
        host_path = base / folder_name
        host_path.mkdir(parents=True, exist_ok=True)
        # Make the directory writable so the container (running as UID 20211)
        # can create files on first run even if the host owner differs.
        try:
            host_path.chmod(0o777)
        except PermissionError:
            # If we can't chmod (uncommon in CI), tests that require strict
            # ownership will still run their own chown/chmod operations.
            pass
        paths[key] = host_path
    # Determine repo root from env or by walking up from this file
    repo_root_env = os.environ.get("NETALERTX_REPO_ROOT")
    if repo_root_env:
        repo_root = pathlib.Path(repo_root_env)
    else:
        repo_root = None
        cur = pathlib.Path(__file__).resolve()
        for parent in cur.parents:
            # Heuristic markers for the repo root: pyproject.toml, .git, or
            # the back/ + db/ directory pair.
            if (parent / "pyproject.toml").exists() or (parent / ".git").exists() or (
                (parent / "back").exists() and (parent / "db").exists()
            ):
                repo_root = parent
                break
        if repo_root is None:
            # Fallback: assume this file sits two levels below the repo root.
            repo_root = cur.parents[2]
    if seed_config:
        config_file = paths["app_config"] / "app.conf"
        config_src = repo_root / "back" / "app.conf"
        if not config_src.exists():
            print(f"[WARN] Seed file not found: {config_src}. Set NETALERTX_REPO_ROOT or run from repo root. Skipping copy.")
        else:
            shutil.copyfile(config_src, config_file)
            config_file.chmod(0o600)  # match hardened owner-only permissions
    if seed_db:
        db_file = paths["app_db"] / "app.db"
        db_src = repo_root / "db" / "app.db"
        if not db_src.exists():
            print(f"[WARN] Seed file not found: {db_src}. Set NETALERTX_REPO_ROOT or run from repo root. Skipping copy.")
        else:
            shutil.copyfile(db_src, db_file)
            db_file.chmod(0o600)  # match hardened owner-only permissions
    # Hand the entire tree to the in-container service user (20211:20211).
    _chown_netalertx(base)
    return paths
def _setup_fixed_mount_tree(base: pathlib.Path) -> dict[str, pathlib.Path]:
    """Recreate a deterministic mount tree under ``base``.

    Any previous tree is wiped, then one world-writable directory named
    ``<KEY>_NETALERTX_TEST`` is created per VOLUME_MAP key.
    """
    if base.exists():
        shutil.rmtree(base)
    base.mkdir(parents=True)
    tree: dict[str, pathlib.Path] = {}
    for volume_key in VOLUME_MAP:
        directory = base / (volume_key.upper() + "_NETALERTX_TEST")
        directory.mkdir(parents=True, exist_ok=True)
        directory.chmod(0o777)
        tree[volume_key] = directory
    return tree
def _build_volume_args(
    paths: dict[str, pathlib.Path],
    read_only: set[str] | None = None,
    skip: set[str] | None = None,
) -> list[tuple[str, str, bool]]:
    """Translate VOLUME_MAP entries into (host, container, readonly) triples.

    Keys in ``skip`` are omitted; keys in ``read_only`` get readonly=True.
    """
    ro_keys = read_only or set()
    excluded = skip or set()
    bindings: list[tuple[str, str, bool]] = []
    for volume_key, container_target in VOLUME_MAP.items():
        if volume_key in excluded:
            continue
        bindings.append((str(paths[volume_key]), container_target, volume_key in ro_keys))
    return bindings
def _chown_root(host_path: pathlib.Path) -> None:
    # Hand ownership to root (0:0) to simulate a root-owned host mount.
    _chown_path(host_path, 0, 0)
def _chown_netalertx(host_path: pathlib.Path) -> None:
    # Restore ownership to the hardened in-container service user (20211:20211).
    _chown_path(host_path, 20211, 20211)
def _run_container(
    label: str,
    volumes: list[tuple[str, str, bool]] | None = None,
    *,
    env: dict[str, str] | None = None,
    user: str | None = None,
    drop_caps: list[str] | None = None,
    network_mode: str | None = "host",
    extra_args: list[str] | None = None,
    volume_specs: list[str] | None = None,
    sleep_seconds: float = GRACE_SECONDS,
) -> subprocess.CompletedProcess[str]:
    """Run the test image's entrypoint briefly and capture its output.

    Builds a ``docker run`` command from the given options, lets the
    entrypoint run for ``sleep_seconds``, then terminates it. Exit code 143
    (SIGTERM) is remapped to 0 inside the container script, so a container
    that merely outlived the grace period counts as success.

    Returns:
        The CompletedProcess, with an extra ``output`` attribute holding the
        combined, ANSI-stripped stdout+stderr.
    """
    name = f"netalertx-test-{label}-{uuid.uuid4().hex[:8]}".lower()
    cmd: list[str] = ["docker", "run", "--rm", "--name", name]
    if network_mode:
        cmd.extend(["--network", network_mode])
    cmd.extend(["--userns", "host"])
    # Add default ramdisk to /tmp with permissions 777
    cmd.extend(["--tmpfs", "/tmp:mode=777"])
    if user:
        cmd.extend(["--user", user])
    if drop_caps:
        for cap in drop_caps:
            cmd.extend(["--cap-drop", cap])
    else:
        # No explicit drops -> grant the caps the scanner normally needs.
        for cap in DEFAULT_CAPS:
            cmd.extend(["--cap-add", cap])
    if env:
        for key, value in env.items():
            cmd.extend(["-e", f"{key}={value}"])
    if extra_args:
        cmd.extend(extra_args)
    # Tuple form: (host_path, container_target, readonly).
    for host_path, target, readonly in volumes or []:
        mount = f"{host_path}:{target}"
        if readonly:
            mount += ":ro"
        cmd.extend(["-v", mount])
    if volume_specs:
        for spec in volume_specs:
            cmd.extend(["-v", spec])
    # Diagnostic wrapper: list ownership and perms of mounted targets inside
    # the container before running the real entrypoint. This helps debug
    # permission failures by capturing the container's view of the host mounts.
    mounts_ls = """
echo "--- MOUNT PERMS (container view) ---";
ls -ldn \
"""
    for _, target, _ in volumes or []:
        mounts_ls += f" {target}"
    mounts_ls += " || true; echo '--- END MOUNTS ---'; \n"
    # Shell script executed as the container entrypoint: run /entrypoint.sh in
    # the background, wait the grace period, TERM it if still alive, and treat
    # exit code 143 (terminated by SIGTERM) as success.
    script = (
        mounts_ls
        + "sh /entrypoint.sh & pid=$!; "
        + f"sleep {sleep_seconds}; "
        + "if kill -0 $pid >/dev/null 2>&1; then kill -TERM $pid >/dev/null 2>&1 || true; fi; "
        + "wait $pid; code=$?; if [ $code -eq 143 ]; then exit 0; fi; exit $code"
    )
    cmd.extend(["--entrypoint", "/bin/sh", IMAGE, "-c", script])
    # Print the full Docker command for debugging
    print("\n--- DOCKER CMD ---\n", " ".join(cmd), "\n--- END CMD ---\n")
    result = subprocess.run(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        timeout=sleep_seconds + 30,  # headroom for image start/stop overhead
        check=False,
    )
    # Combine and clean stdout and stderr (strip ANSI color escape codes).
    stdouterr = (
        re.sub(r'\x1b\[[0-9;]*m', '', result.stdout or '') +
        re.sub(r'\x1b\[[0-9;]*m', '', result.stderr or '')
    )
    result.output = stdouterr
    # Print container output for debugging in every test run.
    try:
        print("\n--- CONTAINER out ---\n", result.output)
    except Exception:
        pass
    return result
def _assert_contains(result, snippet: str, cmd: list[str] = None) -> None:
if snippet not in result.output:
cmd_str = " ".join(cmd) if cmd else ""
raise AssertionError(
f"Expected to find '{snippet}' in container output.\n"
f"Got:\n{result.output}\n"
f"Container command:\n{cmd_str}"
)
def _setup_zero_perm_dir(paths: dict[str, pathlib.Path], key: str) -> None:
    """Set up a directory with files and zero permissions for testing."""
    if key in ["app_db", "app_config"]:
        # Files already exist from _setup_mount_tree seeding
        pass
    else:
        # Create a dummy file for other directories
        (paths[key] / "dummy.txt").write_text("dummy")
    # Chmod all files in the directory to 000
    for f in paths[key].iterdir():
        f.chmod(0)
    # Chmod the directory itself to 000 (last, or iterdir above would fail)
    paths[key].chmod(0)
def _restore_zero_perm_dir(paths: dict[str, pathlib.Path], key: str) -> None:
    """Restore permissions after zero perm test."""
    # Chmod directory back to 700 first so its entries are listable again.
    paths[key].chmod(0o700)
    # Chmod files back to appropriate permissions
    for f in paths[key].iterdir():
        if f.name in ["app.db", "app.conf"]:
            f.chmod(0o600)  # hardened data files stay owner-only
        else:
            f.chmod(0o644)
def test_root_owned_app_db_mount(tmp_path: pathlib.Path) -> None:
    """Test root-owned mounts - simulates mounting host directories owned by root.

    1. Root-Owned Mounts: Simulates mounting host directories owned by root
    (common with docker run -v /host/path:/app/db).
    Tests each required mount point when owned by root user.
    Expected: Warning about permission issues, guidance to fix ownership.
    Check script: check-app-permissions.sh
    Sample message: "⚠️ ATTENTION: Write permission denied. The application cannot write to..."
    """
    paths = _setup_mount_tree(tmp_path, "root_app_db")
    _chown_root(paths["app_db"])
    volumes = _build_volume_args(paths)
    try:
        result = _run_container("root-app-db", volumes)
        _assert_contains(result, "Write permission denied", result.args)
        _assert_contains(result, str(VOLUME_MAP["app_db"]), result.args)
        # Consistency with every sibling root-owned test: the container must
        # refuse to start when the mount is not writable.
        assert result.returncode != 0
    finally:
        # Always restore ownership so pytest can clean up tmp_path.
        _chown_netalertx(paths["app_db"])
def test_root_owned_app_config_mount(tmp_path: pathlib.Path) -> None:
"""Test root-owned mounts - simulates mounting host directories owned by root.
1. Root-Owned Mounts: Simulates mounting host directories owned by root
(common with docker run -v /host/path:/app/db).
Tests each required mount point when owned by root user.
Expected: Warning about permission issues, guidance to fix ownership.
"""
paths = _setup_mount_tree(tmp_path, "root_app_config")
_chown_root(paths["app_config"])
volumes = _build_volume_args(paths)
try:
result = _run_container("root-app-config", volumes)
_assert_contains(result, "Write permission denied", result.args)
_assert_contains(result, str(VOLUME_MAP["app_config"]), result.args)
assert result.returncode != 0
finally:
_chown_netalertx(paths["app_config"])
def test_root_owned_app_log_mount(tmp_path: pathlib.Path) -> None:
"""Test root-owned mounts - simulates mounting host directories owned by root.
1. Root-Owned Mounts: Simulates mounting host directories owned by root
(common with docker run -v /host/path:/app/db).
Tests each required mount point when owned by root user.
Expected: Warning about permission issues, guidance to fix ownership.
"""
paths = _setup_mount_tree(tmp_path, "root_app_log")
_chown_root(paths["app_log"])
volumes = _build_volume_args(paths)
try:
result = _run_container("root-app-log", volumes)
_assert_contains(result, "Write permission denied", result.args)
_assert_contains(result, str(VOLUME_MAP["app_log"]), result.args)
assert result.returncode != 0
finally:
_chown_netalertx(paths["app_log"])
def test_root_owned_app_api_mount(tmp_path: pathlib.Path) -> None:
"""Test root-owned mounts - simulates mounting host directories owned by root.
1. Root-Owned Mounts: Simulates mounting host directories owned by root
(common with docker run -v /host/path:/app/db).
Tests each required mount point when owned by root user.
Expected: Warning about permission issues, guidance to fix ownership.
"""
paths = _setup_mount_tree(tmp_path, "root_app_api")
_chown_root(paths["app_api"])
volumes = _build_volume_args(paths)
try:
result = _run_container("root-app-api", volumes)
_assert_contains(result, "Write permission denied", result.args)
_assert_contains(result, str(VOLUME_MAP["app_api"]), result.args)
assert result.returncode != 0
finally:
_chown_netalertx(paths["app_api"])
def test_root_owned_nginx_conf_mount(tmp_path: pathlib.Path) -> None:
"""Test root-owned mounts - simulates mounting host directories owned by root.
1. Root-Owned Mounts: Simulates mounting host directories owned by root
(common with docker run -v /host/path:/app/db).
Tests each required mount point when owned by root user.
Expected: Warning about permission issues, guidance to fix ownership.
"""
paths = _setup_mount_tree(tmp_path, "root_nginx_conf")
_chown_root(paths["nginx_conf"])
volumes = _build_volume_args(paths)
try:
result = _run_container("root-nginx-conf", volumes)
_assert_contains(result, "Write permission denied", result.args)
_assert_contains(result, str(VOLUME_MAP["nginx_conf"]), result.args)
assert result.returncode != 0
finally:
_chown_netalertx(paths["nginx_conf"])
def test_root_owned_services_run_mount(tmp_path: pathlib.Path) -> None:
"""Test root-owned mounts - simulates mounting host directories owned by root.
1. Root-Owned Mounts: Simulates mounting host directories owned by root
(common with docker run -v /host/path:/app/db).
Tests each required mount point when owned by root user.
Expected: Warning about permission issues, guidance to fix ownership.
"""
paths = _setup_mount_tree(tmp_path, "root_services_run")
_chown_root(paths["services_run"])
volumes = _build_volume_args(paths)
try:
result = _run_container("root-services-run", volumes)
_assert_contains(result, "Write permission denied", result.args)
_assert_contains(result, str(VOLUME_MAP["services_run"]), result.args)
assert result.returncode != 0
finally:
_chown_netalertx(paths["services_run"])
def test_zero_permissions_app_db_dir(tmp_path: pathlib.Path) -> None:
"""Test zero permissions - simulates mounting directories/files with no permissions.
2. Zero Permissions: Simulates mounting directories/files with no permissions (chmod 000).
Tests directories and files with no read/write/execute permissions.
Expected: "Write permission denied" error with path, guidance to fix permissions.
Check script: check-app-permissions.sh
Sample messages: "⚠️ ATTENTION: Write permission denied. The application cannot write to..."
"⚠️ ATTENTION: Read permission denied. The application cannot read from..."
"""
paths = _setup_mount_tree(tmp_path, "chmod_app_db")
_setup_zero_perm_dir(paths, "app_db")
volumes = _build_volume_args(paths)
try:
result = _run_container("chmod-app-db", volumes, user="20211:20211")
_assert_contains(result, "Write permission denied", result.args)
_assert_contains(result, str(VOLUME_MAP["app_db"]), result.args)
assert result.returncode != 0
finally:
_restore_zero_perm_dir(paths, "app_db")
def test_zero_permissions_app_db_file(tmp_path: pathlib.Path) -> None:
"""Test zero permissions - simulates mounting directories/files with no permissions.
2. Zero Permissions: Simulates mounting directories/files with no permissions (chmod 000).
Tests directories and files with no read/write/execute permissions.
Expected: "Write permission denied" error with path, guidance to fix permissions.
"""
paths = _setup_mount_tree(tmp_path, "chmod_app_db_file")
(paths["app_db"] / "app.db").chmod(0)
volumes = _build_volume_args(paths)
try:
result = _run_container("chmod-app-db-file", volumes)
_assert_contains(result, "Write permission denied", result.args)
assert result.returncode != 0
finally:
(paths["app_db"] / "app.db").chmod(0o600)
def test_zero_permissions_app_config_dir(tmp_path: pathlib.Path) -> None:
"""Test zero permissions - simulates mounting directories/files with no permissions.
2. Zero Permissions: Simulates mounting directories/files with no permissions (chmod 000).
Tests directories and files with no read/write/execute permissions.
Expected: "Write permission denied" error with path, guidance to fix permissions.
"""
paths = _setup_mount_tree(tmp_path, "chmod_app_config")
_setup_zero_perm_dir(paths, "app_config")
volumes = _build_volume_args(paths)
try:
result = _run_container("chmod-app-config", volumes, user="20211:20211")
_assert_contains(result, "Write permission denied", result.args)
_assert_contains(result, str(VOLUME_MAP["app_config"]), result.args)
assert result.returncode != 0
finally:
_restore_zero_perm_dir(paths, "app_config")
def test_zero_permissions_app_config_file(tmp_path: pathlib.Path) -> None:
"""Test zero permissions - simulates mounting directories/files with no permissions.
2. Zero Permissions: Simulates mounting directories/files with no permissions (chmod 000).
Tests directories and files with no read/write/execute permissions.
Expected: "Write permission denied" error with path, guidance to fix permissions.
"""
paths = _setup_mount_tree(tmp_path, "chmod_app_config_file")
(paths["app_config"] / "app.conf").chmod(0)
volumes = _build_volume_args(paths)
try:
result = _run_container("chmod-app-config-file", volumes)
_assert_contains(result, "Write permission denied", result.args)
assert result.returncode != 0
finally:
(paths["app_config"] / "app.conf").chmod(0o600)
def test_zero_permissions_app_log_dir(tmp_path: pathlib.Path) -> None:
"""Test zero permissions - simulates mounting directories/files with no permissions.
2. Zero Permissions: Simulates mounting directories/files with no permissions (chmod 000).
Tests directories and files with no read/write/execute permissions.
Expected: "Write permission denied" error with path, guidance to fix permissions.
"""
paths = _setup_mount_tree(tmp_path, "chmod_app_log")
_setup_zero_perm_dir(paths, "app_log")
volumes = _build_volume_args(paths)
try:
result = _run_container("chmod-app-log", volumes, user="20211:20211")
_assert_contains(result, "Write permission denied", result.args)
_assert_contains(result, str(VOLUME_MAP["app_log"]), result.args)
assert result.returncode != 0
finally:
_restore_zero_perm_dir(paths, "app_log")
def test_zero_permissions_app_api_dir(tmp_path: pathlib.Path) -> None:
"""Test zero permissions - simulates mounting directories/files with no permissions.
2. Zero Permissions: Simulates mounting directories/files with no permissions (chmod 000).
Tests directories and files with no read/write/execute permissions.
Expected: "Write permission denied" error with path, guidance to fix permissions.
"""
paths = _setup_mount_tree(tmp_path, "chmod_app_api")
_setup_zero_perm_dir(paths, "app_api")
volumes = _build_volume_args(paths)
try:
result = _run_container("chmod-app-api", volumes, user="20211:20211")
_assert_contains(result, "Write permission denied", result.args)
_assert_contains(result, str(VOLUME_MAP["app_api"]), result.args)
assert result.returncode != 0
finally:
_restore_zero_perm_dir(paths, "app_api")
def test_zero_permissions_nginx_conf_dir(tmp_path: pathlib.Path) -> None:
"""Test zero permissions - simulates mounting directories/files with no permissions.
2. Zero Permissions: Simulates mounting directories/files with no permissions (chmod 000).
Tests directories and files with no read/write/execute permissions.
Expected: "Write permission denied" error with path, guidance to fix permissions.
"""
paths = _setup_mount_tree(tmp_path, "chmod_nginx_conf")
_setup_zero_perm_dir(paths, "nginx_conf")
volumes = _build_volume_args(paths)
try:
result = _run_container("chmod-nginx-conf", volumes, user="20211:20211")
assert result.returncode != 0
finally:
_restore_zero_perm_dir(paths, "nginx_conf")
def test_zero_permissions_services_run_dir(tmp_path: pathlib.Path) -> None:
"""Test zero permissions - simulates mounting directories/files with no permissions.
2. Zero Permissions: Simulates mounting directories/files with no permissions (chmod 000).
Tests directories and files with no read/write/execute permissions.
Expected: "Write permission denied" error with path, guidance to fix permissions.
"""
paths = _setup_mount_tree(tmp_path, "chmod_services_run")
_setup_zero_perm_dir(paths, "services_run")
volumes = _build_volume_args(paths)
try:
result = _run_container("chmod-services-run", volumes, user="20211:20211")
_assert_contains(result, "Write permission denied", result.args)
_assert_contains(result, str(VOLUME_MAP["services_run"]), result.args)
assert result.returncode != 0
finally:
_restore_zero_perm_dir(paths, "services_run")
def test_readonly_app_db_mount(tmp_path: pathlib.Path) -> None:
"""Test readonly mounts - simulates read-only volume mounts in containers.
3. Missing Required Mounts: Simulates forgetting to mount required persistent volumes
in read-only containers. Tests each required mount point when mounted read-only.
Expected: "Write permission denied" error with path, guidance to add volume mounts.
"""
paths = _setup_mount_tree(tmp_path, "readonly_app_db")
volumes = _build_volume_args(paths, read_only={"app_db"})
result = _run_container("readonly-app-db", volumes)
_assert_contains(result, "Write permission denied", result.args)
_assert_contains(result, str(VOLUME_MAP["app_db"]), result.args)
assert result.returncode != 0
def test_readonly_app_config_mount(tmp_path: pathlib.Path) -> None:
    """Read-only /app/config mount must fail startup with a permission error.

    Scenario 3 (required mounts): the config volume is mounted read-only, so
    startup checks should emit "Write permission denied" with the config
    path and a non-zero exit code.
    """
    tree = _setup_mount_tree(tmp_path, "readonly_app_config")
    vols = _build_volume_args(tree, read_only={"app_config"})
    run = _run_container("readonly-app-config", vols)
    # Failure message must include both the error and the affected path.
    _assert_contains(run, "Write permission denied", run.args)
    _assert_contains(run, str(VOLUME_MAP["app_config"]), run.args)
    assert run.returncode != 0
def test_readonly_app_log_mount(tmp_path: pathlib.Path) -> None:
    """Read-only /app/log mount must fail startup with a permission error.

    Scenario 3 (required mounts): the log volume is mounted read-only, so
    startup checks should emit "Write permission denied" with the log path
    and a non-zero exit code.
    """
    tree = _setup_mount_tree(tmp_path, "readonly_app_log")
    vols = _build_volume_args(tree, read_only={"app_log"})
    run = _run_container("readonly-app-log", vols)
    # Failure message must include both the error and the affected path.
    _assert_contains(run, "Write permission denied", run.args)
    _assert_contains(run, str(VOLUME_MAP["app_log"]), run.args)
    assert run.returncode != 0
def test_readonly_app_api_mount(tmp_path: pathlib.Path) -> None:
    """Read-only /app/api mount must fail startup with a permission error.

    Scenario 3 (required mounts): the api volume is mounted read-only, so
    startup checks should emit "Write permission denied" with the api path
    and a non-zero exit code.
    """
    tree = _setup_mount_tree(tmp_path, "readonly_app_api")
    vols = _build_volume_args(tree, read_only={"app_api"})
    run = _run_container("readonly-app-api", vols)
    # Failure message must include both the error and the affected path.
    _assert_contains(run, "Write permission denied", run.args)
    _assert_contains(run, str(VOLUME_MAP["app_api"]), run.args)
    assert run.returncode != 0
def test_readonly_nginx_conf_mount(tmp_path: pathlib.Path) -> None:
    """Read-only nginx conf.active mount must fail startup.

    Scenario 3 (required mounts): the nginx configuration volume is mounted
    read-only, so startup checks should emit "Write permission denied"
    naming /services/config/nginx/conf.active and exit non-zero.
    """
    tree = _setup_mount_tree(tmp_path, "readonly_nginx_conf")
    vols = _build_volume_args(tree, read_only={"nginx_conf"})
    run = _run_container("readonly-nginx-conf", vols)
    # Failure message must include both the error and the affected path.
    _assert_contains(run, "Write permission denied", run.args)
    _assert_contains(run, "/services/config/nginx/conf.active", run.args)
    assert run.returncode != 0
def test_readonly_services_run_mount(tmp_path: pathlib.Path) -> None:
    """Read-only /services/run mount must fail startup.

    Scenario 3 (required mounts): the services run volume is mounted
    read-only, so startup checks should emit "Write permission denied" with
    the run path and a non-zero exit code.
    """
    tree = _setup_mount_tree(tmp_path, "readonly_services_run")
    vols = _build_volume_args(tree, read_only={"services_run"})
    run = _run_container("readonly-services-run", vols)
    # Failure message must include both the error and the affected path.
    _assert_contains(run, "Write permission denied", run.args)
    _assert_contains(run, str(VOLUME_MAP["services_run"]), run.args)
    assert run.returncode != 0
def test_custom_port_without_writable_conf(tmp_path: pathlib.Path) -> None:
    """Custom LISTEN_ADDR/PORT with an unwritable nginx conf dir must fail.

    Scenario 4: conf.active is made read/execute only (0o500) while a custom
    PORT and LISTEN_ADDR are requested, so check-nginx-config.sh cannot
    write netalertx.conf. Expect "Write permission denied" naming
    /services/config/nginx/conf.active and a non-zero exit code.
    """
    tree = _setup_mount_tree(tmp_path, "custom_port_ro_conf")
    tree["nginx_conf"].chmod(0o500)
    vols = _build_volume_args(tree)
    try:
        run = _run_container(
            "custom-port-ro-conf",
            vols,
            env={"PORT": "24444", "LISTEN_ADDR": "127.0.0.1"},
        )
        _assert_contains(run, "Write permission denied", run.args)
        _assert_contains(run, "/services/config/nginx/conf.active", run.args)
        assert run.returncode != 0
    finally:
        # Restore writable permissions so pytest can remove tmp_path.
        tree["nginx_conf"].chmod(0o755)
def test_missing_mount_app_db(tmp_path: pathlib.Path) -> None:
    """Missing /app/db mount must trigger the persistent-storage warning.

    Scenario 3 (missing required mounts): the db volume is not mounted.
    The container runs as root (0:0) so permission checks on the other
    mounts pass and the storage check is what surfaces: /app/db must be
    reported as not being a persistent mount.
    """
    paths = _setup_mount_tree(tmp_path, "missing_mount_app_db")
    volumes = _build_volume_args(paths, skip={"app_db"})
    # Run as root (0:0) to bypass all permission checks on other mounts.
    result = _run_container("missing-mount-app-db", volumes, user="0:0")
    # Use plain assertions (consistent with the sibling tests) and carry the
    # full container output in the failure message instead of print().
    assert "not a persistent mount" in result.output and "/app/db" in result.output, (
        "Expected persistent mount warning for /app/db in container output.\n"
        "--- DEBUG CONTAINER OUTPUT ---\n" + result.output
    )
def test_missing_mount_app_config(tmp_path: pathlib.Path) -> None:
    """Missing /app/config mount must fail with a write-permission error.

    Scenario 3 (missing required mounts): the config volume is omitted, so
    the startup checks should report "Write permission denied" for
    /app/config and guide the operator to add the volume mount.
    """
    tree = _setup_mount_tree(tmp_path, "missing_mount_app_config")
    vols = _build_volume_args(tree, skip={"app_config"})
    run = _run_container("missing-mount-app-config", vols, user="20211:20211")
    # Failure message must include both the error and the affected path.
    _assert_contains(run, "Write permission denied", run.args)
    _assert_contains(run, "/app/config", run.args)
def test_missing_mount_app_log(tmp_path: pathlib.Path) -> None:
    """Missing /app/log mount must fail with a write-permission error.

    Scenario 3 (missing required mounts): the log volume is omitted, so the
    startup checks should report "Write permission denied" for /app/log and
    guide the operator to add the volume mount.
    """
    tree = _setup_mount_tree(tmp_path, "missing_mount_app_log")
    vols = _build_volume_args(tree, skip={"app_log"})
    run = _run_container("missing-mount-app-log", vols, user="20211:20211")
    # Failure message must include both the error and the affected path.
    _assert_contains(run, "Write permission denied", run.args)
    _assert_contains(run, "/app/log", run.args)
def test_missing_mount_app_api(tmp_path: pathlib.Path) -> None:
    """Missing /app/api mount must fail with a write-permission error.

    Scenario 3 (missing required mounts): the api volume is omitted, so the
    startup checks should report "Write permission denied" for /app/api and
    guide the operator to add the volume mount.
    """
    tree = _setup_mount_tree(tmp_path, "missing_mount_app_api")
    vols = _build_volume_args(tree, skip={"app_api"})
    run = _run_container("missing-mount-app-api", vols, user="20211:20211")
    # Failure message must include both the error and the affected path.
    _assert_contains(run, "Write permission denied", run.args)
    _assert_contains(run, "/app/api", run.args)
def test_missing_mount_nginx_conf(tmp_path: pathlib.Path) -> None:
    """Missing nginx conf.active mount must fail with a permission error.

    Scenario 3 (missing required mounts): the nginx configuration volume is
    omitted, so the startup checks should report "Write permission denied"
    for /services/config/nginx/conf.active and exit non-zero.
    """
    tree = _setup_mount_tree(tmp_path, "missing_mount_nginx_conf")
    vols = _build_volume_args(tree, skip={"nginx_conf"})
    run = _run_container("missing-mount-nginx-conf", vols, user="20211:20211")
    # Failure message must include both the error and the affected path.
    _assert_contains(run, "Write permission denied", run.args)
    _assert_contains(run, "/services/config/nginx/conf.active", run.args)
    assert run.returncode != 0
def test_missing_mount_services_run(tmp_path: pathlib.Path) -> None:
    """Missing /services/run mount must fail the startup checks.

    Scenario 3 (missing required mounts): the services run volume is
    omitted, so the startup checks should report "Write permission denied"
    for /services/run and announce the failing exit code.
    """
    tree = _setup_mount_tree(tmp_path, "missing_mount_services_run")
    vols = _build_volume_args(tree, skip={"services_run"})
    run = _run_container("missing-mount-services-run", vols, user="20211:20211")
    # Expect the error, the affected path, and the final failure banner.
    _assert_contains(run, "Write permission denied", run.args)
    _assert_contains(run, "/services/run", run.args)
    _assert_contains(run, "Container startup checks failed with exit code", run.args)
def test_missing_capabilities_triggers_warning(tmp_path: pathlib.Path) -> None:
    """Dropping all capabilities must prevent the container from starting.

    Scenario 5 (missing capabilities): NET_ADMIN / NET_RAW /
    NET_BIND_SERVICE are required for ARP scanning and low-port binds.
    With --cap-drop ALL the runtime refuses to exec the entrypoint, so we
    expect "exec /bin/sh: operation not permitted" and a non-zero exit.
    Check script: check-cap.sh.
    """
    tree = _setup_mount_tree(tmp_path, "missing_caps")
    vols = _build_volume_args(tree)
    run = _run_container(
        "missing-caps",
        vols,
        drop_caps=["ALL"],
    )
    _assert_contains(run, "exec /bin/sh: operation not permitted", run.args)
    assert run.returncode != 0
def test_running_as_root_is_blocked(tmp_path: pathlib.Path) -> None:
    """Running the container as UID 0 must be rejected.

    Scenario 6 (running as root): instead of the dedicated netalertx user
    (UID 20211) the container is started as 0:0. check-root.sh should warn
    that "NetAlertX is running as root" and the container must exit
    non-zero.
    """
    tree = _setup_mount_tree(tmp_path, "run_as_root")
    vols = _build_volume_args(tree)
    run = _run_container(
        "run-as-root",
        vols,
        user="0:0",
    )
    _assert_contains(run, "NetAlertX is running as root", run.args)
    assert run.returncode != 0
def test_running_as_uid_1000_warns(tmp_path: pathlib.Path) -> None:
    """Running as an arbitrary UID (1000) must produce a user warning.

    Scenario 7 (wrong user): the container runs as 1000:1000 instead of the
    netalertx user, so the hardened permissions cannot match.
    check-user-netalertx.sh should warn "NetAlertX is running as UID" and
    the container must exit non-zero.
    """
    tree = _setup_mount_tree(tmp_path, "run_as_1000")
    vols = _build_volume_args(tree)
    run = _run_container(
        "run-as-1000",
        vols,
        user="1000:1000",
    )
    _assert_contains(run, "NetAlertX is running as UID", run.args)
    assert run.returncode != 0
def test_missing_host_network_warns(tmp_path: pathlib.Path) -> None:
    """Running without host networking must warn about ARP limitations.

    Scenario 8 (missing host networking): without network_mode: host the
    scanner cannot see the LAN for ARP discovery. check-network-mode.sh
    should warn that the container is "not running with --network=host" and
    the container must exit non-zero.
    """
    tree = _setup_mount_tree(tmp_path, "missing_host_net")
    vols = _build_volume_args(tree)
    run = _run_container(
        "missing-host-network",
        vols,
        network_mode=None,
    )
    _assert_contains(run, "not running with --network=host", run.args)
    assert run.returncode != 0
def test_missing_app_conf_triggers_seed(tmp_path: pathlib.Path) -> None:
    """A missing app.conf must be regenerated from defaults on startup.

    Scenario 9 (missing configuration file): app.conf is deleted before the
    container starts; the entrypoint should seed a default configuration and
    log "Default configuration written to".
    """
    tree = _setup_mount_tree(tmp_path, "missing_app_conf")
    # Remove the seeded config so the container has to recreate it.
    (tree["app_config"] / "app.conf").unlink()
    vols = _build_volume_args(tree)
    run = _run_container("missing-app-conf", vols, user="0:0")
    _assert_contains(run, "Default configuration written to", run.args)
    assert run.returncode != 0
def test_missing_app_db_triggers_seed(tmp_path: pathlib.Path) -> None:
    """A missing app.db must trigger creation of the initial schema.

    Scenario 10 (missing database file): app.db is deleted before the
    container starts; the entrypoint should rebuild the database and log
    "Building initial database schema".
    """
    tree = _setup_mount_tree(tmp_path, "missing_app_db")
    # Remove the seeded database so the container has to recreate it.
    (tree["app_db"] / "app.db").unlink()
    vols = _build_volume_args(tree)
    run = _run_container("missing-app-db", vols, user="0:0")
    _assert_contains(run, "Building initial database schema", run.args)
    assert run.returncode != 0
def test_tmpfs_config_mount_warns(tmp_path: pathlib.Path) -> None:
    """A tmpfs-backed /app/config must trigger the persistence warning.

    Scenario 11 (tmpfs instead of volumes): /app/config is mounted as tmpfs,
    so its contents vanish on restart. check-storage.sh /
    check-storage-extra.sh should warn that the directory is "not a
    persistent mount." and name /app/config.
    """
    tree = _setup_mount_tree(tmp_path, "tmpfs_config")
    vols = _build_volume_args(tree, skip={"app_config"})
    tmpfs_mount = ["--mount", "type=tmpfs,destination=/app/config"]
    run = _run_container(
        "tmpfs-config",
        vols,
        extra_args=tmpfs_mount,
    )
    _assert_contains(run, "not a persistent mount.", run.args)
    _assert_contains(run, "/app/config", run.args)
def test_tmpfs_db_mount_warns(tmp_path: pathlib.Path) -> None:
    """A tmpfs-backed /app/db must trigger the persistence warning.

    Scenario 11 (tmpfs instead of volumes): /app/db is mounted as tmpfs, so
    the database would be lost on restart. The storage checks should warn
    that the directory is "not a persistent mount.", name /app/db, and the
    container must exit non-zero.
    """
    tree = _setup_mount_tree(tmp_path, "tmpfs_db")
    vols = _build_volume_args(tree, skip={"app_db"})
    tmpfs_mount = ["--mount", "type=tmpfs,destination=/app/db"]
    run = _run_container(
        "tmpfs-db",
        vols,
        extra_args=tmpfs_mount,
    )
    _assert_contains(run, "not a persistent mount.", run.args)
    _assert_contains(run, "/app/db", run.args)
    assert run.returncode != 0

View File

@@ -0,0 +1,82 @@
'''
Tests for the NetAlertX entrypoint.sh script.
These tests verify the behavior of the entrypoint script under various conditions,
such as environment variable settings and check skipping.
'''
import subprocess
import uuid
import pytest
IMAGE = "netalertx-test"
def _run_entrypoint(env: dict[str, str] | None = None, check_only: bool = True) -> subprocess.CompletedProcess[str]:
    """Execute /entrypoint.sh inside a throwaway test container.

    Args:
        env: extra environment variables to pass into the container.
        check_only: when True, sets NETALERTX_CHECK_ONLY=1 so only the
            startup checks run instead of the full services.

    Returns:
        The completed ``docker run`` process (stdout/stderr captured as text).
    """
    # Unique name so concurrent test runs never collide.
    container_name = f"netalertx-test-entrypoint-{uuid.uuid4().hex[:8]}".lower()
    docker_cmd = [
        "docker", "run", "--rm", "--name", container_name,
        "--network", "host", "--userns", "host",
        "--tmpfs", "/tmp:mode=777",
        "--cap-add", "NET_RAW", "--cap-add", "NET_ADMIN", "--cap-add", "NET_BIND_SERVICE",
    ]
    for key, value in (env or {}).items():
        docker_cmd += ["-e", f"{key}={value}"]
    if check_only:
        docker_cmd += ["-e", "NETALERTX_CHECK_ONLY=1"]
    docker_cmd += [
        "--entrypoint", "/bin/sh", IMAGE, "-c",
        "sh /entrypoint.sh"
    ]
    # 30s cap keeps a wedged container from hanging the test session.
    return subprocess.run(docker_cmd, capture_output=True, text=True, timeout=30)
@pytest.mark.docker
@pytest.mark.feature_complete
def test_skip_tests_env_var():
    """SKIP_TESTS=1 must bypass every startup check.

    The entrypoint should print the skip notice, produce no per-check
    output lines (which are prefixed with " --> "), and exit 0.
    """
    result = _run_entrypoint(env={"SKIP_TESTS": "1"}, check_only=True)
    assert "Skipping startup checks as SKIP_TESTS is set." in result.stdout
    # Individual checks prefix their output with " --> "; none may appear.
    assert " --> " not in result.stdout
    assert result.returncode == 0
@pytest.mark.docker
@pytest.mark.feature_complete
def test_app_conf_override_from_graphql_port():
    """GRAPHQL_PORT alone must derive APP_CONF_OVERRIDE.

    When GRAPHQL_PORT is set and APP_CONF_OVERRIDE is not, the entrypoint
    builds a JSON override containing the port, announces it on stdout, and
    exits successfully.
    """
    result = _run_entrypoint(env={"GRAPHQL_PORT": "20212", "SKIP_TESTS": "1"}, check_only=True)
    assert 'Setting APP_CONF_OVERRIDE to {"GRAPHQL_PORT":"20212"}' in result.stdout
    assert result.returncode == 0
@pytest.mark.docker
@pytest.mark.feature_complete
def test_app_conf_override_not_overridden():
    """An explicit APP_CONF_OVERRIDE must never be replaced.

    When both GRAPHQL_PORT and APP_CONF_OVERRIDE are provided, the
    entrypoint keeps the user's value: no "Setting APP_CONF_OVERRIDE to"
    message is printed and the script exits successfully.
    """
    result = _run_entrypoint(env={
        "GRAPHQL_PORT": "20212",
        "APP_CONF_OVERRIDE": '{"OTHER":"value"}',
        "SKIP_TESTS": "1"
    }, check_only=True)
    assert 'Setting APP_CONF_OVERRIDE to' not in result.stdout
    assert result.returncode == 0
@pytest.mark.docker
@pytest.mark.feature_complete
def test_no_app_conf_override_when_no_graphql_port():
    """Without GRAPHQL_PORT the entrypoint must not touch APP_CONF_OVERRIDE.

    No "Setting APP_CONF_OVERRIDE to" message may be printed and the script
    must exit successfully.
    """
    result = _run_entrypoint(env={"SKIP_TESTS": "1"}, check_only=True)
    assert 'Setting APP_CONF_OVERRIDE to' not in result.stdout
    assert result.returncode == 0

View File

@@ -5,322 +5,327 @@ Tests the fix for Issue #1210 - compound conditions with multiple AND/OR clauses
"""
import sys
import unittest
import pytest
from unittest.mock import MagicMock
# Mock the logger module before importing SafeConditionBuilder
sys.modules['logger'] = MagicMock()
# Add parent directory to path for imports
sys.path.insert(0, '/tmp/netalertx_hotfix/server/db')
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from sql_safe_builder import SafeConditionBuilder
from server.db.sql_safe_builder import SafeConditionBuilder
class TestCompoundConditions(unittest.TestCase):
"""Test compound condition parsing functionality."""
@pytest.fixture
def builder():
"""Create a fresh builder instance for each test."""
return SafeConditionBuilder()
def setUp(self):
"""Create a fresh builder instance for each test."""
self.builder = SafeConditionBuilder()
def test_user_failing_filter_six_and_clauses(self):
"""Test the exact user-reported failing filter from Issue #1210."""
condition = (
"AND devLastIP NOT LIKE '192.168.50.%' "
"AND devLastIP NOT LIKE '192.168.60.%' "
"AND devLastIP NOT LIKE '192.168.70.2' "
"AND devLastIP NOT LIKE '192.168.70.5' "
"AND devLastIP NOT LIKE '192.168.70.3' "
"AND devLastIP NOT LIKE '192.168.70.4'"
)
def test_user_failing_filter_six_and_clauses(builder):
"""Test the exact user-reported failing filter from Issue #1210."""
condition = (
"AND devLastIP NOT LIKE '192.168.50.%' "
"AND devLastIP NOT LIKE '192.168.60.%' "
"AND devLastIP NOT LIKE '192.168.70.2' "
"AND devLastIP NOT LIKE '192.168.70.5' "
"AND devLastIP NOT LIKE '192.168.70.3' "
"AND devLastIP NOT LIKE '192.168.70.4'"
)
sql, params = self.builder.build_safe_condition(condition)
sql, params = builder.build_safe_condition(condition)
# Should successfully parse
self.assertIsNotNone(sql)
self.assertIsNotNone(params)
# Should successfully parse
assert sql is not None
assert params is not None
# Should have 6 parameters (one per clause)
self.assertEqual(len(params), 6)
# Should have 6 parameters (one per clause)
assert len(params) == 6
# Should contain all 6 AND operators
self.assertEqual(sql.count('AND'), 6)
# Should contain all 6 AND operators
assert sql.count('AND') == 6
# Should contain all 6 NOT LIKE operators
self.assertEqual(sql.count('NOT LIKE'), 6)
# Should contain all 6 NOT LIKE operators
assert sql.count('NOT LIKE') == 6
# Should have 6 parameter placeholders
self.assertEqual(sql.count(':param_'), 6)
# Should have 6 parameter placeholders
assert sql.count(':param_') == 6
# Verify all IP patterns are in parameters
param_values = list(params.values())
self.assertIn('192.168.50.%', param_values)
self.assertIn('192.168.60.%', param_values)
self.assertIn('192.168.70.2', param_values)
self.assertIn('192.168.70.5', param_values)
self.assertIn('192.168.70.3', param_values)
self.assertIn('192.168.70.4', param_values)
# Verify all IP patterns are in parameters
param_values = list(params.values())
assert '192.168.50.%' in param_values
assert '192.168.60.%' in param_values
assert '192.168.70.2' in param_values
assert '192.168.70.5' in param_values
assert '192.168.70.3' in param_values
assert '192.168.70.4' in param_values
def test_multiple_and_clauses_simple(self):
"""Test multiple AND clauses with simple equality operators."""
condition = "AND devName = 'Device1' AND devVendor = 'Apple' AND devFavorite = '1'"
sql, params = self.builder.build_safe_condition(condition)
def test_multiple_and_clauses_simple(builder):
"""Test multiple AND clauses with simple equality operators."""
condition = "AND devName = 'Device1' AND devVendor = 'Apple' AND devFavorite = '1'"
# Should have 3 parameters
self.assertEqual(len(params), 3)
sql, params = builder.build_safe_condition(condition)
# Should have 3 AND operators
self.assertEqual(sql.count('AND'), 3)
# Should have 3 parameters
assert len(params) == 3
# Verify all values are parameterized
param_values = list(params.values())
self.assertIn('Device1', param_values)
self.assertIn('Apple', param_values)
self.assertIn('1', param_values)
# Should have 3 AND operators
assert sql.count('AND') == 3
def test_multiple_or_clauses(self):
"""Test multiple OR clauses."""
condition = "OR devName = 'Device1' OR devName = 'Device2' OR devName = 'Device3'"
# Verify all values are parameterized
param_values = list(params.values())
assert 'Device1' in param_values
assert 'Apple' in param_values
assert '1' in param_values
sql, params = self.builder.build_safe_condition(condition)
# Should have 3 parameters
self.assertEqual(len(params), 3)
def test_multiple_or_clauses(builder):
"""Test multiple OR clauses."""
condition = "OR devName = 'Device1' OR devName = 'Device2' OR devName = 'Device3'"
# Should have 3 OR operators
self.assertEqual(sql.count('OR'), 3)
sql, params = builder.build_safe_condition(condition)
# Verify all device names are parameterized
param_values = list(params.values())
self.assertIn('Device1', param_values)
self.assertIn('Device2', param_values)
self.assertIn('Device3', param_values)
# Should have 3 parameters
assert len(params) == 3
def test_mixed_and_or_clauses(self):
"""Test mixed AND/OR logical operators."""
condition = "AND devName = 'Device1' OR devName = 'Device2' AND devFavorite = '1'"
# Should have 3 OR operators
assert sql.count('OR') == 3
sql, params = self.builder.build_safe_condition(condition)
# Verify all device names are parameterized
param_values = list(params.values())
assert 'Device1' in param_values
assert 'Device2' in param_values
assert 'Device3' in param_values
# Should have 3 parameters
self.assertEqual(len(params), 3)
def test_mixed_and_or_clauses(builder):
"""Test mixed AND/OR logical operators."""
condition = "AND devName = 'Device1' OR devName = 'Device2' AND devFavorite = '1'"
# Should preserve the logical operator order
self.assertIn('AND', sql)
self.assertIn('OR', sql)
sql, params = builder.build_safe_condition(condition)
# Verify all values are parameterized
param_values = list(params.values())
self.assertIn('Device1', param_values)
self.assertIn('Device2', param_values)
self.assertIn('1', param_values)
# Should have 3 parameters
assert len(params) == 3
def test_single_condition_backward_compatibility(self):
"""Test that single conditions still work (backward compatibility)."""
condition = "AND devName = 'TestDevice'"
# Should preserve the logical operator order
assert 'AND' in sql
assert 'OR' in sql
sql, params = self.builder.build_safe_condition(condition)
# Verify all values are parameterized
param_values = list(params.values())
assert 'Device1' in param_values
assert 'Device2' in param_values
assert '1' in param_values
# Should have 1 parameter
self.assertEqual(len(params), 1)
# Should match expected format
self.assertIn('AND devName = :param_', sql)
def test_single_condition_backward_compatibility(builder):
"""Test that single conditions still work (backward compatibility)."""
condition = "AND devName = 'TestDevice'"
# Parameter should contain the value
self.assertIn('TestDevice', params.values())
sql, params = builder.build_safe_condition(condition)
def test_single_condition_like_operator(self):
"""Test single LIKE condition for backward compatibility."""
condition = "AND devComments LIKE '%important%'"
# Should have 1 parameter
assert len(params) == 1
sql, params = self.builder.build_safe_condition(condition)
# Should match expected format
assert 'AND devName = :param_' in sql
# Should have 1 parameter
self.assertEqual(len(params), 1)
# Parameter should contain the value
assert 'TestDevice' in params.values()
# Should contain LIKE operator
self.assertIn('LIKE', sql)
# Parameter should contain the pattern
self.assertIn('%important%', params.values())
def test_single_condition_like_operator(builder):
"""Test single LIKE condition for backward compatibility."""
condition = "AND devComments LIKE '%important%'"
def test_compound_with_like_patterns(self):
"""Test compound conditions with LIKE patterns."""
condition = "AND devLastIP LIKE '192.168.%' AND devVendor LIKE '%Apple%'"
sql, params = builder.build_safe_condition(condition)
sql, params = self.builder.build_safe_condition(condition)
# Should have 1 parameter
assert len(params) == 1
# Should have 2 parameters
self.assertEqual(len(params), 2)
# Should contain LIKE operator
assert 'LIKE' in sql
# Should have 2 LIKE operators
self.assertEqual(sql.count('LIKE'), 2)
# Parameter should contain the pattern
assert '%important%' in params.values()
# Verify patterns are parameterized
param_values = list(params.values())
self.assertIn('192.168.%', param_values)
self.assertIn('%Apple%', param_values)
def test_compound_with_inequality_operators(self):
"""Test compound conditions with various inequality operators."""
condition = "AND eve_DateTime > '2024-01-01' AND eve_DateTime < '2024-12-31'"
def test_compound_with_like_patterns(builder):
"""Test compound conditions with LIKE patterns."""
condition = "AND devLastIP LIKE '192.168.%' AND devVendor LIKE '%Apple%'"
sql, params = self.builder.build_safe_condition(condition)
sql, params = builder.build_safe_condition(condition)
# Should have 2 parameters
self.assertEqual(len(params), 2)
# Should have 2 parameters
assert len(params) == 2
# Should have both operators
self.assertIn('>', sql)
self.assertIn('<', sql)
# Should have 2 LIKE operators
assert sql.count('LIKE') == 2
# Verify dates are parameterized
param_values = list(params.values())
self.assertIn('2024-01-01', param_values)
self.assertIn('2024-12-31', param_values)
# Verify patterns are parameterized
param_values = list(params.values())
assert '192.168.%' in param_values
assert '%Apple%' in param_values
def test_empty_condition(self):
"""Test empty condition string."""
condition = ""
sql, params = self.builder.build_safe_condition(condition)
def test_compound_with_inequality_operators(builder):
"""Test compound conditions with various inequality operators."""
condition = "AND eve_DateTime > '2024-01-01' AND eve_DateTime < '2024-12-31'"
# Should return empty results
self.assertEqual(sql, "")
self.assertEqual(params, {})
sql, params = builder.build_safe_condition(condition)
def test_whitespace_only_condition(self):
"""Test condition with only whitespace."""
condition = " \t\n "
# Should have 2 parameters
assert len(params) == 2
sql, params = self.builder.build_safe_condition(condition)
# Should have both operators
assert '>' in sql
assert '<' in sql
# Should return empty results
self.assertEqual(sql, "")
self.assertEqual(params, {})
# Verify dates are parameterized
param_values = list(params.values())
assert '2024-01-01' in param_values
assert '2024-12-31' in param_values
def test_invalid_column_name_rejected(self):
"""Test that invalid column names are rejected."""
condition = "AND malicious_column = 'value'"
with self.assertRaises(ValueError):
self.builder.build_safe_condition(condition)
def test_empty_condition(builder):
"""Test empty condition string."""
condition = ""
def test_invalid_operator_rejected(self):
"""Test that invalid operators are rejected."""
condition = "AND devName EXECUTE 'DROP TABLE'"
sql, params = builder.build_safe_condition(condition)
with self.assertRaises(ValueError):
self.builder.build_safe_condition(condition)
# Should return empty results
assert sql == ""
assert params == {}
def test_sql_injection_attempt_blocked(self):
"""Test that SQL injection attempts are blocked."""
condition = "AND devName = 'value'; DROP TABLE devices; --"
# Should either reject or sanitize the dangerous input
# The semicolon and comment should not appear in the final SQL
try:
sql, params = self.builder.build_safe_condition(condition)
# If it doesn't raise an error, it should sanitize the input
self.assertNotIn('DROP', sql.upper())
self.assertNotIn(';', sql)
except ValueError:
# Rejection is also acceptable
pass
def test_whitespace_only_condition(builder):
"""Test condition with only whitespace."""
condition = " \t\n "
def test_quoted_string_with_spaces(self):
"""Test that quoted strings with spaces are handled correctly."""
condition = "AND devName = 'My Device Name' AND devComments = 'Has spaces here'"
sql, params = builder.build_safe_condition(condition)
sql, params = self.builder.build_safe_condition(condition)
# Should return empty results
assert sql == ""
assert params == {}
# Should have 2 parameters
self.assertEqual(len(params), 2)
# Verify values with spaces are preserved
param_values = list(params.values())
self.assertIn('My Device Name', param_values)
self.assertIn('Has spaces here', param_values)
def test_invalid_column_name_rejected(builder):
"""Test that invalid column names are rejected."""
condition = "AND malicious_column = 'value'"
def test_compound_condition_with_not_equal(self):
"""Test compound conditions with != operator."""
condition = "AND devName != 'Device1' AND devVendor != 'Unknown'"
with pytest.raises(ValueError):
builder.build_safe_condition(condition)
sql, params = self.builder.build_safe_condition(condition)
# Should have 2 parameters
self.assertEqual(len(params), 2)
def test_invalid_operator_rejected(builder):
"""Test that invalid operators are rejected."""
condition = "AND devName EXECUTE 'DROP TABLE'"
# Should have != operators (or converted to <>)
self.assertTrue('!=' in sql or '<>' in sql)
with pytest.raises(ValueError):
builder.build_safe_condition(condition)
# Verify values are parameterized
param_values = list(params.values())
self.assertIn('Device1', param_values)
self.assertIn('Unknown', param_values)
def test_sql_injection_attempt_blocked(builder):
    """Test that SQL injection attempts are blocked.

    A classic stacked-query injection (statement terminator plus comment)
    must either be rejected outright with ValueError or sanitized so that
    neither the DROP statement nor the semicolon survives into the SQL.
    """
    condition = "AND devName = 'value'; DROP TABLE devices; --"

    # Should either reject or sanitize the dangerous input
    # The semicolon and comment should not appear in the final SQL
    try:
        sql, params = builder.build_safe_condition(condition)
        # If it doesn't raise an error, it should sanitize the input
        assert 'DROP' not in sql.upper()
        assert ';' not in sql
    except ValueError:
        # Rejection is also acceptable
        pass
def test_quoted_string_with_spaces(builder):
    """Test that quoted strings with spaces are handled correctly.

    Values inside single quotes may contain spaces; the tokenizer must
    keep each quoted literal intact as a single parameter value.
    """
    condition = "AND devName = 'My Device Name' AND devComments = 'Has spaces here'"

    sql, params = builder.build_safe_condition(condition)

    # Should have 2 parameters
    assert len(params) == 2

    # Verify values with spaces are preserved
    param_values = list(params.values())
    assert 'My Device Name' in param_values
    assert 'Has spaces here' in param_values
def test_compound_condition_with_not_equal(builder):
    """Test compound conditions with != operator.

    Two AND-joined inequality clauses must each become a bound parameter,
    and the inequality operator (either != or its SQL synonym <>) must be
    preserved in the generated fragment.
    """
    condition = "AND devName != 'Device1' AND devVendor != 'Unknown'"

    sql, params = builder.build_safe_condition(condition)

    # Should have 2 parameters
    assert len(params) == 2

    # Should have != operators (or converted to <>)
    assert '!=' in sql or '<>' in sql

    # Verify values are parameterized
    param_values = list(params.values())
    assert 'Device1' in param_values
    assert 'Unknown' in param_values
def test_very_long_compound_condition(builder):
    """Test handling of very long compound conditions (10+ clauses).

    Builds ten AND-joined inequality clauses and verifies the builder
    scales: one parameter per clause, one AND per clause, and every
    literal value bound rather than inlined.
    """
    clauses = []
    for i in range(10):
        clauses.append(f"AND devName != 'Device{i}'")
    condition = " ".join(clauses)

    sql, params = builder.build_safe_condition(condition)

    # Should have 10 parameters
    assert len(params) == 10

    # Should have 10 AND operators
    assert sql.count('AND') == 10

    # Verify all device names are parameterized
    param_values = list(params.values())
    for i in range(10):
        assert f'Device{i}' in param_values
def test_parameters_have_unique_names(builder):
    """Repeated clauses on the same column must still get distinct parameter names."""
    condition = "AND devName = 'A' AND devName = 'B' AND devName = 'C'"
    sql, params = builder.build_safe_condition(condition)
    # A set collapses duplicates, so equal sizes proves every name is unique.
    names = list(params.keys())
    assert len(set(names)) == len(names)
def test_parameter_values_match_condition(builder):
    """The bound parameters must carry exactly the literal values from the condition."""
    condition = "AND devLastIP NOT LIKE '192.168.1.%' AND devLastIP NOT LIKE '10.0.0.%'"
    sql, params = builder.build_safe_condition(condition)
    # Compare as sorted lists so parameter ordering does not matter.
    expected = sorted(['192.168.1.%', '10.0.0.%'])
    assert sorted(params.values()) == expected
def test_parameters_referenced_in_sql(builder):
    """Every generated parameter must appear as a :name placeholder in the SQL."""
    condition = "AND devName = 'Device1' AND devVendor = 'Apple'"
    sql, params = builder.build_safe_condition(condition)
    # No orphan parameters: each key must show up as a bound placeholder.
    assert all(f':{name}' in sql for name in params)