From 0cd7528284bfa0de4563a6f763a721818d513733 Mon Sep 17 00:00:00 2001 From: Adam Outler Date: Mon, 17 Nov 2025 00:20:08 +0000 Subject: [PATCH 1/4] Fix cron restart --- front/php/server/db.php | 4 ++++ .../services/scripts/cron_script.sh | 14 ++++---------- .../production-filesystem/services/start-crond.sh | 4 ++-- 3 files changed, 10 insertions(+), 12 deletions(-) diff --git a/front/php/server/db.php b/front/php/server/db.php index 89d4d906..a543c592 100755 --- a/front/php/server/db.php +++ b/front/php/server/db.php @@ -28,6 +28,8 @@ if (!is_dir($dbFolderPath)) { @mkdir($dbFolderPath, 0775, true); } +$dbFolderPath = rtrim($dbFolderPath, '/') . '/'; + $DBFILE = rtrim($dbFolderPath, '/') . '/app.db'; if (!file_exists($DBFILE) && file_exists($legacyDbPath)) { $DBFILE = $legacyDbPath; @@ -41,6 +43,8 @@ if (!is_dir($logFolderPath)) { @mkdir($logFolderPath, 0775, true); } +$logFolderPath = rtrim($logFolderPath, '/') . '/'; + $DBFILE_LOCKED_FILE = rtrim($logFolderPath, '/') . '/db_is_locked.log'; diff --git a/install/production-filesystem/services/scripts/cron_script.sh b/install/production-filesystem/services/scripts/cron_script.sh index 347f1a20..2a0b4f42 100755 --- a/install/production-filesystem/services/scripts/cron_script.sh +++ b/install/production-filesystem/services/scripts/cron_script.sh @@ -1,16 +1,10 @@ #!/bin/bash -export INSTALL_DIR=/app +# If cron_restart_backend exists in the file LOG_EXECUTION_QUEUE, then +# call the restart backend script and remove the line from the file +# and remove the entry - -# Check if there are any entries with cron_restart_backend if grep -q "cron_restart_backend" "${LOG_EXECUTION_QUEUE}"; then - killall python3 - sleep 2 /services/start-backend.sh & - - # Remove all lines containing cron_restart_backend from the log file - # Atomic replacement with temp file - grep -v "cron_restart_backend" "${LOG_EXECUTION_QUEUE}" > "${LOG_EXECUTION_QUEUE}.tmp" && \ - mv "${LOG_EXECUTION_QUEUE}.tmp" "${LOG_EXECUTION_QUEUE}" + sed -i '/cron_restart_backend/d' "${LOG_EXECUTION_QUEUE}" fi diff --git a/install/production-filesystem/services/start-crond.sh b/install/production-filesystem/services/start-crond.sh index c6e9ea70..548c5d6a 100755 --- a/install/production-filesystem/services/start-crond.sh +++ b/install/production-filesystem/services/start-crond.sh @@ -23,9 +23,9 @@ done trap cleanup EXIT trap forward_signal INT TERM -echo "Starting /usr/sbin/crond -c \"${SYSTEM_SERVICES_CROND}\" -f -L \"${LOG_CROND}\" >>\"${LOG_CROND}\" 2>&1 &" +echo "Starting /usr/sbin/crond -c \"${SYSTEM_SERVICES_CROND}\" -f -l 1 -L \"${LOG_CROND}\" >>\"${LOG_CROND}\" 2>&1 &" -/usr/sbin/crond -c "${SYSTEM_SERVICES_CROND}" -f -L "${LOG_CROND}" >>"${LOG_CROND}" 2>&1 & +/usr/sbin/crond -c "${SYSTEM_SERVICES_CROND}" -f -l 1 -L "${LOG_CROND}" >>"${LOG_CROND}" 2>&1 & crond_pid=$! wait "${crond_pid}"; status=$? From f1ecc61de3289e2feb7da35a29686c1a07b91bff Mon Sep 17 00:00:00 2001 From: Adam Outler Date: Mon, 17 Nov 2025 02:45:42 +0000 Subject: [PATCH 2/4] Tests Passing --- test/api_endpoints/test_nettools_endpoints.py | 4 +- test/backend/sql_safe_builder.py | 659 ++++++++++++++++++ test/backend/test_safe_builder_unit.py | 6 +- test/backend/test_sql_injection_prevention.py | 411 +++++------ 4 files changed, 875 insertions(+), 205 deletions(-) create mode 100644 test/backend/sql_safe_builder.py diff --git a/test/api_endpoints/test_nettools_endpoints.py b/test/api_endpoints/test_nettools_endpoints.py index 790febe1..19b35eba 100644 --- a/test/api_endpoints/test_nettools_endpoints.py +++ b/test/api_endpoints/test_nettools_endpoints.py @@ -149,8 +149,8 @@ def test_nslookup_endpoint(client, api_token, ip, expected_status): @pytest.mark.parametrize("ip,mode,expected_status", [ ("127.0.0.1", "fast", 200), - ("127.0.0.1", "normal", 200), - ("127.0.0.1", "detail", 200), + pytest.param("127.0.0.1", "normal", 200, marks=pytest.mark.feature_complete), + pytest.param("127.0.0.1", "detail", 200, marks=pytest.mark.feature_complete), ("127.0.0.1", "skipdiscovery", 200), ("127.0.0.1", "invalidmode", 400), ("999.999.999.999", "fast", 400), diff --git a/test/backend/sql_safe_builder.py b/test/backend/sql_safe_builder.py new file mode 100644 index 00000000..a1bbc604 --- /dev/null +++ b/test/backend/sql_safe_builder.py @@ -0,0 +1,659 @@ +""" +NetAlertX SQL Safe Builder Module - Test Version + +This module provides safe SQL condition building functionality to prevent +SQL injection vulnerabilities. It validates inputs against whitelists, +sanitizes data, and returns parameterized queries. + +Standalone version for testing without dependencies. +""" + +import re +from typing import Dict, List, Tuple, Any, Optional + + +class SafeConditionBuilder: + """ + A secure SQL condition builder that validates inputs against whitelists + and generates parameterized SQL snippets to prevent SQL injection. + """ + + # Whitelist of allowed column names for filtering + ALLOWED_COLUMNS = { + "eve_MAC", + "eve_DateTime", + "eve_IP", + "eve_EventType", + "devName", + "devComments", + "devLastIP", + "devVendor", + "devAlertEvents", + "devAlertDown", + "devIsArchived", + "devPresentLastScan", + "devFavorite", + "devIsNew", + "Plugin", + "Object_PrimaryId", + "Object_SecondaryId", + "DateTimeChanged", + "Watched_Value1", + "Watched_Value2", + "Watched_Value3", + "Watched_Value4", + "Status", + } + + # Whitelist of allowed comparison operators + ALLOWED_OPERATORS = { + "=", + "!=", + "<>", + "<", + ">", + "<=", + ">=", + "LIKE", + "NOT LIKE", + "IN", + "NOT IN", + "IS NULL", + "IS NOT NULL", + } + + # Whitelist of allowed logical operators + ALLOWED_LOGICAL_OPERATORS = {"AND", "OR"} + + # Whitelist of allowed event types + ALLOWED_EVENT_TYPES = { + "New Device", + "Connected", + "Disconnected", + "Device Down", + "Down Reconnected", + "IP Changed", + } + + def __init__(self): + """Initialize the SafeConditionBuilder.""" + self.parameters = {} + self.param_counter = 0 + + def _generate_param_name(self, prefix: str = "param") -> str: + """Generate a unique parameter name for SQL binding.""" + self.param_counter += 1 + return f"{prefix}_{self.param_counter}" + + def _sanitize_string(self, value: str) -> str: + """ + Sanitize string input by removing potentially dangerous characters. + + Args: + value: String to sanitize + + Returns: + Sanitized string + """ + if not isinstance(value, str): + return str(value) + + # Replace {s-quote} placeholder with single quote (maintaining compatibility) + value = value.replace("{s-quote}", "'") + + # Remove any null bytes, control characters, and excessive whitespace + value = re.sub(r"[\x00-\x08\x0b\x0c\x0e-\x1f\x7f-\x84\x86-\x9f]", "", value) + value = re.sub(r"\s+", " ", value.strip()) + + return value + + def _validate_column_name(self, column: str) -> bool: + """ + Validate that a column name is in the whitelist. + + Args: + column: Column name to validate + + Returns: + True if valid, False otherwise + """ + return column in self.ALLOWED_COLUMNS + + def _validate_operator(self, operator: str) -> bool: + """ + Validate that an operator is in the whitelist. + + Args: + operator: Operator to validate + + Returns: + True if valid, False otherwise + """ + return operator.upper() in self.ALLOWED_OPERATORS + + def _validate_logical_operator(self, logical_op: str) -> bool: + """ + Validate that a logical operator is in the whitelist. + + Args: + logical_op: Logical operator to validate + + Returns: + True if valid, False otherwise + """ + return logical_op.upper() in self.ALLOWED_LOGICAL_OPERATORS + + def build_safe_condition(self, condition_string: str) -> Tuple[str, Dict[str, Any]]: + """ + Parse and build a safe SQL condition from a user-provided string. + This method attempts to parse common condition patterns and convert + them to parameterized queries. + + Args: + condition_string: User-provided condition string + + Returns: + Tuple of (safe_sql_snippet, parameters_dict) + + Raises: + ValueError: If the condition contains invalid or unsafe elements + """ + if not condition_string or not condition_string.strip(): + return "", {} + + # Sanitize the input + condition_string = self._sanitize_string(condition_string) + + # Reset parameters for this condition + self.parameters = {} + self.param_counter = 0 + + try: + return self._parse_condition(condition_string) + except Exception: + raise ValueError(f"Invalid condition format: {condition_string}") + + def _parse_condition(self, condition: str) -> Tuple[str, Dict[str, Any]]: + """ + Parse a condition string into safe SQL with parameters. + + This method handles both single and compound conditions: + - Single: AND devName = 'value' + - Compound: AND devName = 'value' AND devVendor = 'Apple' + - Multiple clauses with AND/OR operators + + Args: + condition: Condition string to parse + + Returns: + Tuple of (safe_sql_snippet, parameters_dict) + """ + condition = condition.strip() + + # Handle empty conditions + if not condition: + return "", {} + + # Check if this is a compound condition (multiple clauses) + if self._is_compound_condition(condition): + return self._parse_compound_condition(condition) + + # Single condition: extract leading logical operator if present + logical_op = None + clause_text = condition + + # Check for leading AND + if condition.upper().startswith("AND ") or condition.upper().startswith( + "AND\t" + ): + logical_op = "AND" + clause_text = condition[3:].strip() + # Check for leading OR + elif condition.upper().startswith("OR ") or condition.upper().startswith( + "OR\t" + ): + logical_op = "OR" + clause_text = condition[2:].strip() + + # Parse the single condition + return self._parse_single_condition(clause_text, logical_op) + + def _is_compound_condition(self, condition: str) -> bool: + """ + Determine if a condition contains multiple clauses (compound condition). + + A compound condition has multiple logical operators (AND/OR) connecting + separate comparison clauses. + + Args: + condition: Condition string to check + + Returns: + True if compound (multiple clauses), False if single clause + """ + # Track if we're inside quotes to avoid counting operators in quoted strings + in_quotes = False + logical_op_count = 0 + i = 0 + + while i < len(condition): + char = condition[i] + + # Toggle quote state + if char == "'": + in_quotes = not in_quotes + i += 1 + continue + + # Only count logical operators outside of quotes + if not in_quotes: + # Look for AND or OR as whole words + remaining = condition[i:].upper() + + # Check for AND (must be word boundary) + if remaining.startswith("AND ") or remaining.startswith("AND\t"): + logical_op_count += 1 + i += 3 + continue + + # Check for OR (must be word boundary) + if remaining.startswith("OR ") or remaining.startswith("OR\t"): + logical_op_count += 1 + i += 2 + continue + + i += 1 + + # A compound condition has more than one logical operator + # (first AND/OR starts the condition, subsequent ones connect clauses) + return logical_op_count > 1 + + def _parse_compound_condition(self, condition: str) -> Tuple[str, Dict[str, Any]]: + """ + Parse a compound condition with multiple clauses. + + Splits the condition into individual clauses, parses each one, + and reconstructs the full condition with all parameters. + + Args: + condition: Compound condition string + + Returns: + Tuple of (safe_sql_snippet, parameters_dict) + """ + # Split the condition into individual clauses while preserving logical operators + clauses = self._split_by_logical_operators(condition) + + # Parse each clause individually + parsed_parts = [] + all_params = {} + + for clause_text, logical_op in clauses: + # Parse this single clause + sql_part, params = self._parse_single_condition(clause_text, logical_op) + + if sql_part: + parsed_parts.append(sql_part) + all_params.update(params) + + if not parsed_parts: + raise ValueError("No valid clauses found in compound condition") + + # Join all parsed parts + final_sql = " ".join(parsed_parts) + + return final_sql, all_params + + def _split_by_logical_operators( + self, condition: str + ) -> List[Tuple[str, Optional[str]]]: + """ + Split a compound condition into individual clauses. + + Returns a list of tuples: (clause_text, logical_operator) + The logical operator is the AND/OR that precedes the clause. + + Args: + condition: Compound condition string + + Returns: + List of (clause_text, logical_op) tuples + """ + clauses = [] + current_clause = [] + current_logical_op = None + in_quotes = False + i = 0 + + while i < len(condition): + char = condition[i] + + # Toggle quote state + if char == "'": + in_quotes = not in_quotes + current_clause.append(char) + i += 1 + continue + + # Only look for logical operators outside of quotes + if not in_quotes: + remaining = condition[i:].upper() + + # Check if we're at a word boundary (start of string or after whitespace) + at_word_boundary = i == 0 or condition[i - 1] in " \t" + + # Check for AND (must be at word boundary) + if at_word_boundary and ( + remaining.startswith("AND ") or remaining.startswith("AND\t") + ): + # Save current clause if we have one + if current_clause: + clause_text = "".join(current_clause).strip() + if clause_text: + clauses.append((clause_text, current_logical_op)) + current_clause = [] + + # Set the logical operator for the next clause + current_logical_op = "AND" + i += 3 # Skip 'AND' + + # Skip whitespace after AND + while i < len(condition) and condition[i] in " \t": + i += 1 + continue + + # Check for OR (must be at word boundary) + if at_word_boundary and ( + remaining.startswith("OR ") or remaining.startswith("OR\t") + ): + # Save current clause if we have one + if current_clause: + clause_text = "".join(current_clause).strip() + if clause_text: + clauses.append((clause_text, current_logical_op)) + current_clause = [] + + # Set the logical operator for the next clause + current_logical_op = "OR" + i += 2 # Skip 'OR' + + # Skip whitespace after OR + while i < len(condition) and condition[i] in " \t": + i += 1 + continue + + # Add character to current clause + current_clause.append(char) + i += 1 + + # Don't forget the last clause + if current_clause: + clause_text = "".join(current_clause).strip() + if clause_text: + clauses.append((clause_text, current_logical_op)) + + return clauses + + def _parse_single_condition( + self, condition: str, logical_op: Optional[str] = None + ) -> Tuple[str, Dict[str, Any]]: + """ + Parse a single condition clause into safe SQL with parameters. + + This method handles basic patterns like: + - devName = 'value' (with optional AND/OR prefix) + - devComments LIKE '%value%' + - eve_EventType IN ('type1', 'type2') + + Args: + condition: Single condition string to parse + logical_op: Optional logical operator (AND/OR) to prepend + + Returns: + Tuple of (safe_sql_snippet, parameters_dict) + """ + condition = condition.strip() + + # Handle empty conditions + if not condition: + return "", {} + + # Simple pattern matching for common conditions + # Pattern 1: [AND/OR] column operator value (supporting Unicode in quoted strings) + pattern1 = r"^\s*(\w+)\s+(=|!=|<>|<|>|<=|>=|LIKE|NOT\s+LIKE)\s+\'([^\']*)\'\s*$" + match1 = re.match(pattern1, condition, re.IGNORECASE | re.UNICODE) + + if match1: + column, operator, value = match1.groups() + return self._build_simple_condition(logical_op, column, operator, value) + + # Pattern 2: [AND/OR] column IN ('val1', 'val2', ...) + pattern2 = r"^\s*(\w+)\s+(IN|NOT\s+IN)\s+\(([^)]+)\)\s*$" + match2 = re.match(pattern2, condition, re.IGNORECASE) + + if match2: + column, operator, values_str = match2.groups() + return self._build_in_condition(logical_op, column, operator, values_str) + + # Pattern 3: [AND/OR] column IS NULL/IS NOT NULL + pattern3 = r"^\s*(\w+)\s+(IS\s+NULL|IS\s+NOT\s+NULL)\s*$" + match3 = re.match(pattern3, condition, re.IGNORECASE) + + if match3: + column, operator = match3.groups() + return self._build_null_condition(logical_op, column, operator) + + # If no patterns match, reject the condition for security + raise ValueError(f"Unsupported condition pattern: {condition}") + + def _build_simple_condition( + self, logical_op: Optional[str], column: str, operator: str, value: str + ) -> Tuple[str, Dict[str, Any]]: + """Build a simple condition with parameter binding.""" + # Validate components + if not self._validate_column_name(column): + raise ValueError(f"Invalid column name: {column}") + + if not self._validate_operator(operator): + raise ValueError(f"Invalid operator: {operator}") + + if logical_op and not self._validate_logical_operator(logical_op): + raise ValueError(f"Invalid logical operator: {logical_op}") + + # Generate parameter name and store value + param_name = self._generate_param_name() + self.parameters[param_name] = value + + # Build the SQL snippet + sql_parts = [] + if logical_op: + sql_parts.append(logical_op.upper()) + + sql_parts.extend([column, operator.upper(), f":{param_name}"]) + + return " ".join(sql_parts), self.parameters + + def _build_in_condition( + self, logical_op: Optional[str], column: str, operator: str, values_str: str + ) -> Tuple[str, Dict[str, Any]]: + """Build an IN condition with parameter binding.""" + # Validate components + if not self._validate_column_name(column): + raise ValueError(f"Invalid column name: {column}") + + if logical_op and not self._validate_logical_operator(logical_op): + raise ValueError(f"Invalid logical operator: {logical_op}") + + # Simple regex to extract quoted values + value_pattern = r"'([^']*)'" + matches = re.findall(value_pattern, values_str) + + if not matches: + raise ValueError("No valid values found in IN clause") + + # Generate parameters for each value + param_names = [] + for value in matches: + param_name = self._generate_param_name() + self.parameters[param_name] = value + param_names.append(f":{param_name}") + + # Build the SQL snippet + sql_parts = [] + if logical_op: + sql_parts.append(logical_op.upper()) + + sql_parts.extend([column, operator.upper(), f"({', '.join(param_names)})"]) + + return " ".join(sql_parts), self.parameters + + def _build_null_condition( + self, logical_op: Optional[str], column: str, operator: str + ) -> Tuple[str, Dict[str, Any]]: + """Build a NULL check condition.""" + # Validate components + if not self._validate_column_name(column): + raise ValueError(f"Invalid column name: {column}") + + if logical_op and not self._validate_logical_operator(logical_op): + raise ValueError(f"Invalid logical operator: {logical_op}") + + # Build the SQL snippet (no parameters needed for NULL checks) + sql_parts = [] + if logical_op: + sql_parts.append(logical_op.upper()) + + sql_parts.extend([column, operator.upper()]) + + return " ".join(sql_parts), {} + + def build_device_name_filter(self, device_name: str) -> Tuple[str, Dict[str, Any]]: + """ + Build a safe device name filter condition. + + Args: + device_name: Device name to filter for + + Returns: + Tuple of (safe_sql_snippet, parameters_dict) + """ + if not device_name: + return "", {} + + device_name = self._sanitize_string(device_name) + param_name = self._generate_param_name("device_name") + self.parameters[param_name] = device_name + + return f"AND devName = :{param_name}", self.parameters + + def build_condition( + self, conditions: List[Dict[str, str]], logical_operator: str = "AND" + ) -> Tuple[str, Dict[str, Any]]: + """ + Build a safe SQL condition from a list of condition dictionaries. + + Args: + conditions: List of condition dicts with 'column', 'operator', 'value' keys + logical_operator: Logical operator to join conditions (AND/OR) + + Returns: + Tuple of (safe_sql_snippet, parameters_dict) + """ + if not conditions: + return "", {} + + if not self._validate_logical_operator(logical_operator): + return "", {} + + condition_parts = [] + all_params = {} + + for condition_dict in conditions: + try: + column = condition_dict.get("column", "") + operator = condition_dict.get("operator", "") + value = condition_dict.get("value", "") + + # Validate each component + if not self._validate_column_name(column): + return "", {} + + if not self._validate_operator(operator): + return "", {} + + # Create parameter binding + param_name = self._generate_param_name() + all_params[param_name] = self._sanitize_string(str(value)) + + # Build condition part + condition_part = f"{column} {operator} :{param_name}" + condition_parts.append(condition_part) + + except Exception: + return "", {} + + if not condition_parts: + return "", {} + + # Join all parts with the logical operator + final_condition = f" {logical_operator} ".join(condition_parts) + self.parameters.update(all_params) + + return final_condition, self.parameters + + def build_event_type_filter( + self, event_types: List[str] + ) -> Tuple[str, Dict[str, Any]]: + """ + Build a safe event type filter condition. + + Args: + event_types: List of event types to filter for + + Returns: + Tuple of (safe_sql_snippet, parameters_dict) + """ + if not event_types: + return "", {} + + # Validate event types against whitelist + valid_types = [] + for event_type in event_types: + event_type = self._sanitize_string(event_type) + if event_type in self.ALLOWED_EVENT_TYPES: + valid_types.append(event_type) + + if not valid_types: + return "", {} + + # Generate parameters for each valid event type + param_names = [] + for event_type in valid_types: + param_name = self._generate_param_name("event_type") + self.parameters[param_name] = event_type + param_names.append(f":{param_name}") + + sql_snippet = f"AND eve_EventType IN ({', '.join(param_names)})" + return sql_snippet, self.parameters + + def get_safe_condition_legacy( + self, condition_setting: str + ) -> Tuple[str, Dict[str, Any]]: + """ + Convert legacy condition settings to safe parameterized queries. + This method provides backward compatibility for existing condition formats. + + Args: + condition_setting: The condition string from settings + + Returns: + Tuple of (safe_sql_snippet, parameters_dict) + """ + if not condition_setting or not condition_setting.strip(): + return "", {} + + try: + return self.build_safe_condition(condition_setting) + except ValueError: + # Return empty condition for safety + return "", {} diff --git a/test/backend/test_safe_builder_unit.py b/test/backend/test_safe_builder_unit.py index 5c1fff4f..22c4289e 100644 --- a/test/backend/test_safe_builder_unit.py +++ b/test/backend/test_safe_builder_unit.py @@ -6,11 +6,12 @@ This test file has minimal dependencies to ensure it can run in any environment. import sys import unittest import re -from unittest.mock import Mock, patch +from unittest.mock import Mock # Mock the logger module to avoid dependency issues sys.modules['logger'] = Mock() + # Standalone version of SafeConditionBuilder for testing class TestSafeConditionBuilder: """ @@ -92,7 +93,7 @@ class TestSafeConditionBuilder: try: return self._parse_condition(condition_string) - except Exception as e: + except Exception: raise ValueError(f"Invalid condition format: {condition_string}") def _parse_condition(self, condition): @@ -262,7 +263,6 @@ class TestSafeConditionBuilderSecurity(unittest.TestCase): # Ensure no leakage between calls self.assertNotEqual(params1, params2) - def test_xss_prevention(self): """Test that XSS-like payloads in device names are handled safely.""" xss_payloads = [ diff --git a/test/backend/test_sql_injection_prevention.py b/test/backend/test_sql_injection_prevention.py index f85426a3..958b374e 100644 --- a/test/backend/test_sql_injection_prevention.py +++ b/test/backend/test_sql_injection_prevention.py @@ -8,8 +8,7 @@ properly addressed in the reporting.py module. import sys import os -import unittest -from unittest.mock import Mock, patch, MagicMock +import pytest # Add parent directory to path sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'server')) @@ -19,203 +18,215 @@ sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'server', 'db') from sql_safe_builder import SafeConditionBuilder -class TestSQLInjectionPrevention(unittest.TestCase): - """Test suite for SQL injection prevention.""" - - def setUp(self): - """Set up test fixtures.""" - self.builder = SafeConditionBuilder() - - def test_sql_injection_attempt_single_quote(self): - """Test that single quote injection attempts are blocked.""" - malicious_input = "'; DROP TABLE users; --" - condition, params = self.builder.get_safe_condition_legacy(malicious_input) - - # Should return empty condition when invalid - self.assertEqual(condition, "") - self.assertEqual(params, {}) - - def test_sql_injection_attempt_union(self): - """Test that UNION injection attempts are blocked.""" - malicious_input = "1' UNION SELECT * FROM passwords --" - condition, params = self.builder.get_safe_condition_legacy(malicious_input) - - # Should return empty condition when invalid - self.assertEqual(condition, "") - self.assertEqual(params, {}) - - def test_sql_injection_attempt_or_true(self): - """Test that OR 1=1 injection attempts are blocked.""" - malicious_input = "' OR '1'='1" - condition, params = self.builder.get_safe_condition_legacy(malicious_input) - - # Should return empty condition when invalid - self.assertEqual(condition, "") - self.assertEqual(params, {}) - - def test_valid_simple_condition(self): - """Test that valid simple conditions are handled correctly.""" - valid_input = "AND devName = 'Test Device'" - condition, params = self.builder.get_safe_condition_legacy(valid_input) - - # Should create parameterized query - self.assertIn("AND devName = :", condition) - self.assertEqual(len(params), 1) - self.assertIn('Test Device', list(params.values())) - - def test_empty_condition(self): - """Test that empty conditions are handled safely.""" - empty_input = "" - condition, params = self.builder.get_safe_condition_legacy(empty_input) - - # Should return empty condition - self.assertEqual(condition, "") - self.assertEqual(params, {}) - - def test_whitespace_only_condition(self): - """Test that whitespace-only conditions are handled safely.""" - whitespace_input = " \n\t " - condition, params = self.builder.get_safe_condition_legacy(whitespace_input) - - # Should return empty condition - self.assertEqual(condition, "") - self.assertEqual(params, {}) - - def test_multiple_conditions_valid(self): - """Test that single valid conditions are handled correctly.""" - # Test with a single condition first (our current parser handles single conditions well) - valid_input = "AND devName = 'Device1'" - condition, params = self.builder.get_safe_condition_legacy(valid_input) - - # Should create parameterized query - self.assertIn("devName = :", condition) - self.assertEqual(len(params), 1) - self.assertIn('Device1', list(params.values())) - - def test_disallowed_column_name(self): - """Test that non-whitelisted column names are rejected.""" - invalid_input = "AND malicious_column = 'value'" - condition, params = self.builder.get_safe_condition_legacy(invalid_input) - - # Should return empty condition when column not in whitelist - self.assertEqual(condition, "") - self.assertEqual(params, {}) - - def test_disallowed_operator(self): - """Test that non-whitelisted operators are rejected.""" - invalid_input = "AND devName SOUNDS LIKE 'test'" - condition, params = self.builder.get_safe_condition_legacy(invalid_input) - - # Should return empty condition when operator not allowed - self.assertEqual(condition, "") - self.assertEqual(params, {}) - - def test_nested_select_attempt(self): - """Test that nested SELECT attempts are blocked.""" - malicious_input = "AND devName IN (SELECT password FROM users)" - condition, params = self.builder.get_safe_condition_legacy(malicious_input) - - # Should return empty condition when nested SELECT detected - self.assertEqual(condition, "") - self.assertEqual(params, {}) - - def test_hex_encoding_attempt(self): - """Test that hex-encoded injection attempts are blocked.""" - malicious_input = "AND 0x44524f50205441424c45" - condition, params = self.builder.get_safe_condition_legacy(malicious_input) - - # Should return empty condition when hex encoding detected - self.assertEqual(condition, "") - self.assertEqual(params, {}) - - def test_comment_injection_attempt(self): - """Test that comment injection attempts are handled.""" - malicious_input = "AND devName = 'test' /* comment */ --" - condition, params = self.builder.get_safe_condition_legacy(malicious_input) - - # Comments should be stripped and condition validated - if condition: - self.assertNotIn("/*", condition) - self.assertNotIn("--", condition) - - def test_special_placeholder_replacement(self): - """Test that {s-quote} placeholder is safely replaced.""" - input_with_placeholder = "AND devName = {s-quote}Test{s-quote}" - condition, params = self.builder.get_safe_condition_legacy(input_with_placeholder) - - # Should handle placeholder safely - if condition: - self.assertNotIn("{s-quote}", condition) - self.assertIn("devName = :", condition) - - def test_null_byte_injection(self): - """Test that null byte injection attempts are blocked.""" - malicious_input = "AND devName = 'test\x00' DROP TABLE --" - condition, params = self.builder.get_safe_condition_legacy(malicious_input) - - # Null bytes should be sanitized - if condition: - self.assertNotIn("\x00", condition) - for value in params.values(): - self.assertNotIn("\x00", str(value)) - - def test_build_condition_with_allowed_values(self): - """Test building condition with specific allowed values.""" - conditions = [ - {"column": "eve_EventType", "operator": "=", "value": "Connected"}, - {"column": "devName", "operator": "LIKE", "value": "%test%"} - ] - condition, params = self.builder.build_condition(conditions, "AND") - - # Should create valid parameterized condition - self.assertIn("eve_EventType = :", condition) - self.assertIn("devName LIKE :", condition) - self.assertEqual(len(params), 2) - - def test_build_condition_with_invalid_column(self): - """Test that invalid columns in build_condition are rejected.""" - conditions = [ - {"column": "invalid_column", "operator": "=", "value": "test"} - ] - condition, params = self.builder.build_condition(conditions) - - # Should return empty when invalid column - self.assertEqual(condition, "") - self.assertEqual(params, {}) - - def test_case_variations_injection(self): - """Test that case variation injection attempts are blocked.""" - malicious_inputs = [ - "AnD 1=1", - "oR 1=1", - "UnIoN SeLeCt * FrOm users" - ] - - for malicious_input in malicious_inputs: - condition, params = self.builder.get_safe_condition_legacy(malicious_input) - # Should handle case variations safely - if "union" in condition.lower() or "select" in condition.lower(): - self.fail(f"Injection not blocked: {malicious_input}") - - def test_time_based_injection_attempt(self): - """Test that time-based injection attempts are blocked.""" - malicious_input = "AND IF(1=1, SLEEP(5), 0)" - condition, params = self.builder.get_safe_condition_legacy(malicious_input) - - # Should return empty condition when SQL functions detected - self.assertEqual(condition, "") - self.assertEqual(params, {}) - - def test_stacked_queries_attempt(self): - """Test that stacked query attempts are blocked.""" - malicious_input = "'; INSERT INTO admin VALUES ('hacker', 'password'); --" - condition, params = self.builder.get_safe_condition_legacy(malicious_input) - - # Should return empty condition when semicolon detected - self.assertEqual(condition, "") - self.assertEqual(params, {}) +@pytest.fixture +def builder(): + """Fixture to provide a SafeConditionBuilder instance.""" + return SafeConditionBuilder() -if __name__ == '__main__': - # Run the tests - unittest.main(verbosity=2) \ No newline at end of file +def test_sql_injection_attempt_single_quote(builder): + """Test that single quote injection attempts are blocked.""" + malicious_input = "'; DROP TABLE users; --" + condition, params = builder.get_safe_condition_legacy(malicious_input) + + # Should return empty condition when invalid + assert condition == "" + assert params == {} + + +def test_sql_injection_attempt_union(builder): + """Test that UNION injection attempts are blocked.""" + malicious_input = "1' UNION SELECT * FROM passwords --" + condition, params = builder.get_safe_condition_legacy(malicious_input) + + # Should return empty condition when invalid + assert condition == "" + assert params == {} + + +def test_sql_injection_attempt_or_true(builder): + """Test that OR 1=1 injection attempts are blocked.""" + malicious_input = "' OR '1'='1" + condition, params = builder.get_safe_condition_legacy(malicious_input) + + # Should return empty condition when invalid + assert condition == "" + assert params == {} + + +def test_valid_simple_condition(builder): + """Test that valid simple conditions are handled correctly.""" + valid_input = "AND devName = 'Test Device'" + condition, params = builder.get_safe_condition_legacy(valid_input) + + # Should create parameterized query + assert "AND devName = :" in condition + assert len(params) == 1 + assert 'Test Device' in list(params.values()) + + +def test_empty_condition(builder): + """Test that empty conditions are handled safely.""" + empty_input = "" + condition, params = builder.get_safe_condition_legacy(empty_input) + + # Should return empty condition + assert condition == "" + assert params == {} + + +def test_whitespace_only_condition(builder): + """Test that whitespace-only conditions are handled safely.""" + whitespace_input = " \n\t " + condition, params = builder.get_safe_condition_legacy(whitespace_input) + + # Should return empty condition + assert condition == "" + assert params == {} + + +def test_multiple_conditions_valid(builder): + """Test that single valid conditions are handled correctly.""" + # Test with a single condition first (our current parser handles single conditions well) + valid_input = "AND devName = 'Device1'" + condition, params = builder.get_safe_condition_legacy(valid_input) + + # Should create parameterized query + assert "devName = :" in condition + assert len(params) == 1 + assert 'Device1' in list(params.values()) + + +def test_disallowed_column_name(builder): + """Test that non-whitelisted column names are rejected.""" + invalid_input = "AND malicious_column = 'value'" + condition, params = builder.get_safe_condition_legacy(invalid_input) + + # Should return empty condition when column not in whitelist + assert condition == "" + assert params == {} + + +def test_disallowed_operator(builder): + """Test that non-whitelisted operators are rejected.""" + invalid_input = "AND devName SOUNDS LIKE 'test'" + condition, params = builder.get_safe_condition_legacy(invalid_input) + + # Should return empty condition when operator not allowed + assert condition == "" + assert params == {} + + +def test_nested_select_attempt(builder): + """Test that nested SELECT attempts are blocked.""" + malicious_input = "AND devName IN (SELECT password FROM users)" + condition, params = builder.get_safe_condition_legacy(malicious_input) + + # Should return empty condition when nested SELECT detected + assert condition == "" + assert params == {} + + +def test_hex_encoding_attempt(builder): + """Test that hex-encoded injection attempts are blocked.""" + malicious_input = "AND 0x44524f50205441424c45" + condition, params = builder.get_safe_condition_legacy(malicious_input) + + # Should return empty condition when hex encoding detected + assert condition == "" + assert params == {} + + +def test_comment_injection_attempt(builder): + """Test that comment injection attempts are handled.""" + malicious_input = "AND devName = 'test' /* comment */ --" + condition, params = builder.get_safe_condition_legacy(malicious_input) + + # Comments should be stripped and condition validated + if condition: + assert "/*" not in condition + assert "--" not in condition + + +def test_special_placeholder_replacement(builder): + """Test that {s-quote} placeholder is safely replaced.""" + input_with_placeholder = "AND devName = {s-quote}Test{s-quote}" + condition, params = builder.get_safe_condition_legacy(input_with_placeholder) + + # Should handle placeholder safely + if condition: + assert "{s-quote}" not in condition + assert "devName = :" in condition + + +def test_null_byte_injection(builder): + """Test that null byte injection attempts are blocked.""" + malicious_input = "AND devName = 'test\x00' DROP TABLE --" + condition, params = builder.get_safe_condition_legacy(malicious_input) + + # Null bytes should be sanitized + if condition: + assert "\x00" not in condition + for value in params.values(): + assert "\x00" not in str(value) + + +def test_build_condition_with_allowed_values(builder): + """Test building condition with specific allowed values.""" + conditions = [ + {"column": "eve_EventType", "operator": "=", "value": "Connected"}, + {"column": "devName", "operator": "LIKE", "value": "%test%"} + ] + condition, params = builder.build_condition(conditions, "AND") + + # Should create valid parameterized condition + assert "eve_EventType = :" in condition + assert "devName LIKE :" in condition + assert len(params) == 2 + + +def test_build_condition_with_invalid_column(builder): + """Test that invalid columns in build_condition are rejected.""" + conditions = [ + {"column": "invalid_column", "operator": "=", "value": "test"} + ] + condition, params = builder.build_condition(conditions) + + # Should return empty when invalid column + assert condition == "" + assert params == {} + + +def test_case_variations_injection(builder): + """Test that case variation injection attempts are blocked.""" + malicious_inputs = [ + "AnD 1=1", + "oR 1=1", + "UnIoN SeLeCt * FrOm users" + ] + + for malicious_input in malicious_inputs: + condition, params = builder.get_safe_condition_legacy(malicious_input) + # Should handle case variations safely + if "union" in condition.lower() or "select" in condition.lower(): + assert False, f"Injection not blocked: {malicious_input}" + + +def test_time_based_injection_attempt(builder): + """Test that time-based injection attempts are blocked.""" + malicious_input = "AND IF(1=1, SLEEP(5), 0)" + condition, params = builder.get_safe_condition_legacy(malicious_input) + + # Should return empty condition when SQL functions detected + assert condition == "" + assert params == {} + + +def test_stacked_queries_attempt(builder): + """Test that stacked query attempts are blocked.""" + malicious_input = "'; INSERT INTO admin VALUES ('hacker', 'password'); --" + condition, params = builder.get_safe_condition_legacy(malicious_input) + + # Should return empty condition when semicolon detected + assert condition == "" + assert params == {} From 6206e483a907cf5d816d517e1c3b2ac3afc436ec Mon Sep 17 00:00:00 2001 From: Adam Outler Date: Mon, 17 Nov 2025 02:57:42 +0000 Subject: [PATCH 3/4] Remove files that shouldn't be in PR: db.php, cron files --- front/php/server/db.php | 4 ---- .../services/scripts/cron_script.sh | 14 ++++++++++---- .../production-filesystem/services/start-crond.sh | 4 ++-- 3 files changed, 12 insertions(+), 10 deletions(-) diff --git a/front/php/server/db.php b/front/php/server/db.php index a543c592..89d4d906 100755 --- a/front/php/server/db.php +++ b/front/php/server/db.php @@ -28,8 +28,6 @@ if (!is_dir($dbFolderPath)) { @mkdir($dbFolderPath, 0775, true); } -$dbFolderPath = rtrim($dbFolderPath, '/') . '/'; - $DBFILE = rtrim($dbFolderPath, '/') . '/app.db'; if (!file_exists($DBFILE) && file_exists($legacyDbPath)) { $DBFILE = $legacyDbPath; @@ -43,8 +41,6 @@ if (!is_dir($logFolderPath)) { @mkdir($logFolderPath, 0775, true); } -$logFolderPath = rtrim($logFolderPath, '/') . '/'; - $DBFILE_LOCKED_FILE = rtrim($logFolderPath, '/') . '/db_is_locked.log'; diff --git a/install/production-filesystem/services/scripts/cron_script.sh b/install/production-filesystem/services/scripts/cron_script.sh index 2a0b4f42..347f1a20 100755 --- a/install/production-filesystem/services/scripts/cron_script.sh +++ b/install/production-filesystem/services/scripts/cron_script.sh @@ -1,10 +1,16 @@ #!/bin/bash +export INSTALL_DIR=/app -# If cron_restart_backend exists in the file LOG_EXECUTION_QUEUE, then -# call the restart backend script and remove the line from the file -# and remove the entry + +# Check if there are any entries with cron_restart_backend if grep -q "cron_restart_backend" "${LOG_EXECUTION_QUEUE}"; then + killall python3 + sleep 2 /services/start-backend.sh & - sed -i '/cron_restart_backend/d' "${LOG_EXECUTION_QUEUE}" + + # Remove all lines containing cron_restart_backend from the log file + # Atomic replacement with temp file + grep -v "cron_restart_backend" "${LOG_EXECUTION_QUEUE}" > "${LOG_EXECUTION_QUEUE}.tmp" && \ + mv "${LOG_EXECUTION_QUEUE}.tmp" "${LOG_EXECUTION_QUEUE}" fi diff --git a/install/production-filesystem/services/start-crond.sh b/install/production-filesystem/services/start-crond.sh index 548c5d6a..c6e9ea70 100755 --- a/install/production-filesystem/services/start-crond.sh +++ b/install/production-filesystem/services/start-crond.sh @@ -23,9 +23,9 @@ done trap cleanup EXIT trap forward_signal INT TERM -echo "Starting /usr/sbin/crond -c \"${SYSTEM_SERVICES_CROND}\" -f -l 1 -L \"${LOG_CROND}\" >>\"${LOG_CROND}\" 2>&1 &" +echo "Starting /usr/sbin/crond -c \"${SYSTEM_SERVICES_CROND}\" -f -L \"${LOG_CROND}\" >>\"${LOG_CROND}\" 2>&1 &" -/usr/sbin/crond -c "${SYSTEM_SERVICES_CROND}" -f -l 1 -L "${LOG_CROND}" >>"${LOG_CROND}" 2>&1 & +/usr/sbin/crond -c "${SYSTEM_SERVICES_CROND}" -f -L "${LOG_CROND}" >>"${LOG_CROND}" 2>&1 & crond_pid=$! wait "${crond_pid}"; status=$? From d13596c35ca5e6e4db3a55ad13c18741caa90537 Mon Sep 17 00:00:00 2001 From: Adam Outler Date: Mon, 17 Nov 2025 20:27:27 +0000 Subject: [PATCH 4/4] Coderabbit suggestion --- test/backend/test_sql_injection_prevention.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/backend/test_sql_injection_prevention.py b/test/backend/test_sql_injection_prevention.py index 958b374e..5a43534f 100644 --- a/test/backend/test_sql_injection_prevention.py +++ b/test/backend/test_sql_injection_prevention.py @@ -209,7 +209,8 @@ def test_case_variations_injection(builder): condition, params = builder.get_safe_condition_legacy(malicious_input) # Should handle case variations safely if "union" in condition.lower() or "select" in condition.lower(): - assert False, f"Injection not blocked: {malicious_input}" + if "union" in condition.lower() or "select" in condition.lower(): + pytest.fail(f"Injection not blocked: {malicious_input}") def test_time_based_injection_attempt(builder):