Merge branch 'jokob-sk:main' into baremetal-installer

This commit is contained in:
rell3k
2025-10-02 12:01:39 -04:00
committed by GitHub
16 changed files with 1276 additions and 508 deletions

0
api/.git-placeholder Normal file → Executable file
View File

View File

@@ -10,6 +10,9 @@ NetAlertX comes with a plugin system to feed events from third-party scripts int
> (Currently, update/overwriting of existing objects is only supported for devices via the `CurrentScan` table.)
> [!NOTE]
> For a high-level overview of how the `config.json` is used and its lifecycle, check the [config.json Lifecycle in NetAlertX Guide](PLUGINS_DEV_CONFIG.md).
### 🎥 Watch the video:
> [!TIP]

146
docs/PLUGINS_DEV_CONFIG.md Executable file
View File

@@ -0,0 +1,146 @@
## config.json Lifecycle in NetAlertX
This document describes, at a high level, how `config.json` is read, processed, and used by the NetAlertX core and plugins. It also outlines the plugin output contract and the main plugin types.
> [!NOTE]
> For a deep-dive on the specific configuration options and sections of the `config.json` plugin manifest, consult the [Plugins Development Guide](PLUGINS_DEV.md).
---
### 1. Loading
* On startup, the app core loads `config.json` for each plugin.
* The `config.json` is the plugin manifest; it contains metadata and runtime settings.
---
### 2. Validation
* The core checks that each required setting key (such as `RUN`) exists for the plugin.
* Invalid or missing values may be replaced with defaults, or the plugin may be disabled (steps 1 and 2 are sketched together below).
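For orientation, a minimal sketch of steps 1 and 2, written against an assumed plugin layout (one `config.json` per plugin folder under `/app/front/plugins`). The `unique_prefix` and `settings`/`RUN` keys are real manifest fields referenced in this guide, but the helper below is purely illustrative and is not the core loader:

```python
import json
from pathlib import Path

# Assumed default plugin location; adjust for your install.
PLUGINS_DIR = Path("/app/front/plugins")


def load_manifest(plugin_dir: Path):
    """Load and minimally validate one plugin manifest (illustrative only)."""
    config_path = plugin_dir / "config.json"
    try:
        manifest = json.loads(config_path.read_text())
    except (FileNotFoundError, json.JSONDecodeError):
        return None  # unreadable or invalid manifest -> plugin is skipped

    # Step 2 (validation): require an identifier and a RUN setting.
    functions = {s.get("function") for s in manifest.get("settings", [])}
    if "unique_prefix" not in manifest or "RUN" not in functions:
        return None  # missing required keys -> treat the plugin as disabled
    return manifest


manifests = [m for d in sorted(PLUGINS_DIR.iterdir())
             if d.is_dir() and (m := load_manifest(d)) is not None]
```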
---
### 3. Preparation
* The plugin's settings (paths, commands, parameters) are prepared.
* Database mappings (`mapped_to_table`, `database_column_definitions`) for data ingestion into the core app are parsed.
---
### 4. Execution
* Plugins can run at different points in the core app's execution, such as on a schedule, once at startup, or after a notification.
* At runtime, the scheduler triggers plugins according to their `interval`.
* The plugin executes its command or script.
---
### 5. Parsing
* Plugin output is expected in **pipe (`|`)-delimited format**.
* The core parses lines into fields, matching the **plugin interface contract** (see the sketch below).
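To make the parsing step concrete, a small sketch that splits one result line into named fields. The column list here is an assumption for illustration and may be incomplete; the authoritative order is the contract table in [PLUGINS_DEV.md](PLUGINS_DEV.md):

```python
# Illustrative column order only; see the Plugin Interface Contract in PLUGINS_DEV.md.
COLUMNS = [
    "Object_PrimaryID", "Object_SecondaryID", "DateTime",
    "Watched_Value1", "Watched_Value2", "Watched_Value3", "Watched_Value4",
    "Extra", "ForeignKey",
]


def parse_result_line(line: str) -> dict:
    """Split one pipe-delimited plugin result line into contract fields."""
    fields = [field.strip() for field in line.split("|")]
    return dict(zip(COLUMNS, fields))


sample = ("aa:bb:cc:dd:ee:ff|192.168.1.10|2025-10-02 12:00:00"
          "|1|||null|extra info|aa:bb:cc:dd:ee:ff")
print(parse_result_line(sample))
```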
---
### 6. Mapping
* Each parsed field is moved into the `Plugins_` database tables and can be mapped into a configured database table.
* Controlled by `database_column_definitions` and `mapped_to_table`.
* Example: `Object_PrimaryID → Devices.MAC`.
---
### 6a. Plugin Output Contract
Each plugin must output results in the **plugin interface contract format**: pipe (`|`)-delimited values in the column order described under [Plugin Interface Contract](PLUGINS_DEV.md).
#### **IDs**
* `Object_PrimaryID` and `Object_SecondaryID` identify the record (e.g. `MAC|IP`).
#### **Watched values (`Watched_Value1`–`Watched_Value4`)**
* Used by the core to detect changes between runs.
* Changes here can trigger **notifications**.
#### **Extra value (`Extra`)**
* Optional extra field.
* Stored in the database but **not used for alerts**.
#### **Helper values (`Helper_Value1`–`Helper_Value3`)**
* Added for cases where more than the IDs, watched values, and extra field are needed.
* Can be made visible in the UI.
* Stored in the database but **not used for alerts**.
#### **Mapping matters**
* While the plugin output is free-form, the `database_column_definitions` and `mapped_to_table` settings in `config.json` determine the **target columns and data types** in NetAlertX, as illustrated in the sketch below.
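To make the mapping point concrete, a hypothetical, heavily abbreviated manifest excerpt expressed as a Python literal. `mapped_to_table`, `database_column_definitions`, and the contract column names come from this guide; the `column`/`mapped_to_column` key names and the `cur_MAC`/`cur_IP` targets are assumptions for illustration only — the authoritative schema is documented in [PLUGINS_DEV.md](PLUGINS_DEV.md):

```python
# Hypothetical, abbreviated excerpt of a discovery plugin's manifest mapping.
# Consult PLUGINS_DEV.md for the full, authoritative set of keys.
mapping_excerpt = {
    "mapped_to_table": "CurrentScan",          # table the parsed data is ingested into
    "database_column_definitions": [
        {
            "column": "Object_PrimaryID",      # contract column produced by the plugin
            "mapped_to_column": "cur_MAC",     # assumed target column name (illustrative)
        },
        {
            "column": "Object_SecondaryID",
            "mapped_to_column": "cur_IP",      # assumed target column name (illustrative)
        },
    ],
}
```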
---
### 7. Persistence
* Data is upserted into the database.
* Conflicts are resolved using `Object_PrimaryID` + `Object_SecondaryID` (see the upsert sketch below).
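The actual persistence layer lives in the core app; the following is a generic sketch of the upsert semantics only, using a deliberately simplified table that is not the real schema:

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("""
    CREATE TABLE Plugins_Objects (
        Object_PrimaryID   TEXT,
        Object_SecondaryID TEXT,
        Watched_Value1     TEXT,
        PRIMARY KEY (Object_PrimaryID, Object_SecondaryID)
    )
""")


def upsert(primary_id, secondary_id, watched1):
    # Conflict resolution keyed on Object_PrimaryID + Object_SecondaryID,
    # mirroring the behaviour described above.
    con.execute(
        """
        INSERT INTO Plugins_Objects (Object_PrimaryID, Object_SecondaryID, Watched_Value1)
        VALUES (?, ?, ?)
        ON CONFLICT(Object_PrimaryID, Object_SecondaryID)
        DO UPDATE SET Watched_Value1 = excluded.Watched_Value1
        """,
        (primary_id, secondary_id, watched1),
    )


upsert("aa:bb:cc:dd:ee:ff", "192.168.1.10", "online")
upsert("aa:bb:cc:dd:ee:ff", "192.168.1.10", "offline")  # updates the existing row
con.commit()
```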
---
### 8. Plugin Types and Expected Outputs
Beyond the `data_source` setting, plugins fall into functional categories. Each has its own input requirements and output expectations:
#### **Device discovery plugins**
* **Inputs:** None (`N/A`), a subnet, an API of a discovery service, or similar.
* **Outputs:** At minimum a `MAC` and `IP`, which result in new or updated device records in the `Devices` table.
* **Mapping:** Must be mapped to the `CurrentScan` table via `database_column_definitions` and `data_filters`.
* **Examples:** ARP-scan, NMAP device discovery (e.g., `ARPSCAN`, `NMAPDEV`).
#### **Device-data enrichment plugins**
* **Inputs:** Device identifier (usually `MAC`, `IP`).
* **Outputs:** Additional data for that device (e.g. open ports).
* **Mapping:** Controlled via `database_column_definitions` and `data_filters`.
* **Examples:** Ports, MQTT messages (e.g., `NMAP`, `MQTT`).
#### **Name resolver plugins**
* **Inputs:** Device identifiers (MAC, IP, or hostname).
* **Outputs:** Updated `devName` and `devFQDN` fields.
* **Mapping:** Not expected.
* **Note:** Currently requires **core app modification** to add new plugins; not fully driven by the plugin's `config.json`.
* **Examples:** Avahiscan (e.g., `NBTSCAN`, `NSLOOKUP`).
#### **Generic plugins**
* **Inputs:** Whatever the script or query provides.
* **Outputs:** Data shown only in **Integrations → Plugins**, not tied to devices.
* **Mapping:** Not expected.
* **Examples:** External monitoring data (e.g., `INTRSPD`).
#### **Configuration-only plugins**
* **Inputs/Outputs:** None at runtime.
* **Mapping:** Not expected.
* **Examples:** Used to provide additional settings or execute scripts (e.g., `MAINT`, `CSVBCKP`).
---
### 9. Post-Processing
* Notifications are generated if watched values change.
* UI is updated with new or updated records.
* All values that are configured to be shown in the UI appear in the Plugins section.
---
### 10. Summary
The lifecycle of `config.json` entries is:
**Load → Validate → Prepare → Execute → Parse → Map → Persist → Post-process**
Plugins must follow the **output contract**, and their category (discovery, enrichment, resolver, generic, config-only) defines what inputs they require and what outputs are expected.

View File

@@ -1,36 +1,32 @@
#!/usr/bin/env python
import json
import subprocess
import argparse
import os
import pathlib
import sys
from datetime import datetime
import time
import re
import unicodedata
import paho.mqtt.client as mqtt
# from paho.mqtt import client as mqtt_client
# from paho.mqtt import CallbackAPIVersion as mqtt_CallbackAPIVersion
import hashlib
import sqlite3
from pytz import timezone
# Register NetAlertX directories
INSTALL_PATH="/app"
INSTALL_PATH = "/app"
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
# NetAlertX modules
import conf
from const import apiPath, confFileName, logPath
from const import confFileName, logPath
from plugin_utils import getPluginObject
from plugin_helper import Plugin_Objects
from logger import mylog, Logger, append_line_to_file
from helper import timeNowTZ, get_setting_value, bytes_to_string, sanitize_string, normalize_string
from models.notification_instance import NotificationInstance
from logger import mylog, Logger
from helper import timeNowTZ, get_setting_value, bytes_to_string, \
sanitize_string, normalize_string
from database import DB, get_device_stats
from pytz import timezone
# Make sure the TIMEZONE for logging is correct
conf.tz = timezone(get_setting_value('TIMEZONE'))
@@ -49,20 +45,19 @@ plugin_objects = Plugin_Objects(RESULT_FILE)
md5_hash = hashlib.md5()
# globals
mqtt_sensors = []
mqtt_connected_to_broker = False
mqtt_client = None # mqtt client
topic_root = get_setting_value('MQTT_topic_root')
def main():
mylog('verbose', [f'[{pluginName}](publisher) In script'])
# Check if basic config settings supplied
if check_config() == False:
mylog('verbose', [f'[{pluginName}] ⚠ ERROR: Publisher notification gateway not set up correctly. Check your {confFileName} {pluginName}_* variables.'])
if not check_config():
return
# Create a database connection
@@ -70,30 +65,52 @@ def main():
db.open()
mqtt_start(db)
mqtt_client.disconnect()
plugin_objects.write_result_file()
#-------------------------------------------------------------------------------
# -----------------------------------------------------------------------------
# MQTT
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
def check_config():
if get_setting_value('MQTT_BROKER') == '' or get_setting_value('MQTT_PORT') == '' or get_setting_value('MQTT_USER') == '' or get_setting_value('MQTT_PASSWORD') == '':
mylog('verbose', [f'[Check Config] ⚠ ERROR: MQTT service not set up correctly. Check your {confFileName} MQTT_* variables.'])
return False
else:
return True
"""
Checks whether the MQTT configuration settings are properly set.
Returns:
bool: True if all required MQTT settings
('MQTT_BROKER', 'MQTT_PORT', 'MQTT_USER', 'MQTT_PASSWORD')
are non-empty;
False otherwise. Logs a verbose error message
if any setting is missing.
"""
if get_setting_value('MQTT_BROKER') == '' \
or get_setting_value('MQTT_PORT') == '' \
or get_setting_value('MQTT_USER') == '' \
or get_setting_value('MQTT_PASSWORD') == '':
mylog('verbose', [f'[Check Config] ⚠ ERROR: MQTT service not set up \
correctly. Check your {confFileName} MQTT_* variables.'])
return False
else:
return True
#-------------------------------------------------------------------------------
# Sensor configs are tracking which sensors in NetAlertX exist and if a config has changed
# -----------------------------------------------------------------------------
# Sensor configs are tracking which sensors in NetAlertX exist
# and if a config has changed
class sensor_config:
def __init__(self, deviceId, deviceName, sensorType, sensorName, icon, mac):
def __init__(self,
deviceId,
deviceName,
sensorType,
sensorName,
icon,
mac):
"""
Initialize the sensor_config object with provided parameters. Sets up sensor configuration
and generates necessary MQTT topics and messages based on the sensor type.
Initialize the sensor_config object with provided parameters.
Sets up sensor configuration and generates necessary MQTT topics
and messages based on the sensor type.
"""
# Assign initial attributes
self.deviceId = deviceId
@@ -110,20 +127,23 @@ class sensor_config:
self.message = {} # Initialize message as an empty dictionary
self.unique_id = ''
# Call helper functions to initialize the message, generate a hash, and handle plugin object
# Call helper functions to initialize the message, generate a hash,
# and handle plugin object
self.initialize_message()
self.generate_hash()
self.handle_plugin_object()
def initialize_message(self):
"""
Initialize the MQTT message payload based on the sensor type. This method handles sensors of types:
Initialize the MQTT message payload based on the sensor type.
This method handles sensors of types:
- 'timestamp'
- 'binary_sensor'
- 'sensor'
- 'device_tracker'
"""
# Ensure self.message is initialized as a dictionary if not already done
# Ensure self.message is initialized as a dictionary
# if not already done
if not isinstance(self.message, dict):
self.message = {}
@@ -153,7 +173,6 @@ class sensor_config:
"icon": f'mdi:{self.icon}'
})
# Handle 'device_tracker' sensor type
elif self.sensorType == 'device_tracker':
self.topic = f'homeassistant/device_tracker/{self.deviceId}/config'
@@ -229,25 +248,36 @@ class sensor_config:
)
#-------------------------------------------------------------------------------
# -------------------------------------------------------------------------------
def publish_mqtt(mqtt_client, topic, message):
"""
Publishes a message to an MQTT topic using the provided MQTT client.
If the message is not a string, it is converted to a JSON-formatted string.
The function retrieves the desired QoS level from settings and logs the publishing process.
If the client is not connected to the broker, the function logs an error and aborts.
It attempts to publish the message, retrying until the publish status indicates success.
Args:
mqtt_client: The MQTT client instance used to publish the message.
topic (str): The MQTT topic to publish to.
message (Any): The message payload to send. Non-string messages are converted to JSON.
Returns:
bool: True if the message was published successfully, False if not connected to the broker.
"""
status = 1
# convert anything but a simple string to json
if not isinstance(message, str):
message = json.dumps(message).replace("'",'"')
message = json.dumps(message).replace("'", '"')
qos = get_setting_value('MQTT_QOS')
mylog('verbose', [f"[{pluginName}] Sending MQTT topic: {topic}"])
mylog('verbose', [f"[{pluginName}] Sending MQTT message: {message}"])
mylog('debug', [f"[{pluginName}] Sending MQTT topic: {topic}",
f"[{pluginName}] Sending MQTT message: {message}"])
# mylog('verbose', [f"[{pluginName}] get_setting_value('MQTT_QOS'): {qos}"])
if mqtt_connected_to_broker == False:
mylog('verbose', [f"[{pluginName}] ⚠ ERROR: Not connected to broker, aborting."])
if not mqtt_connected_to_broker:
mylog('minimal', [f"[{pluginName}] ⚠ ERROR: Not connected to broker, aborting."])
return False
while status != 0:
@@ -268,11 +298,11 @@ def publish_mqtt(mqtt_client, topic, message):
# mylog('verbose', [f"[{pluginName}] result: {result}"])
if status != 0:
mylog('verbose', [f"[{pluginName}] Waiting to reconnect to MQTT broker"])
mylog('debug', [f"[{pluginName}] Waiting to reconnect to MQTT broker"])
time.sleep(0.1)
return True
#-------------------------------------------------------------------------------
# ------------------------------------------------------------------------------
# Create a generic device for overall stats
def create_generic_device(mqtt_client, deviceId, deviceName):
@@ -284,28 +314,29 @@ def create_generic_device(mqtt_client, deviceId, deviceName):
create_sensor(mqtt_client, deviceId, deviceName, 'sensor', 'unknown', 'wifi-alert')
#-------------------------------------------------------------------------------
# ------------------------------------------------------------------------------
# Register sensor config on the broker
def create_sensor(mqtt_client, deviceId, deviceName, sensorType, sensorName, icon, mac=""):
global mqtt_sensors
# check previous configs
sensorConfig = sensor_config(deviceId, deviceName, sensorType, sensorName, icon, mac)
# send if new
# Create the HA sensor config if a new device is discovered
if sensorConfig.isNew:
# add the sensor to the global list to keep track of successfully added sensors
if publish_mqtt(mqtt_client, sensorConfig.topic, sensorConfig.message):
# hack - delay adding to the queue in case the process is
time.sleep(get_setting_value('MQTT_DELAY_SEC')) # restarted and previous publish processes aborted
# (it takes ~2s to update a sensor config on the broker)
# hack - delay adding to the queue in case the process is
# restarted and previous publish processes aborted
# (it takes ~2s to update a sensor config on the broker)
time.sleep(get_setting_value('MQTT_DELAY_SEC'))
mqtt_sensors.append(sensorConfig)
return sensorConfig
#-------------------------------------------------------------------------------
# -----------------------------------------------------------------------------
def mqtt_create_client():
# attempt reconnections on failure, ref https://www.emqx.com/en/blog/how-to-use-mqtt-in-python
@@ -314,7 +345,7 @@ def mqtt_create_client():
MAX_RECONNECT_COUNT = 12
MAX_RECONNECT_DELAY = 60
mytransport = 'tcp' # or 'websockets'
mytransport = 'tcp' # or 'websockets'
def on_disconnect(mqtt_client, userdata, rc):
@@ -341,7 +372,6 @@ def mqtt_create_client():
mqtt_connected_to_broker = False
def on_connect(mqtt_client, userdata, flags, rc, properties):
global mqtt_connected_to_broker
@@ -367,10 +397,12 @@ def mqtt_create_client():
version = mqtt.MQTTv5
# we now hardcode the client id into here.
# TODO: Add config ffor client id
# TODO: Add config for client id (atm, we use a fixed client id,
# so only one instance of NetAlertX can connect to the broker at any given time)
# If you intend to run multiple instances simultaneously, make sure to set unique client IDs for each instance.
mqtt_client = mqtt.Client(
client_id='netalertx',
callback_api_version = mqtt.CallbackAPIVersion.VERSION2,
callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
transport=mytransport,
protocol=version)
mqtt_client.on_connect = on_connect
@@ -379,8 +411,8 @@ def mqtt_create_client():
if get_setting_value('MQTT_TLS'):
mqtt_client.tls_set()
mqtt_client.username_pw_set(username = get_setting_value('MQTT_USER'), password = get_setting_value('MQTT_PASSWORD'))
err_code = mqtt_client.connect(host = get_setting_value('MQTT_BROKER'), port = get_setting_value('MQTT_PORT'))
mqtt_client.username_pw_set(username=get_setting_value('MQTT_USER'), password=get_setting_value('MQTT_PASSWORD'))
err_code = mqtt_client.connect(host=get_setting_value('MQTT_BROKER'), port=get_setting_value('MQTT_PORT'))
if (err_code == mqtt.MQTT_ERR_SUCCESS):
# We (prematurely) set the connection state to connected
# the callback may be delayed
@@ -393,13 +425,13 @@ def mqtt_create_client():
return mqtt_client
#-------------------------------------------------------------------------------
# -----------------------------------------------------------------------------
def mqtt_start(db):
global mqtt_client, mqtt_connected_to_broker
if mqtt_connected_to_broker == False:
mqtt_connected_to_broker = True
if not mqtt_connected_to_broker:
mqtt_client = mqtt_create_client()
@@ -409,7 +441,7 @@ def mqtt_start(db):
# General stats
# Create a generic device for overall stats
if get_setting_value('MQTT_SEND_STATS') == True:
if get_setting_value('MQTT_SEND_STATS'):
# Create a new device representing overall stats
create_generic_device(mqtt_client, deviceId, deviceName)
@@ -429,7 +461,7 @@ def mqtt_start(db):
)
# Generate device-specific MQTT messages if enabled
if get_setting_value('MQTT_SEND_DEVICES') == True:
if get_setting_value('MQTT_SEND_DEVICES'):
# Specific devices processing
@@ -438,9 +470,7 @@ def mqtt_start(db):
sec_delay = len(devices) * int(get_setting_value('MQTT_DELAY_SEC'))*5
mylog('verbose', [f"[{pluginName}] Estimated delay: ", (sec_delay), 's ', '(', round(sec_delay/60,1) , 'min)' ])
debug_index = 0
mylog('verbose', [f"[{pluginName}] Estimated delay: ", (sec_delay), 's ', '(', round(sec_delay/60, 1), 'min)'])
for device in devices:
@@ -492,7 +522,7 @@ def mqtt_start(db):
)
# handle device_tracker
sensorConfig = create_sensor(mqtt_client, deviceId, devDisplayName, 'device_tracker', 'is_home', 'home', device["devMac"])
sensorConfig = create_sensor(mqtt_client, deviceId, devDisplayName, 'device_tracker', 'is_home', 'home', device["devMac"])
# <away|home> are only valid states
state = 'away'
@@ -505,28 +535,23 @@ def mqtt_start(db):
publish_mqtt(mqtt_client, sensorConfig.json_attr_topic, devJson)
#===============================================================================
# =============================================================================
# Home Assistant UTILs
#===============================================================================
# =============================================================================
def to_binary_sensor(input):
# In HA a binary sensor returns ON or OFF
result = "OFF"
"""
Converts various input types to a binary sensor state ("ON" or "OFF") for Home Assistant.
"""
if isinstance(input, (int, float)) and input >= 1:
return "ON"
elif isinstance(input, bool) and input:
return "ON"
elif isinstance(input, str) and input == "1":
return "ON"
elif isinstance(input, bytes) and bytes_to_string(input) == "1":
return "ON"
return "OFF"
# bytestring
if isinstance(input, str):
if input == "1":
result = "ON"
elif isinstance(input, int):
if input == 1:
result = "ON"
elif isinstance(input, bool):
if input == True:
result = "ON"
elif isinstance(input, bytes):
if bytes_to_string(input) == "1":
result = "ON"
return result
# -------------------------------------
# Convert to format that is interpretable by Home Assistant
@@ -537,7 +562,7 @@ def prepTimeStamp(datetime_str):
# If the parsed datetime is naive (i.e., does not contain timezone info), add UTC timezone
if parsed_datetime.tzinfo is None:
parsed_datetime = parsed_datetime.replace(tzinfo=conf.tz)
parsed_datetime = conf.tz.localize(parsed_datetime)
except ValueError:
mylog('verbose', [f"[{pluginName}] Timestamp conversion failed of string '{datetime_str}'"])
@@ -547,9 +572,7 @@ def prepTimeStamp(datetime_str):
# Convert to the required format with 'T' between date and time and ensure the timezone is included
return parsed_datetime.isoformat() # This will include the timezone offset
# -------------INIT---------------------
if __name__ == '__main__':
sys.exit(main())

View File

@@ -4,6 +4,7 @@
require_once $_SERVER['DOCUMENT_ROOT'] . '/php/templates/security.php';
require_once $_SERVER['DOCUMENT_ROOT'] . '/php/server/db.php';
require_once $_SERVER['DOCUMENT_ROOT'] . '/php/templates/language/lang.php';
require_once $_SERVER['DOCUMENT_ROOT'] . '/php/templates/globals.php';
?>
<?php

345
install/ubuntu24/install.sh Executable file
View File

@@ -0,0 +1,345 @@
#!/usr/bin/env bash
# 🛑 Important: This is only used for the bare-metal install 🛑
echo "---------------------------------------------------------"
echo "[INSTALL] Starting NetAlertX installation for Ubuntu"
echo "---------------------------------------------------------"
echo
echo "This script will install NetAlertX on your Ubuntu system."
echo "It will clone the repository, set up necessary files, and start the application."
echo "Please ensure you have a stable internet connection."
echo "---------------------------------------------------------"
# DO NOT CHANGE ANYTHING BELOW THIS LINE!
INSTALL_DIR=/app
INSTALL_SYSTEM_NAME=ubuntu24
INSTALLER_DIR=${INSTALL_DIR}/install/$INSTALL_SYSTEM_NAME
CONF_FILE=app.conf
DB_FILE=app.db
NGINX_CONF_FILE=netalertx.conf
WEB_UI_DIR=/var/www/html/netalertx
NGINX_CONFIG_FILE=/etc/nginx/conf.d/$NGINX_CONF_FILE
OUI_FILE="/usr/share/arp-scan/ieee-oui.txt" # Define the path to ieee-oui.txt and ieee-iab.txt
SCRIPT_DIR="$(cd -- "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
FILEDB=${INSTALL_DIR}/db/${DB_FILE}
PHPVERSION="8.3"
VENV_DIR="/opt/netalertx-python"
GITHUB_REPO="https://github.com/jokob-sk/NetAlertX"
SYSTEMD_UNIT_FILE="/etc/systemd/system/netalertx.service"
SYSTEMD_UNIT_DEFAULTS="/etc/default/netalertx"
ALWAYS_FRESH_INSTALL=false # Set to true to always reset /config and /db on each install
# DO NOT CHANGE ANYTHING ABOVE THIS LINE!
# Check if script is run as root
if [[ $EUID -ne 0 ]]; then
echo "[INSTALL] This script must be run as root. Please use 'sudo'."
exit 1
fi
# Install sudo if not present
echo "---------------------------------------------------------"
echo "[INSTALL] Starting NetAlertX installation for Ubuntu"
echo "---------------------------------------------------------"
echo
apt-get update -y
echo "[INSTALL] Making sure sudo is installed"
apt-get install sudo -y
echo "---------------------------------------------------------"
echo "[INSTALL] Installing dependencies"
echo "---------------------------------------------------------"
echo
# Install core dependencies
apt-get install -y --no-install-recommends \
git \
tini ca-certificates curl libwww-perl perl apt-utils cron build-essential \
sqlite3 net-tools \
python3 python3-venv python3-dev python3-pip
# Install plugin dependencies
apt-get install -y --no-install-recommends \
dnsutils mtr arp-scan snmp iproute2 nmap zip usbutils traceroute nbtscan avahi-daemon avahi-utils
# nginx-core installs nginx and nginx-common as dependencies
apt-get install -y --no-install-recommends \
nginx-core \
php${PHPVERSION} php${PHPVERSION}-sqlite3 php php-fpm php-cgi php${PHPVERSION}-fpm php-fpm php-sqlite3 php-curl php-cli
# make sure sqlite is activated
phpenmod -v ${PHPVERSION} sqlite3
echo "---------------------------------------------------------"
echo "[INSTALL] Stopping any NGINX web server and components"
echo " (There may be errors stopping services, that's OK)"
echo "---------------------------------------------------------"
echo
# stopping nginx for setup
systemctl stop nginx
# stopping netalertx for setup
systemctl stop netalertx
# in case an older setup is running, kill it
pkill -f "python ${INSTALL_DIR}/server"
# stopping php fpm
systemctl stop php${PHPVERSION}-fpm
echo "---------------------------------------------------------"
echo "[INSTALL] Downloading NetAlertX repository"
echo "---------------------------------------------------------"
echo
# Clean the directory, ask for confirmation
if [ -d "${INSTALL_DIR}" ]; then
echo "The installation directory exists. Removing it to ensure a clean install."
echo "Are you sure you want to continue? This will delete all existing files in ${INSTALL_DIR}."
echo "This will include ALL YOUR SETTINGS AND DATABASE! (if there are any)"
echo
echo "Type:"
echo " - 'install' to continue and DELETE ALL!"
echo " - 'update' to just update from GIT (keeps your db and settings)"
echo " - 'start' to do nothing, leave install as-is (just run the start script)"
if [ "$1" == "install" ] || [ "$1" == "update" ] || [ "$1" == "start" ]; then
confirmation=$1
else
read -p "Enter your choice: " confirmation
fi
if [ "$confirmation" == "install" ]; then
# Ensure INSTALL_DIR is safe to wipe
if [ -n "${INSTALL_DIR}" ] && [ "${INSTALL_DIR}" != "" ] && [ "${INSTALL_DIR}" != "/" ] && [ "${INSTALL_DIR}" != "." ] && [ -d "${INSTALL_DIR}" ]; then
echo "Removing existing installation..."
# Unmount only if mountpoints exist
mountpoint -q "${INSTALL_DIR}/api" && umount "${INSTALL_DIR}/api" 2>/dev/null
mountpoint -q "${INSTALL_DIR}/front" && umount "${INSTALL_DIR}/front" 2>/dev/null
# Remove all contents safely
rm -rf -- "${INSTALL_DIR}"/* "${INSTALL_DIR}"/.[!.]* "${INSTALL_DIR}"/..?* 2>/dev/null
# Re-clone repository
git clone "${GITHUB_REPO}" "${INSTALL_DIR}/"
else
echo "[INSTALL] INSTALL_DIR is not set, is root, or is invalid. Aborting for safety."
exit 1
fi
elif [ "$confirmation" == "update" ]; then
echo "[INSTALL] Updating the existing installation..."
cd "${INSTALL_DIR}" || { echo "[INSTALL] Failed to change directory to ${INSTALL_DIR}"; exit 1; }
# In case there were changes, stash them
git stash -q
git pull
echo "[INSTALL] If there were any local changes, they have been >>STASHED<<"
echo "[INSTALL] You can recover them with 'git stash pop' in ${INSTALL_DIR}"
echo
elif [ "$confirmation" == "start" ]; then
echo "[INSTALL] Continuing without changes."
else
echo "[INSTALL] Installation aborted."
exit 1
fi
else
git clone "${GITHUB_REPO}" "${INSTALL_DIR}/"
fi
echo "---------------------------------------------------------"
echo "[INSTALL] Setting up Python environment"
echo "---------------------------------------------------------"
echo
# update-alternatives --install /usr/bin/python python /usr/bin/python3 10
python3 -m venv "${VENV_DIR}"
source "${VENV_DIR}/bin/activate"
pip3 install -r "${INSTALLER_DIR}/requirements.txt" || {
echo "[INSTALL] Failed to install Python dependencies"
exit 1
}
# We now should have all dependencies and files in place
# We can now configure the web server and start the application
cd "${INSTALLER_DIR}" || { echo "[INSTALL] Failed to change directory to ${INSTALLER_DIR}"; exit 1; }
# Check for buildtimestamp.txt existence, otherwise create it
if [ ! -f "${INSTALL_DIR}/front/buildtimestamp.txt" ]; then
date +%s > "${INSTALL_DIR}/front/buildtimestamp.txt"
fi
# if custom variables are not set we do not need to do anything
if [ -n "${TZ}" ]; then
FILECONF=${INSTALL_DIR}/config/${CONF_FILE}
if [ -f "$FILECONF" ]; then
sed -i -e "s|Europe/Berlin|${TZ}|g" "${INSTALL_DIR}/config/${CONF_FILE}"
else
sed -i -e "s|Europe/Berlin|${TZ}|g" "${INSTALL_DIR}/back/${CONF_FILE}.bak"
fi
fi
echo "---------------------------------------------------------"
echo "[INSTALL] Setting up the web server"
echo "---------------------------------------------------------"
echo
echo "[INSTALL] Updating the existing installation..."
# Remove default NGINX site if it is symlinked, or backup it otherwise
if [ -L /etc/nginx/sites-enabled/default ] ; then
echo "[INSTALL] Disabling default NGINX site, removing sym-link in /etc/nginx/sites-enabled"
rm /etc/nginx/sites-enabled/default
elif [ -f /etc/nginx/sites-enabled/default ]; then
echo "[INSTALL] Disabling default NGINX site, moving config to /etc/nginx/sites-available"
mv /etc/nginx/sites-enabled/default /etc/nginx/sites-available/default.bkp_netalertx
fi
# Clear existing directories and files
if [ -d $WEB_UI_DIR ]; then
echo "[INSTALL] Removing existing NetAlertX web-UI"
rm -R $WEB_UI_DIR
fi
echo "[INSTALL] Removing existing NetAlertX NGINX config"
rm "${NGINX_CONFIG_FILE}" 2>/dev/null || true
# create symbolic link to the install directory
ln -s ${INSTALL_DIR}/front $WEB_UI_DIR
# create symbolic link to NGINX configuration coming with NetAlertX
ln -s "${INSTALLER_DIR}/$NGINX_CONF_FILE" ${NGINX_CONFIG_FILE}
# Use user-supplied port if set
if [ -n "${PORT}" ]; then
echo "[INSTALL] Setting webserver to user-supplied port (${PORT})"
sed -i 's/listen 20211/listen '"${PORT}"'/g' "${NGINX_CONFIG_FILE}"
else
PORT=20211
fi
# Change web interface address if set
if [ -n "${LISTEN_ADDR}" ]; then
echo "[INSTALL] Setting webserver to user-supplied address (${LISTEN_ADDR})"
sed -i -e 's/listen /listen '"${LISTEN_ADDR}":'/g' "${NGINX_CONFIG_FILE}"
else
LISTEN_ADDR="0.0.0.0"
fi
# Change php version
echo "[INSTALL] Setting PHP version to ${PHPVERSION}"
sed -i 's#unix:/run/php/php8.3-fpm.sock#unix:/run/php/php'"${PHPVERSION}"'-fpm.sock#ig' ${NGINX_CONFIG_FILE}
# Run the hardware vendors update at least once
echo "[INSTALL] Run the hardware vendors update"
# Check if ieee-oui.txt or ieee-iab.txt exist
if [ -f "${OUI_FILE}" ]; then
echo "[INSTALL] The file ieee-oui.txt exists. Skipping update_vendors.sh..."
else
echo "[INSTALL] The file ieee-oui.txt does not exist. Running update_vendors..."
# Run the update_vendors.sh script
if [ -f "${INSTALL_DIR}/back/update_vendors.sh" ]; then
"${INSTALL_DIR}/back/update_vendors.sh"
else
echo "[INSTALL] update_vendors.sh script not found in ${INSTALL_DIR}."
fi
fi
echo "---------------------------------------------------------"
echo "[INSTALL] Create log and api mounts"
echo "---------------------------------------------------------"
echo
echo "[INSTALL] Cleaning up old mounts if any"
umount "${INSTALL_DIR}/log"
umount "${INSTALL_DIR}/api"
echo "[INSTALL] Creating log and api folders if they don't exist"
mkdir -p "${INSTALL_DIR}/log" "${INSTALL_DIR}/api"
echo "[INSTALL] Mounting log and api folders as tmpfs"
mount -t tmpfs -o noexec,nosuid,nodev tmpfs "${INSTALL_DIR}/log"
mount -t tmpfs -o noexec,nosuid,nodev tmpfs "${INSTALL_DIR}/api"
# Create log files if they don't exist
echo "[INSTALL] Creating log files if they don't exist"
touch "${INSTALL_DIR}"/log/{app.log,execution_queue.log,app_front.log,app.php_errors.log,stderr.log,stdout.log,db_is_locked.log}
touch "${INSTALL_DIR}"/api/user_notifications.json
# Create plugins sub-directory if it doesn't exist in case a custom log folder is used
mkdir -p "${INSTALL_DIR}"/log/plugins
# DANGER ZONE: ALWAYS_FRESH_INSTALL
if [ "${ALWAYS_FRESH_INSTALL}" = true ]; then
echo "[INSTALL] ❗ ALERT /db and /config folders are cleared because the ALWAYS_FRESH_INSTALL is set to: ${ALWAYS_FRESH_INSTALL}"
# Delete content of "/config/"
rm -rf "${INSTALL_DIR}/config/"*
# Delete content of "/db/"
rm -rf "${INSTALL_DIR}/db/"*
fi
echo "[INSTALL] Copy starter ${DB_FILE} and ${CONF_FILE} if they don't exist"
# Copy starter ${DB_FILE} and ${CONF_FILE} if they don't exist
cp -u "${INSTALL_DIR}/back/${CONF_FILE}" "${INSTALL_DIR}/config/${CONF_FILE}"
cp -u "${INSTALL_DIR}/back/${DB_FILE}" "$FILEDB"
echo "[INSTALL] Fixing permissions after copied starter config & DB"
if [ -f "$FILEDB" ]; then
chown -R www-data:www-data "$FILEDB"
fi
chown root:www-data "${INSTALL_DIR}"/api/user_notifications.json
chgrp -R www-data "${INSTALL_DIR}"
chmod -R u+rwx,g+rwx,o=rx "$WEB_UI_DIR"
chmod -R u+rwx,g+rwx,o=rx "${INSTALL_DIR}"
chmod -R u+rwX,g+rwX,o=rX "${INSTALL_DIR}/log"
chmod -R u+rwX,g+rwX,o=rX "${INSTALL_DIR}/config"
# Check if buildtimestamp.txt doesn't exist
if [ ! -f "${INSTALL_DIR}/front/buildtimestamp.txt" ]; then
# Create buildtimestamp.txt
date +%s > "${INSTALL_DIR}/front/buildtimestamp.txt"
fi
# start PHP and nginx
systemctl start php${PHPVERSION}-fpm || { echo "[INSTALL] Failed to start php${PHPVERSION}-fpm"; exit 1; }
nginx -t || { echo "[INSTALL] nginx config test failed"; exit 1; }
systemctl start nginx || { echo "[INSTALL] Failed to start nginx"; exit 1; }
echo "---------------------------------------------------------"
echo "[INSTALL] Installation complete"
echo "---------------------------------------------------------"
echo
# Export all variables to /etc/default/netalertx file for use by the systemd service
env_vars=( "INSTALL_SYSTEM_NAME" "INSTALLER_DIR" "INSTALL_DIR" "PHPVERSION" "VIRTUAL_ENV" "PATH" )
printf "" > "${SYSTEMD_UNIT_DEFAULTS}"
for var in "${env_vars[@]}"; do
echo "$var=${!var}" >> "${SYSTEMD_UNIT_DEFAULTS}"
done
echo "---------------------------------------------------------"
echo "[INSTALL] Starting netalertx service"
echo "---------------------------------------------------------"
echo
# Create systemd service
cp "${INSTALLER_DIR}/netalertx.service" "${SYSTEMD_UNIT_FILE}" || { echo "[INSTALL] Failed to copy systemd service file"; exit 1; }
# Adjust our path to the correct python in virtualenv
echo "[INSTALL] Setting up systemd unit"
sed -i 's|ExecStart=/usr/bin/python3|ExecStart='"${VIRTUAL_ENV}"'/bin/python3|ig' "${SYSTEMD_UNIT_FILE}" || { echo "[INSTALL] Failed to set up systemd service file"; exit 1; }
systemctl daemon-reload || { echo "[INSTALL] Failed to reload systemd daemon"; exit 1; }
systemctl enable netalertx || { echo "[INSTALL] Failed to enable NetAlertX service"; exit 1; }
systemctl start netalertx || { echo "[INSTALL] Failed to start NetAlertX service"; exit 1; }
echo "[INSTALL] 🚀 Starting app - navigate to your <server IP>:${PORT}"

View File

@@ -1,106 +0,0 @@
#!/usr/bin/env bash
# 🛑 Important: This is only used for the bare-metal install 🛑
# Update /install/start.ubuntu.sh in most cases is preferred
echo "---------------------------------------------------------"
echo "[INSTALL] Starting NetAlertX installation for Ubuntu"
echo "---------------------------------------------------------"
echo
echo "This script will install NetAlertX on your Ubuntu system."
echo "It will clone the repository, set up necessary files, and start the application."
echo "Please ensure you have a stable internet connection."
echo "---------------------------------------------------------"
# Set environment variables
INSTALL_DIR=/app # Specify the installation directory here
INSTALL_SYSTEM_NAME=ubuntu24
INSTALLER_DIR=$INSTALL_DIR/install/$INSTALL_SYSTEM_NAME
# Check if script is run as root
if [[ $EUID -ne 0 ]]; then
echo "This script must be run as root. Please use 'sudo'."
exit 1
fi
# Prepare the environment
echo "Updating packages"
echo "-----------------"
apt-get update
echo "Making sure sudo is installed"
apt-get install sudo -y
# Install Git
echo "Installing Git"
apt-get install -y git
# Clean the directory, ask for confirmation
if [ -d "$INSTALL_DIR" ]; then
echo "The installation directory exists. Removing it to ensure a clean install."
echo "Are you sure you want to continue? This will delete all existing files in $INSTALL_DIR."
echo "This will include ALL YOUR SETTINGS AND DATABASE! (if there are any)"
echo
echo "Type:"
echo " - 'install' to continue and DELETE ALL!"
echo " - 'update' to just update from GIT (keeps your db and settings)"
echo " - 'start' to do nothing, leave install as-is (just run the start script)"
if [ "$1" == "install" ] || [ "$1" == "update" ] || [ "$1" == "start" ]; then
confirmation=$1
else
read -p "Enter your choice: " confirmation
fi
if [ "$confirmation" == "install" ]; then
# Ensure INSTALL_DIR is safe to wipe
if [ -n "$INSTALL_DIR" ] && [ "$INSTALL_DIR" != "" ] && [ "$INSTALL_DIR" != "/" ] && [ "$INSTALL_DIR" != "." ] && [ -d "$INSTALL_DIR" ]; then
echo "Removing existing installation..."
# Stop nginx if running
if command -v systemctl >/dev/null 2>&1 && systemctl list-units --type=service | grep -q nginx; then
systemctl stop nginx 2>/dev/null
elif command -v service >/dev/null 2>&1; then
service nginx stop 2>/dev/null
fi
# Kill running NetAlertX server processes in this INSTALL_DIR
pkill -f "python.*${INSTALL_DIR}/server" 2>/dev/null
# Unmount only if mountpoints exist
mountpoint -q "$INSTALL_DIR/api" && umount "$INSTALL_DIR/api" 2>/dev/null
mountpoint -q "$INSTALL_DIR/front" && umount "$INSTALL_DIR/front" 2>/dev/null
# Remove all contents safely
rm -rf -- "$INSTALL_DIR"/* "$INSTALL_DIR"/.[!.]* "$INSTALL_DIR"/..?* 2>/dev/null
# Re-clone repository
git clone https://github.com/jokob-sk/NetAlertX "$INSTALL_DIR/"
else
echo "INSTALL_DIR is not set, is root, or is invalid. Aborting for safety."
exit 1
fi
elif [ "$confirmation" == "update" ]; then
echo "Updating the existing installation..."
service nginx stop 2>/dev/null
pkill -f "python ${INSTALL_DIR}/server" 2>/dev/null
cd "$INSTALL_DIR" || { echo "Failed to change directory to $INSTALL_DIR"; exit 1; }
git pull
elif [ "$confirmation" == "start" ]; then
echo "Continuing without changes."
else
echo "Installation aborted."
exit 1
fi
else
git clone https://github.com/jokob-sk/NetAlertX "$INSTALL_DIR/"
fi
# Check for buildtimestamp.txt existence, otherwise create it
if [ ! -f "$INSTALL_DIR/front/buildtimestamp.txt" ]; then
date +%s > "$INSTALL_DIR/front/buildtimestamp.txt"
fi
# Start NetAlertX
# This is where we setup the virtual environment and install dependencies
cd "$INSTALLER_DIR" || { echo "Failed to change directory to $INSTALLER_DIR"; exit 1; }
chmod +x "$INSTALLER_DIR/start.$INSTALL_SYSTEM_NAME.sh"
"$INSTALLER_DIR/start.$INSTALL_SYSTEM_NAME.sh"

View File

@@ -0,0 +1,12 @@
[Unit]
Description=NetAlertX - Network, presence scanner and alert framework
[Service]
EnvironmentFile=/etc/default/netalertx
PassEnvironment=INSTALL_SYSTEM_NAME INSTALLER_DIR INSTALL_DIR PHPVERSION VIRTUAL_ENV PATH
ExecStart=/usr/bin/python3 "${INSTALL_DIR}/server"
Restart=on-failure
Type=simple
[Install]
WantedBy=multi-user.target

View File

@@ -0,0 +1,25 @@
openwrt-luci-rpc
asusrouter
aiohttp
graphene
flask
flask-cors
unifi-sm-api
tplink-omada-client
wakeonlan
pycryptodome
requests
paho-mqtt
scapy
cron-converter
pytz
json2table
dhcp-leases
pyunifi
speedtest-cli
chardet
python-nmap
dnspython
librouteros
yattag
git+https://github.com/foreign-sub/aiofreepybox.git

View File

@@ -1,227 +0,0 @@
#!/usr/bin/env bash
echo "---------------------------------------------------------"
echo "[INSTALL]"
echo "---------------------------------------------------------"
echo
echo "This script will set up and start NetAlertX on your Ubuntu24 system."
# Specify the installation directory here
INSTALL_DIR=/app
# DO NOT CHANGE ANYTHING BELOW THIS LINE!
INSTALL_SYSTEM_NAME=ubuntu24
INSTALLER_DIR=$INSTALL_DIR/install/$INSTALL_SYSTEM_NAME
CONF_FILE=app.conf
DB_FILE=app.db
NGINX_CONF_FILE=netalertx.conf
WEB_UI_DIR=/var/www/html/netalertx
NGINX_CONFIG_FILE=/etc/nginx/conf.d/$NGINX_CONF_FILE
OUI_FILE="/usr/share/arp-scan/ieee-oui.txt" # Define the path to ieee-oui.txt and ieee-iab.txt
INSTALL_PATH=$INSTALL_DIR
FILEDB=$INSTALL_PATH/db/$DB_FILE
PHPVERSION="8.3"
# DO NOT CHANGE ANYTHING ABOVE THIS LINE!
# if custom variables not set we do not need to do anything
if [ -n "${TZ}" ]; then
FILECONF=$INSTALL_PATH/config/$CONF_FILE
if [ -f "$FILECONF" ]; then
sed -i -e "s|Europe/Berlin|${TZ}|g" "$INSTALL_PATH/config/$CONF_FILE"
else
sed -i -e "s|Europe/Berlin|${TZ}|g" "$INSTALL_PATH/back/$CONF_FILE.bak"
fi
fi
# Check if script is run as root
if [[ $EUID -ne 0 ]]; then
echo "This script must be run as root. Please use 'sudo'."
exit 1
fi
echo "---------------------------------------------------------"
echo "[INSTALL] Installing dependencies"
echo "---------------------------------------------------------"
echo
# Install dependencies
apt-get install -y \
tini snmp ca-certificates curl libwww-perl arp-scan perl apt-utils cron \
sqlite3 dnsutils net-tools mtr \
python3 python3-dev iproute2 nmap python3-pip zip usbutils traceroute nbtscan avahi-daemon avahi-utils build-essential
# alternate dependencies
# nginx-core install nginx and nginx-common as dependencies
apt-get install nginx-core php${PHPVERSION} php${PHPVERSION}-sqlite3 php php-cgi php-fpm php-sqlite3 php-curl php-fpm php${PHPVERSION}-fpm php-cli -y
phpenmod -v ${PHPVERSION} sqlite3
update-alternatives --install /usr/bin/python python /usr/bin/python3 10
cd $INSTALLER_DIR || { echo "Failed to change directory to $INSTALLER_DIR"; exit 1; }
# setup virtual python environment so we can use pip3 to install packages
apt-get install python3-venv -y
python3 -m venv myenv
source myenv/bin/activate
# install packages thru pip3
pip3 install openwrt-luci-rpc asusrouter asyncio aiohttp graphene flask flask-cors unifi-sm-api tplink-omada-client wakeonlan pycryptodome requests paho-mqtt scapy cron-converter pytz json2table dhcp-leases pyunifi speedtest-cli chardet python-nmap dnspython librouteros yattag git+https://github.com/foreign-sub/aiofreepybox.git
echo "---------------------------------------------------------"
echo "[INSTALL] Installing NGINX and setting up the web server"
echo "---------------------------------------------------------"
echo
echo "[INSTALL] Stopping any NGINX web server"
service nginx stop 2>/dev/null
pkill -f "python ${INSTALL_DIR}/server" 2>/dev/null
echo "[INSTALL] Updating the existing installation..."
# Remove default NGINX site if it is symlinked, or backup it otherwise
if [ -L /etc/nginx/sites-enabled/default ] ; then
echo "[INSTALL] Disabling default NGINX site, removing sym-link in /etc/nginx/sites-enabled"
rm /etc/nginx/sites-enabled/default
elif [ -f /etc/nginx/sites-enabled/default ]; then
echo "[INSTALL] Disabling default NGINX site, moving config to /etc/nginx/sites-available"
mv /etc/nginx/sites-enabled/default /etc/nginx/sites-available/default.bkp_netalertx
fi
# Clear existing directories and files
if [ -d $WEB_UI_DIR ]; then
echo "[INSTALL] Removing existing NetAlertX web-UI"
rm -R $WEB_UI_DIR
fi
echo "[INSTALL] Removing existing NetAlertX NGINX config"
rm "$NGINX_CONFIG_FILE" 2>/dev/null || true
# create symbolic link to the install directory
ln -s $INSTALL_PATH/front $WEB_UI_DIR
# create symbolic link to NGINX configuration coming with NetAlertX
ln -s "${INSTALLER_DIR}/$NGINX_CONF_FILE" $NGINX_CONFIG_FILE
# Use user-supplied port if set
if [ -n "${PORT}" ]; then
echo "[INSTALL] Setting webserver to user-supplied port ($PORT)"
sed -i 's/listen 20211/listen '"$PORT"'/g' "$NGINX_CONFIG_FILE"
fi
# Change web interface address if set
if [ -n "${LISTEN_ADDR}" ]; then
echo "[INSTALL] Setting webserver to user-supplied address (${LISTEN_ADDR})"
sed -i -e 's/listen /listen '"${LISTEN_ADDR}":'/g' "$NGINX_CONFIG_FILE"
fi
# Change php version
echo "[INSTALL] Setting PHP version to ${PHPVERSION}"
sed -i 's#unix:/run/php/php8.3-fpm.sock#unix:/run/php/php'"${PHPVERSION}"'-fpm.sock#ig' $NGINX_CONFIG_FILE
# Run the hardware vendors update at least once
echo "[INSTALL] Run the hardware vendors update"
# Check if ieee-oui.txt or ieee-iab.txt exist
if [ -f "$OUI_FILE" ]; then
echo "[INSTALL] The file ieee-oui.txt exists. Skipping update_vendors.sh..."
else
echo "[INSTALL] The file ieee-oui.txt does not exist. Running update_vendors..."
# Run the update_vendors.sh script
if [ -f "${INSTALL_PATH}/back/update_vendors.sh" ]; then
"${INSTALL_PATH}/back/update_vendors.sh"
else
echo "[INSTALL] update_vendors.sh script not found in $INSTALL_DIR."
fi
fi
echo "---------------------------------------------------------"
echo "[INSTALL] Create log and api mounts"
echo "---------------------------------------------------------"
echo
echo "[INSTALL] Cleaning up old mounts if any"
umount "${INSTALL_DIR}/log"
umount "${INSTALL_DIR}/api"
echo "[INSTALL] Creating log and api folders if they don't exist"
mkdir -p "${INSTALL_DIR}/log" "${INSTALL_DIR}/api"
echo "[INSTALL] Mounting log and api folders as tmpfs"
mount -t tmpfs -o noexec,nosuid,nodev tmpfs "${INSTALL_DIR}/log"
mount -t tmpfs -o noexec,nosuid,nodev tmpfs "${INSTALL_DIR}/api"
# Create log files if they don't exist
echo "[INSTALL] Creating log files if they don't exist"
touch "${INSTALL_DIR}"/log/{app.log,execution_queue.log,app_front.log,app.php_errors.log,stderr.log,stdout.log,db_is_locked.log}
touch "${INSTALL_DIR}"/api/user_notifications.json
# Create plugins sub-directory if it doesn't exist in case a custom log folder is used
mkdir -p "${INSTALL_DIR}"/log/plugins
# Fixing file permissions
echo "[INSTALL] Fixing file permissions"
chown root:www-data "${INSTALL_DIR}"/api/user_notifications.json
echo "[INSTALL] Fixing WEB_UI_DIR: ${WEB_UI_DIR}"
chmod -R a+rwx $WEB_UI_DIR
echo "[INSTALL] Fixing INSTALL_DIR: ${INSTALL_DIR}"
chmod -R a+rw $INSTALL_PATH/log
chmod -R a+rwx $INSTALL_DIR
echo "[INSTALL] Copy starter $DB_FILE and $CONF_FILE if they don't exist"
# DANGER ZONE: ALWAYS_FRESH_INSTALL
if [ "$ALWAYS_FRESH_INSTALL" = true ]; then
echo "[INSTALL] ❗ ALERT /db and /config folders are cleared because the ALWAYS_FRESH_INSTALL is set to: ${ALWAYS_FRESH_INSTALL}"
# Delete content of "/config/"
rm -rf "${INSTALL_PATH}/config/"*
# Delete content of "/db/"
rm -rf "${INSTALL_PATH}/db/"*
fi
# Copy starter $DB_FILE and $CONF_FILE if they don't exist
cp -u "${INSTALL_PATH}/back/$CONF_FILE" "${INSTALL_PATH}/config/$CONF_FILE"
cp -u "${INSTALL_PATH}/back/$DB_FILE" "$FILEDB"
echo "[INSTALL] Fixing permissions after copied starter config & DB"
if [ -f "$FILEDB" ]; then
chown -R www-data:www-data $FILEDB
fi
chmod -R a+rwx $INSTALL_DIR # second time after we copied the files
chmod -R a+rw $INSTALL_PATH/config
chgrp -R www-data $INSTALL_PATH
# Check if buildtimestamp.txt doesn't exist
if [ ! -f "${INSTALL_PATH}/front/buildtimestamp.txt" ]; then
# Create buildtimestamp.txt
date +%s > "${INSTALL_PATH}/front/buildtimestamp.txt"
fi
# start PHP
/etc/init.d/php${PHPVERSION}-fpm start
nginx -t || { echo "[INSTALL] nginx config test failed"; exit 1; }
/etc/init.d/nginx start
# Activate the virtual python environment
source myenv/bin/activate
echo "[INSTALL] 🚀 Starting app - navigate to your <server IP>:${PORT}"
# Start the NetAlertX python script
# All error and console output being diverted to null,
# otherwise we can get critical errors re I/O
python "$INSTALL_PATH/server/" 2>/dev/null 1>/dev/null &

0
log/plugins/.git-placeholder Normal file → Executable file
View File

View File

@@ -74,6 +74,7 @@ nav:
- Environment Setup: DEV_ENV_SETUP.md
- Devcontainer: DEV_DEVCONTAINER.md
- Custom Plugins: PLUGINS_DEV.md
- Plugin Config: PLUGINS_DEV_CONFIG.md
- Frontend Development: FRONTEND_DEVELOPMENT.md
- Database: DATABASE.md
- Settings: SETTINGS_SYSTEM.md

View File

@@ -154,10 +154,10 @@ class SafeConditionBuilder:
"""
Parse a condition string into safe SQL with parameters.
This method handles basic patterns like:
- AND devName = 'value'
- AND devComments LIKE '%value%'
- AND eve_EventType IN ('type1', 'type2')
This method handles both single and compound conditions:
- Single: AND devName = 'value'
- Compound: AND devName = 'value' AND devVendor = 'Apple'
- Multiple clauses with AND/OR operators
Args:
condition: Condition string to parse
@@ -167,33 +167,245 @@ class SafeConditionBuilder:
"""
condition = condition.strip()
# Handle empty conditions
if not condition:
return "", {}
# Check if this is a compound condition (multiple clauses)
if self._is_compound_condition(condition):
return self._parse_compound_condition(condition)
# Single condition: extract leading logical operator if present
logical_op = None
clause_text = condition
# Check for leading AND
if condition.upper().startswith('AND ') or condition.upper().startswith('AND\t'):
logical_op = 'AND'
clause_text = condition[3:].strip()
# Check for leading OR
elif condition.upper().startswith('OR ') or condition.upper().startswith('OR\t'):
logical_op = 'OR'
clause_text = condition[2:].strip()
# Parse the single condition
return self._parse_single_condition(clause_text, logical_op)
def _is_compound_condition(self, condition: str) -> bool:
"""
Determine if a condition contains multiple clauses (compound condition).
A compound condition has multiple logical operators (AND/OR) connecting
separate comparison clauses.
Args:
condition: Condition string to check
Returns:
True if compound (multiple clauses), False if single clause
"""
# Track if we're inside quotes to avoid counting operators in quoted strings
in_quotes = False
logical_op_count = 0
i = 0
while i < len(condition):
char = condition[i]
# Toggle quote state
if char == "'":
in_quotes = not in_quotes
i += 1
continue
# Only count logical operators outside of quotes
if not in_quotes:
# Look for AND or OR as whole words
remaining = condition[i:].upper()
# Check for AND (must be word boundary)
if remaining.startswith('AND ') or remaining.startswith('AND\t'):
logical_op_count += 1
i += 3
continue
# Check for OR (must be word boundary)
if remaining.startswith('OR ') or remaining.startswith('OR\t'):
logical_op_count += 1
i += 2
continue
i += 1
# A compound condition has more than one logical operator
# (first AND/OR starts the condition, subsequent ones connect clauses)
return logical_op_count > 1
def _parse_compound_condition(self, condition: str) -> Tuple[str, Dict[str, Any]]:
"""
Parse a compound condition with multiple clauses.
Splits the condition into individual clauses, parses each one,
and reconstructs the full condition with all parameters.
Args:
condition: Compound condition string
Returns:
Tuple of (safe_sql_snippet, parameters_dict)
"""
# Split the condition into individual clauses while preserving logical operators
clauses = self._split_by_logical_operators(condition)
# Parse each clause individually
parsed_parts = []
all_params = {}
for clause_text, logical_op in clauses:
# Parse this single clause
sql_part, params = self._parse_single_condition(clause_text, logical_op)
if sql_part:
parsed_parts.append(sql_part)
all_params.update(params)
if not parsed_parts:
raise ValueError("No valid clauses found in compound condition")
# Join all parsed parts
final_sql = " ".join(parsed_parts)
return final_sql, all_params
def _split_by_logical_operators(self, condition: str) -> List[Tuple[str, Optional[str]]]:
"""
Split a compound condition into individual clauses.
Returns a list of tuples: (clause_text, logical_operator)
The logical operator is the AND/OR that precedes the clause.
Args:
condition: Compound condition string
Returns:
List of (clause_text, logical_op) tuples
"""
clauses = []
current_clause = []
current_logical_op = None
in_quotes = False
i = 0
while i < len(condition):
char = condition[i]
# Toggle quote state
if char == "'":
in_quotes = not in_quotes
current_clause.append(char)
i += 1
continue
# Only look for logical operators outside of quotes
if not in_quotes:
remaining = condition[i:].upper()
# Check if we're at a word boundary (start of string or after whitespace)
at_word_boundary = (i == 0 or condition[i-1] in ' \t')
# Check for AND (must be at word boundary)
if at_word_boundary and (remaining.startswith('AND ') or remaining.startswith('AND\t')):
# Save current clause if we have one
if current_clause:
clause_text = ''.join(current_clause).strip()
if clause_text:
clauses.append((clause_text, current_logical_op))
current_clause = []
# Set the logical operator for the next clause
current_logical_op = 'AND'
i += 3 # Skip 'AND'
# Skip whitespace after AND
while i < len(condition) and condition[i] in ' \t':
i += 1
continue
# Check for OR (must be at word boundary)
if at_word_boundary and (remaining.startswith('OR ') or remaining.startswith('OR\t')):
# Save current clause if we have one
if current_clause:
clause_text = ''.join(current_clause).strip()
if clause_text:
clauses.append((clause_text, current_logical_op))
current_clause = []
# Set the logical operator for the next clause
current_logical_op = 'OR'
i += 2 # Skip 'OR'
# Skip whitespace after OR
while i < len(condition) and condition[i] in ' \t':
i += 1
continue
# Add character to current clause
current_clause.append(char)
i += 1
# Don't forget the last clause
if current_clause:
clause_text = ''.join(current_clause).strip()
if clause_text:
clauses.append((clause_text, current_logical_op))
return clauses
def _parse_single_condition(self, condition: str, logical_op: Optional[str] = None) -> Tuple[str, Dict[str, Any]]:
"""
Parse a single condition clause into safe SQL with parameters.
This method handles basic patterns like:
- devName = 'value' (with optional AND/OR prefix)
- devComments LIKE '%value%'
- eve_EventType IN ('type1', 'type2')
Args:
condition: Single condition string to parse
logical_op: Optional logical operator (AND/OR) to prepend
Returns:
Tuple of (safe_sql_snippet, parameters_dict)
"""
condition = condition.strip()
# Handle empty conditions
if not condition:
return "", {}
# Simple pattern matching for common conditions
# Pattern 1: AND/OR column operator value (supporting Unicode in quoted strings)
pattern1 = r'^\s*(AND|OR)?\s+(\w+)\s+(=|!=|<>|<|>|<=|>=|LIKE|NOT\s+LIKE)\s+\'([^\']*)\'\s*$'
# Pattern 1: [AND/OR] column operator value (supporting Unicode in quoted strings)
pattern1 = r'^\s*(\w+)\s+(=|!=|<>|<|>|<=|>=|LIKE|NOT\s+LIKE)\s+\'([^\']*)\'\s*$'
match1 = re.match(pattern1, condition, re.IGNORECASE | re.UNICODE)
if match1:
logical_op, column, operator, value = match1.groups()
column, operator, value = match1.groups()
return self._build_simple_condition(logical_op, column, operator, value)
# Pattern 2: AND/OR column IN ('val1', 'val2', ...)
pattern2 = r'^\s*(AND|OR)?\s+(\w+)\s+(IN|NOT\s+IN)\s+\(([^)]+)\)\s*$'
# Pattern 2: [AND/OR] column IN ('val1', 'val2', ...)
pattern2 = r'^\s*(\w+)\s+(IN|NOT\s+IN)\s+\(([^)]+)\)\s*$'
match2 = re.match(pattern2, condition, re.IGNORECASE)
if match2:
logical_op, column, operator, values_str = match2.groups()
column, operator, values_str = match2.groups()
return self._build_in_condition(logical_op, column, operator, values_str)
# Pattern 3: AND/OR column IS NULL/IS NOT NULL
pattern3 = r'^\s*(AND|OR)?\s+(\w+)\s+(IS\s+NULL|IS\s+NOT\s+NULL)\s*$'
# Pattern 3: [AND/OR] column IS NULL/IS NOT NULL
pattern3 = r'^\s*(\w+)\s+(IS\s+NULL|IS\s+NOT\s+NULL)\s*$'
match3 = re.match(pattern3, condition, re.IGNORECASE)
if match3:
logical_op, column, operator = match3.groups()
column, operator = match3.groups()
return self._build_null_condition(logical_op, column, operator)
# If no patterns match, reject the condition for security

View File

@@ -157,7 +157,7 @@ def importConfigs (pm, db, all_plugins):
# ----------------------------------------
# ccd(key, default, config_dir, name, inputtype, options, group, events=[], desc = "", regex = "", setJsonMetadata = {}, overrideTemplate = {})
conf.LOADED_PLUGINS = ccd('LOADED_PLUGINS', [] , c_d, 'Loaded plugins', '{"dataType":"array","elements":[{"elementType":"select","elementOptions":[{"multiple":"true","ordeable":"true"}],"transformers":[]},{"elementType":"button","elementOptions":[{"sourceSuffixes":[]},{"separator":""},{"cssClasses":"col-xs-12"},{"onClick":"selectChange(this)"},{"getStringKey":"Gen_Change"}],"transformers":[]}]}', '[]', 'General')
conf.LOADED_PLUGINS = ccd('LOADED_PLUGINS', [] , c_d, 'Loaded plugins', '{"dataType":"array","elements":[{"elementType":"select","elementHasInputValue":1,"elementOptions":[{"multiple":"true","ordeable":"true"}],"transformers":[]},{"elementType":"button","elementOptions":[{"sourceSuffixes":[]},{"separator":""},{"cssClasses":"col-xs-12"},{"onClick":"selectChange(this)"},{"getStringKey":"Gen_Change"}],"transformers":[]}]}', '[]', 'General')
conf.DISCOVER_PLUGINS = ccd('DISCOVER_PLUGINS', True , c_d, 'Discover plugins', """{"dataType": "boolean","elements": [{"elementType": "input","elementOptions": [{ "type": "checkbox" }],"transformers": []}]}""", '[]', 'General')
conf.SCAN_SUBNETS = ccd('SCAN_SUBNETS', ['192.168.1.0/24 --interface=eth1', '192.168.1.0/24 --interface=eth0'] , c_d, 'Subnets to scan', '''{"dataType": "array","elements": [{"elementType": "input","elementOptions": [{"placeholder": "192.168.1.0/24 --interface=eth1"},{"suffix": "_in"},{"cssClasses": "col-sm-10"},{"prefillValue": "null"}],"transformers": []},{"elementType": "button","elementOptions": [{"sourceSuffixes": ["_in"]},{"separator": ""},{"cssClasses": "col-xs-12"},{"onClick": "addList(this, false)"},{"getStringKey": "Gen_Add"}],"transformers": []},{"elementType": "select","elementHasInputValue": 1,"elementOptions": [{"multiple": "true"},{"readonly": "true"},{"editable": "true"}],"transformers": []},{"elementType": "button","elementOptions": [{"sourceSuffixes": []},{"separator": ""},{"cssClasses": "col-xs-6"},{"onClick": "removeAllOptions(this)"},{"getStringKey": "Gen_Remove_All"}],"transformers": []},{"elementType": "button","elementOptions": [{"sourceSuffixes": []},{"separator": ""},{"cssClasses": "col-xs-6"},{"onClick": "removeFromList(this)"},{"getStringKey": "Gen_Remove_Last"}],"transformers": []}]}''', '[]', 'General')
conf.LOG_LEVEL = ccd('LOG_LEVEL', 'verbose' , c_d, 'Log verboseness', '{"dataType":"string", "elements": [{"elementType" : "select", "elementOptions" : [] ,"transformers": []}]}', "['none', 'minimal', 'verbose', 'debug', 'trace']", 'General')
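# Illustrative only: a hypothetical plugin-supplied setting registered the same way.
# The key, label, default and JSON metadata below are made up for this example and are not part of this change:
# conf.MY_PLUGIN_TIMEOUT = ccd('MY_PLUGIN_TIMEOUT', 30, c_d, 'Scan timeout (s)', '{"dataType":"integer","elements":[{"elementType":"input","elementOptions":[{"type":"number"}],"transformers":[]}]}', '[]', 'General')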


@@ -202,18 +202,25 @@ def get_plugins_configs(loadAll):
# Construct the path to the config.json file within the plugin folder
config_path = os.path.join(pluginsPath, d, "config.json")
try:
    plugJson = json.loads(get_file_content(config_path))

    # Only load plugin if needed
    # Fetch the list of enabled plugins from the config, default to an empty list if not set
    enabledPlugins = getattr(conf, "LOADED_PLUGINS", [])

    # Load all plugins if `loadAll` is True, the plugin is in the enabled list,
    # or no specific plugins are enabled (enabledPlugins is empty)
    if loadAll or plugJson["unique_prefix"] in enabledPlugins or enabledPlugins == []:
        # Load the contents of the config.json file as a JSON object and append it to pluginsList
        pluginsList.append(plugJson)
except (FileNotFoundError, json.JSONDecodeError) as e:
    # Handle the case when the file is not found or JSON decoding fails
    mylog('none', [f'[{module_name}] ⚠ ERROR - JSONDecodeError or FileNotFoundError for file {config_path}'])
except Exception as e:
    mylog('none', [f'[{module_name}] ⚠ ERROR - Exception for file {config_path}: {str(e)}'])
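# Assumed on-disk layout, based on the path construction above (illustrative):
#   <pluginsPath>/<plugin_folder>/config.json   <- manifest parsed into plugJson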
# Sort pluginsList based on "execution_order"
pluginsListSorted = sorted(pluginsList, key=get_layer)


@@ -0,0 +1,326 @@
"""
Unit tests for SafeConditionBuilder compound condition parsing.
Tests the fix for Issue #1210 - compound conditions with multiple AND/OR clauses.
"""
import sys
import unittest
from unittest.mock import MagicMock
# Mock the logger module before importing SafeConditionBuilder
sys.modules['logger'] = MagicMock()
# Add the directory containing sql_safe_builder to the import path
sys.path.insert(0, '/tmp/netalertx_hotfix/server/db')
from sql_safe_builder import SafeConditionBuilder
class TestCompoundConditions(unittest.TestCase):
"""Test compound condition parsing functionality."""
def setUp(self):
"""Create a fresh builder instance for each test."""
self.builder = SafeConditionBuilder()
def test_user_failing_filter_six_and_clauses(self):
"""Test the exact user-reported failing filter from Issue #1210."""
condition = (
"AND devLastIP NOT LIKE '192.168.50.%' "
"AND devLastIP NOT LIKE '192.168.60.%' "
"AND devLastIP NOT LIKE '192.168.70.2' "
"AND devLastIP NOT LIKE '192.168.70.5' "
"AND devLastIP NOT LIKE '192.168.70.3' "
"AND devLastIP NOT LIKE '192.168.70.4'"
)
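# Rough shape of the SQL expected below (parameter names are illustrative):
#   AND devLastIP NOT LIKE :param_1 AND devLastIP NOT LIKE :param_2 ... (6 clauses in total)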
sql, params = self.builder.build_safe_condition(condition)
# Should successfully parse
self.assertIsNotNone(sql)
self.assertIsNotNone(params)
# Should have 6 parameters (one per clause)
self.assertEqual(len(params), 6)
# Should contain all 6 AND operators
self.assertEqual(sql.count('AND'), 6)
# Should contain all 6 NOT LIKE operators
self.assertEqual(sql.count('NOT LIKE'), 6)
# Should have 6 parameter placeholders
self.assertEqual(sql.count(':param_'), 6)
# Verify all IP patterns are in parameters
param_values = list(params.values())
self.assertIn('192.168.50.%', param_values)
self.assertIn('192.168.60.%', param_values)
self.assertIn('192.168.70.2', param_values)
self.assertIn('192.168.70.5', param_values)
self.assertIn('192.168.70.3', param_values)
self.assertIn('192.168.70.4', param_values)
def test_multiple_and_clauses_simple(self):
"""Test multiple AND clauses with simple equality operators."""
condition = "AND devName = 'Device1' AND devVendor = 'Apple' AND devFavorite = '1'"
sql, params = self.builder.build_safe_condition(condition)
# Should have 3 parameters
self.assertEqual(len(params), 3)
# Should have 3 AND operators
self.assertEqual(sql.count('AND'), 3)
# Verify all values are parameterized
param_values = list(params.values())
self.assertIn('Device1', param_values)
self.assertIn('Apple', param_values)
self.assertIn('1', param_values)
def test_multiple_or_clauses(self):
"""Test multiple OR clauses."""
condition = "OR devName = 'Device1' OR devName = 'Device2' OR devName = 'Device3'"
sql, params = self.builder.build_safe_condition(condition)
# Should have 3 parameters
self.assertEqual(len(params), 3)
# Should have 3 OR operators
self.assertEqual(sql.count('OR'), 3)
# Verify all device names are parameterized
param_values = list(params.values())
self.assertIn('Device1', param_values)
self.assertIn('Device2', param_values)
self.assertIn('Device3', param_values)
def test_mixed_and_or_clauses(self):
"""Test mixed AND/OR logical operators."""
condition = "AND devName = 'Device1' OR devName = 'Device2' AND devFavorite = '1'"
sql, params = self.builder.build_safe_condition(condition)
# Should have 3 parameters
self.assertEqual(len(params), 3)
# Should preserve the logical operator order
self.assertIn('AND', sql)
self.assertIn('OR', sql)
# Verify all values are parameterized
param_values = list(params.values())
self.assertIn('Device1', param_values)
self.assertIn('Device2', param_values)
self.assertIn('1', param_values)
def test_single_condition_backward_compatibility(self):
"""Test that single conditions still work (backward compatibility)."""
condition = "AND devName = 'TestDevice'"
sql, params = self.builder.build_safe_condition(condition)
# Should have 1 parameter
self.assertEqual(len(params), 1)
# Should match expected format
self.assertIn('AND devName = :param_', sql)
# Parameter should contain the value
self.assertIn('TestDevice', params.values())
def test_single_condition_like_operator(self):
"""Test single LIKE condition for backward compatibility."""
condition = "AND devComments LIKE '%important%'"
sql, params = self.builder.build_safe_condition(condition)
# Should have 1 parameter
self.assertEqual(len(params), 1)
# Should contain LIKE operator
self.assertIn('LIKE', sql)
# Parameter should contain the pattern
self.assertIn('%important%', params.values())
def test_compound_with_like_patterns(self):
"""Test compound conditions with LIKE patterns."""
condition = "AND devLastIP LIKE '192.168.%' AND devVendor LIKE '%Apple%'"
sql, params = self.builder.build_safe_condition(condition)
# Should have 2 parameters
self.assertEqual(len(params), 2)
# Should have 2 LIKE operators
self.assertEqual(sql.count('LIKE'), 2)
# Verify patterns are parameterized
param_values = list(params.values())
self.assertIn('192.168.%', param_values)
self.assertIn('%Apple%', param_values)
def test_compound_with_inequality_operators(self):
"""Test compound conditions with various inequality operators."""
condition = "AND eve_DateTime > '2024-01-01' AND eve_DateTime < '2024-12-31'"
sql, params = self.builder.build_safe_condition(condition)
# Should have 2 parameters
self.assertEqual(len(params), 2)
# Should have both operators
self.assertIn('>', sql)
self.assertIn('<', sql)
# Verify dates are parameterized
param_values = list(params.values())
self.assertIn('2024-01-01', param_values)
self.assertIn('2024-12-31', param_values)
def test_empty_condition(self):
"""Test empty condition string."""
condition = ""
sql, params = self.builder.build_safe_condition(condition)
# Should return empty results
self.assertEqual(sql, "")
self.assertEqual(params, {})
def test_whitespace_only_condition(self):
"""Test condition with only whitespace."""
condition = " \t\n "
sql, params = self.builder.build_safe_condition(condition)
# Should return empty results
self.assertEqual(sql, "")
self.assertEqual(params, {})
def test_invalid_column_name_rejected(self):
"""Test that invalid column names are rejected."""
condition = "AND malicious_column = 'value'"
with self.assertRaises(ValueError):
self.builder.build_safe_condition(condition)
def test_invalid_operator_rejected(self):
"""Test that invalid operators are rejected."""
condition = "AND devName EXECUTE 'DROP TABLE'"
with self.assertRaises(ValueError):
self.builder.build_safe_condition(condition)
def test_sql_injection_attempt_blocked(self):
"""Test that SQL injection attempts are blocked."""
condition = "AND devName = 'value'; DROP TABLE devices; --"
# Should either reject or sanitize the dangerous input
# The semicolon and comment should not appear in the final SQL
try:
sql, params = self.builder.build_safe_condition(condition)
# If it doesn't raise an error, it should sanitize the input
self.assertNotIn('DROP', sql.upper())
self.assertNotIn(';', sql)
except ValueError:
# Rejection is also acceptable
pass
def test_quoted_string_with_spaces(self):
"""Test that quoted strings with spaces are handled correctly."""
condition = "AND devName = 'My Device Name' AND devComments = 'Has spaces here'"
sql, params = self.builder.build_safe_condition(condition)
# Should have 2 parameters
self.assertEqual(len(params), 2)
# Verify values with spaces are preserved
param_values = list(params.values())
self.assertIn('My Device Name', param_values)
self.assertIn('Has spaces here', param_values)
def test_compound_condition_with_not_equal(self):
"""Test compound conditions with != operator."""
condition = "AND devName != 'Device1' AND devVendor != 'Unknown'"
sql, params = self.builder.build_safe_condition(condition)
# Should have 2 parameters
self.assertEqual(len(params), 2)
# Should have != operators (or converted to <>)
self.assertTrue('!=' in sql or '<>' in sql)
# Verify values are parameterized
param_values = list(params.values())
self.assertIn('Device1', param_values)
self.assertIn('Unknown', param_values)
def test_very_long_compound_condition(self):
"""Test handling of very long compound conditions (10+ clauses)."""
clauses = []
for i in range(10):
clauses.append(f"AND devName != 'Device{i}'")
condition = " ".join(clauses)
sql, params = self.builder.build_safe_condition(condition)
# Should have 10 parameters
self.assertEqual(len(params), 10)
# Should have 10 AND operators
self.assertEqual(sql.count('AND'), 10)
# Verify all device names are parameterized
param_values = list(params.values())
for i in range(10):
self.assertIn(f'Device{i}', param_values)
class TestParameterGeneration(unittest.TestCase):
"""Test parameter generation and naming."""
def setUp(self):
"""Create a fresh builder instance for each test."""
self.builder = SafeConditionBuilder()
def test_parameters_have_unique_names(self):
"""Test that all parameters get unique names."""
condition = "AND devName = 'A' AND devName = 'B' AND devName = 'C'"
sql, params = self.builder.build_safe_condition(condition)
# All parameter names should be unique
param_names = list(params.keys())
self.assertEqual(len(param_names), len(set(param_names)))
def test_parameter_values_match_condition(self):
"""Test that parameter values correctly match the condition values."""
condition = "AND devLastIP NOT LIKE '192.168.1.%' AND devLastIP NOT LIKE '10.0.0.%'"
sql, params = self.builder.build_safe_condition(condition)
# Should have exactly the values from the condition
param_values = sorted(params.values())
expected_values = sorted(['192.168.1.%', '10.0.0.%'])
self.assertEqual(param_values, expected_values)
def test_parameters_referenced_in_sql(self):
"""Test that all parameters are actually referenced in the SQL."""
condition = "AND devName = 'Device1' AND devVendor = 'Apple'"
sql, params = self.builder.build_safe_condition(condition)
# Every parameter should appear in the SQL
for param_name in params.keys():
self.assertIn(f':{param_name}', sql)
if __name__ == '__main__':
unittest.main()
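# Usage note (illustrative): with the sys.path entry above pointing at the directory
# that contains sql_safe_builder.py, the suite can be run directly, e.g.:
#   python test_compound_conditions.py -v
# (the filename is an assumption; adjust to wherever this file is saved)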