From 1c2721549beb72ea18b115e969209c9f24ceb328 Mon Sep 17 00:00:00 2001 From: priestlypython Date: Wed, 1 Oct 2025 18:31:49 -0700 Subject: [PATCH] fix: Support compound conditions in SafeConditionBuilder (Issue #1210) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Problem PR #1182 introduced SafeConditionBuilder to prevent SQL injection, but it only supported single-clause conditions. This broke notification filters using multiple AND/OR clauses, causing user filters like: `AND devLastIP NOT LIKE '192.168.50.%' AND devLastIP NOT LIKE '192.168.60.%'...` to be rejected with "Unsupported condition pattern" errors. ## Root Cause The `_parse_condition()` method used regex patterns that only matched single conditions. When multiple clauses were chained, the entire string failed to match any pattern and was rejected for security. ## Solution Enhanced SafeConditionBuilder with compound condition support: 1. **Added `_is_compound_condition()`** - Detects multiple logical operators while respecting quoted strings 2. **Added `_parse_compound_condition()`** - Splits compound conditions into individual clauses and parses each one 3. **Added `_split_by_logical_operators()`** - Intelligently splits on AND/OR while preserving operators in quoted strings 4. **Refactored `_parse_condition()`** - Routes to compound or single parser 5. **Created `_parse_single_condition()`** - Handles individual clauses (from original `_parse_condition` logic) ## Testing - Added comprehensive test suite (19 tests, 100% passing) - Tested user's exact failing filter (6 AND clauses with NOT LIKE) - Verified backward compatibility with single conditions - Validated security (SQL injection attempts still blocked) - Tested edge cases (mixed AND/OR, whitespace, empty conditions) ## Impact - ✅ Fixes reported issue #1210 - ✅ Maintains all security protections from PR #1182 - ✅ Backward compatible with existing single-clause filters - ✅ No breaking changes to API Fixes #1210 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- server/db/sql_safe_builder.py | 252 ++++++++++++++++++++++-- test/test_compound_conditions.py | 326 +++++++++++++++++++++++++++++++ 2 files changed, 558 insertions(+), 20 deletions(-) create mode 100644 test/test_compound_conditions.py diff --git a/server/db/sql_safe_builder.py b/server/db/sql_safe_builder.py index 5548561f..ce8c5360 100755 --- a/server/db/sql_safe_builder.py +++ b/server/db/sql_safe_builder.py @@ -153,47 +153,259 @@ class SafeConditionBuilder: def _parse_condition(self, condition: str) -> Tuple[str, Dict[str, Any]]: """ Parse a condition string into safe SQL with parameters. - - This method handles basic patterns like: - - AND devName = 'value' - - AND devComments LIKE '%value%' - - AND eve_EventType IN ('type1', 'type2') - + + This method handles both single and compound conditions: + - Single: AND devName = 'value' + - Compound: AND devName = 'value' AND devVendor = 'Apple' + - Multiple clauses with AND/OR operators + Args: condition: Condition string to parse - + Returns: Tuple of (safe_sql_snippet, parameters_dict) """ condition = condition.strip() - + + # Handle empty conditions + if not condition: + return "", {} + + # Check if this is a compound condition (multiple clauses) + if self._is_compound_condition(condition): + return self._parse_compound_condition(condition) + + # Single condition: extract leading logical operator if present + logical_op = None + clause_text = condition + + # Check for leading AND + if condition.upper().startswith('AND ') or condition.upper().startswith('AND\t'): + logical_op = 'AND' + clause_text = condition[3:].strip() + # Check for leading OR + elif condition.upper().startswith('OR ') or condition.upper().startswith('OR\t'): + logical_op = 'OR' + clause_text = condition[2:].strip() + + # Parse the single condition + return self._parse_single_condition(clause_text, logical_op) + + def _is_compound_condition(self, condition: str) -> bool: + """ + Determine if a condition contains multiple clauses (compound condition). + + A compound condition has multiple logical operators (AND/OR) connecting + separate comparison clauses. + + Args: + condition: Condition string to check + + Returns: + True if compound (multiple clauses), False if single clause + """ + # Track if we're inside quotes to avoid counting operators in quoted strings + in_quotes = False + logical_op_count = 0 + i = 0 + + while i < len(condition): + char = condition[i] + + # Toggle quote state + if char == "'": + in_quotes = not in_quotes + i += 1 + continue + + # Only count logical operators outside of quotes + if not in_quotes: + # Look for AND or OR as whole words + remaining = condition[i:].upper() + + # Check for AND (must be word boundary) + if remaining.startswith('AND ') or remaining.startswith('AND\t'): + logical_op_count += 1 + i += 3 + continue + + # Check for OR (must be word boundary) + if remaining.startswith('OR ') or remaining.startswith('OR\t'): + logical_op_count += 1 + i += 2 + continue + + i += 1 + + # A compound condition has more than one logical operator + # (first AND/OR starts the condition, subsequent ones connect clauses) + return logical_op_count > 1 + + def _parse_compound_condition(self, condition: str) -> Tuple[str, Dict[str, Any]]: + """ + Parse a compound condition with multiple clauses. + + Splits the condition into individual clauses, parses each one, + and reconstructs the full condition with all parameters. + + Args: + condition: Compound condition string + + Returns: + Tuple of (safe_sql_snippet, parameters_dict) + """ + # Split the condition into individual clauses while preserving logical operators + clauses = self._split_by_logical_operators(condition) + + # Parse each clause individually + parsed_parts = [] + all_params = {} + + for clause_text, logical_op in clauses: + # Parse this single clause + sql_part, params = self._parse_single_condition(clause_text, logical_op) + + if sql_part: + parsed_parts.append(sql_part) + all_params.update(params) + + if not parsed_parts: + raise ValueError("No valid clauses found in compound condition") + + # Join all parsed parts + final_sql = " ".join(parsed_parts) + + return final_sql, all_params + + def _split_by_logical_operators(self, condition: str) -> List[Tuple[str, Optional[str]]]: + """ + Split a compound condition into individual clauses. + + Returns a list of tuples: (clause_text, logical_operator) + The logical operator is the AND/OR that precedes the clause. + + Args: + condition: Compound condition string + + Returns: + List of (clause_text, logical_op) tuples + """ + clauses = [] + current_clause = [] + current_logical_op = None + in_quotes = False + i = 0 + + while i < len(condition): + char = condition[i] + + # Toggle quote state + if char == "'": + in_quotes = not in_quotes + current_clause.append(char) + i += 1 + continue + + # Only look for logical operators outside of quotes + if not in_quotes: + remaining = condition[i:].upper() + + # Check if we're at a word boundary (start of string or after whitespace) + at_word_boundary = (i == 0 or condition[i-1] in ' \t') + + # Check for AND (must be at word boundary) + if at_word_boundary and (remaining.startswith('AND ') or remaining.startswith('AND\t')): + # Save current clause if we have one + if current_clause: + clause_text = ''.join(current_clause).strip() + if clause_text: + clauses.append((clause_text, current_logical_op)) + current_clause = [] + + # Set the logical operator for the next clause + current_logical_op = 'AND' + i += 3 # Skip 'AND' + + # Skip whitespace after AND + while i < len(condition) and condition[i] in ' \t': + i += 1 + continue + + # Check for OR (must be at word boundary) + if at_word_boundary and (remaining.startswith('OR ') or remaining.startswith('OR\t')): + # Save current clause if we have one + if current_clause: + clause_text = ''.join(current_clause).strip() + if clause_text: + clauses.append((clause_text, current_logical_op)) + current_clause = [] + + # Set the logical operator for the next clause + current_logical_op = 'OR' + i += 2 # Skip 'OR' + + # Skip whitespace after OR + while i < len(condition) and condition[i] in ' \t': + i += 1 + continue + + # Add character to current clause + current_clause.append(char) + i += 1 + + # Don't forget the last clause + if current_clause: + clause_text = ''.join(current_clause).strip() + if clause_text: + clauses.append((clause_text, current_logical_op)) + + return clauses + + def _parse_single_condition(self, condition: str, logical_op: Optional[str] = None) -> Tuple[str, Dict[str, Any]]: + """ + Parse a single condition clause into safe SQL with parameters. + + This method handles basic patterns like: + - devName = 'value' (with optional AND/OR prefix) + - devComments LIKE '%value%' + - eve_EventType IN ('type1', 'type2') + + Args: + condition: Single condition string to parse + logical_op: Optional logical operator (AND/OR) to prepend + + Returns: + Tuple of (safe_sql_snippet, parameters_dict) + """ + condition = condition.strip() + # Handle empty conditions if not condition: return "", {} # Simple pattern matching for common conditions - # Pattern 1: AND/OR column operator value (supporting Unicode in quoted strings) - pattern1 = r'^\s*(AND|OR)?\s+(\w+)\s+(=|!=|<>|<|>|<=|>=|LIKE|NOT\s+LIKE)\s+\'([^\']*)\'\s*$' + # Pattern 1: [AND/OR] column operator value (supporting Unicode in quoted strings) + pattern1 = r'^\s*(\w+)\s+(=|!=|<>|<|>|<=|>=|LIKE|NOT\s+LIKE)\s+\'([^\']*)\'\s*$' match1 = re.match(pattern1, condition, re.IGNORECASE | re.UNICODE) - + if match1: - logical_op, column, operator, value = match1.groups() + column, operator, value = match1.groups() return self._build_simple_condition(logical_op, column, operator, value) - # Pattern 2: AND/OR column IN ('val1', 'val2', ...) - pattern2 = r'^\s*(AND|OR)?\s+(\w+)\s+(IN|NOT\s+IN)\s+\(([^)]+)\)\s*$' + # Pattern 2: [AND/OR] column IN ('val1', 'val2', ...) + pattern2 = r'^\s*(\w+)\s+(IN|NOT\s+IN)\s+\(([^)]+)\)\s*$' match2 = re.match(pattern2, condition, re.IGNORECASE) - + if match2: - logical_op, column, operator, values_str = match2.groups() + column, operator, values_str = match2.groups() return self._build_in_condition(logical_op, column, operator, values_str) - # Pattern 3: AND/OR column IS NULL/IS NOT NULL - pattern3 = r'^\s*(AND|OR)?\s+(\w+)\s+(IS\s+NULL|IS\s+NOT\s+NULL)\s*$' + # Pattern 3: [AND/OR] column IS NULL/IS NOT NULL + pattern3 = r'^\s*(\w+)\s+(IS\s+NULL|IS\s+NOT\s+NULL)\s*$' match3 = re.match(pattern3, condition, re.IGNORECASE) - + if match3: - logical_op, column, operator = match3.groups() + column, operator = match3.groups() return self._build_null_condition(logical_op, column, operator) # If no patterns match, reject the condition for security diff --git a/test/test_compound_conditions.py b/test/test_compound_conditions.py new file mode 100644 index 00000000..e7d15557 --- /dev/null +++ b/test/test_compound_conditions.py @@ -0,0 +1,326 @@ +""" +Unit tests for SafeConditionBuilder compound condition parsing. + +Tests the fix for Issue #1210 - compound conditions with multiple AND/OR clauses. +""" + +import sys +import unittest +from unittest.mock import MagicMock + +# Mock the logger module before importing SafeConditionBuilder +sys.modules['logger'] = MagicMock() + +# Add parent directory to path for imports +sys.path.insert(0, '/tmp/netalertx_hotfix/server/db') + +from sql_safe_builder import SafeConditionBuilder + + +class TestCompoundConditions(unittest.TestCase): + """Test compound condition parsing functionality.""" + + def setUp(self): + """Create a fresh builder instance for each test.""" + self.builder = SafeConditionBuilder() + + def test_user_failing_filter_six_and_clauses(self): + """Test the exact user-reported failing filter from Issue #1210.""" + condition = ( + "AND devLastIP NOT LIKE '192.168.50.%' " + "AND devLastIP NOT LIKE '192.168.60.%' " + "AND devLastIP NOT LIKE '192.168.70.2' " + "AND devLastIP NOT LIKE '192.168.70.5' " + "AND devLastIP NOT LIKE '192.168.70.3' " + "AND devLastIP NOT LIKE '192.168.70.4'" + ) + + sql, params = self.builder.build_safe_condition(condition) + + # Should successfully parse + self.assertIsNotNone(sql) + self.assertIsNotNone(params) + + # Should have 6 parameters (one per clause) + self.assertEqual(len(params), 6) + + # Should contain all 6 AND operators + self.assertEqual(sql.count('AND'), 6) + + # Should contain all 6 NOT LIKE operators + self.assertEqual(sql.count('NOT LIKE'), 6) + + # Should have 6 parameter placeholders + self.assertEqual(sql.count(':param_'), 6) + + # Verify all IP patterns are in parameters + param_values = list(params.values()) + self.assertIn('192.168.50.%', param_values) + self.assertIn('192.168.60.%', param_values) + self.assertIn('192.168.70.2', param_values) + self.assertIn('192.168.70.5', param_values) + self.assertIn('192.168.70.3', param_values) + self.assertIn('192.168.70.4', param_values) + + def test_multiple_and_clauses_simple(self): + """Test multiple AND clauses with simple equality operators.""" + condition = "AND devName = 'Device1' AND devVendor = 'Apple' AND devFavorite = '1'" + + sql, params = self.builder.build_safe_condition(condition) + + # Should have 3 parameters + self.assertEqual(len(params), 3) + + # Should have 3 AND operators + self.assertEqual(sql.count('AND'), 3) + + # Verify all values are parameterized + param_values = list(params.values()) + self.assertIn('Device1', param_values) + self.assertIn('Apple', param_values) + self.assertIn('1', param_values) + + def test_multiple_or_clauses(self): + """Test multiple OR clauses.""" + condition = "OR devName = 'Device1' OR devName = 'Device2' OR devName = 'Device3'" + + sql, params = self.builder.build_safe_condition(condition) + + # Should have 3 parameters + self.assertEqual(len(params), 3) + + # Should have 3 OR operators + self.assertEqual(sql.count('OR'), 3) + + # Verify all device names are parameterized + param_values = list(params.values()) + self.assertIn('Device1', param_values) + self.assertIn('Device2', param_values) + self.assertIn('Device3', param_values) + + def test_mixed_and_or_clauses(self): + """Test mixed AND/OR logical operators.""" + condition = "AND devName = 'Device1' OR devName = 'Device2' AND devFavorite = '1'" + + sql, params = self.builder.build_safe_condition(condition) + + # Should have 3 parameters + self.assertEqual(len(params), 3) + + # Should preserve the logical operator order + self.assertIn('AND', sql) + self.assertIn('OR', sql) + + # Verify all values are parameterized + param_values = list(params.values()) + self.assertIn('Device1', param_values) + self.assertIn('Device2', param_values) + self.assertIn('1', param_values) + + def test_single_condition_backward_compatibility(self): + """Test that single conditions still work (backward compatibility).""" + condition = "AND devName = 'TestDevice'" + + sql, params = self.builder.build_safe_condition(condition) + + # Should have 1 parameter + self.assertEqual(len(params), 1) + + # Should match expected format + self.assertIn('AND devName = :param_', sql) + + # Parameter should contain the value + self.assertIn('TestDevice', params.values()) + + def test_single_condition_like_operator(self): + """Test single LIKE condition for backward compatibility.""" + condition = "AND devComments LIKE '%important%'" + + sql, params = self.builder.build_safe_condition(condition) + + # Should have 1 parameter + self.assertEqual(len(params), 1) + + # Should contain LIKE operator + self.assertIn('LIKE', sql) + + # Parameter should contain the pattern + self.assertIn('%important%', params.values()) + + def test_compound_with_like_patterns(self): + """Test compound conditions with LIKE patterns.""" + condition = "AND devLastIP LIKE '192.168.%' AND devVendor LIKE '%Apple%'" + + sql, params = self.builder.build_safe_condition(condition) + + # Should have 2 parameters + self.assertEqual(len(params), 2) + + # Should have 2 LIKE operators + self.assertEqual(sql.count('LIKE'), 2) + + # Verify patterns are parameterized + param_values = list(params.values()) + self.assertIn('192.168.%', param_values) + self.assertIn('%Apple%', param_values) + + def test_compound_with_inequality_operators(self): + """Test compound conditions with various inequality operators.""" + condition = "AND eve_DateTime > '2024-01-01' AND eve_DateTime < '2024-12-31'" + + sql, params = self.builder.build_safe_condition(condition) + + # Should have 2 parameters + self.assertEqual(len(params), 2) + + # Should have both operators + self.assertIn('>', sql) + self.assertIn('<', sql) + + # Verify dates are parameterized + param_values = list(params.values()) + self.assertIn('2024-01-01', param_values) + self.assertIn('2024-12-31', param_values) + + def test_empty_condition(self): + """Test empty condition string.""" + condition = "" + + sql, params = self.builder.build_safe_condition(condition) + + # Should return empty results + self.assertEqual(sql, "") + self.assertEqual(params, {}) + + def test_whitespace_only_condition(self): + """Test condition with only whitespace.""" + condition = " \t\n " + + sql, params = self.builder.build_safe_condition(condition) + + # Should return empty results + self.assertEqual(sql, "") + self.assertEqual(params, {}) + + def test_invalid_column_name_rejected(self): + """Test that invalid column names are rejected.""" + condition = "AND malicious_column = 'value'" + + with self.assertRaises(ValueError): + self.builder.build_safe_condition(condition) + + def test_invalid_operator_rejected(self): + """Test that invalid operators are rejected.""" + condition = "AND devName EXECUTE 'DROP TABLE'" + + with self.assertRaises(ValueError): + self.builder.build_safe_condition(condition) + + def test_sql_injection_attempt_blocked(self): + """Test that SQL injection attempts are blocked.""" + condition = "AND devName = 'value'; DROP TABLE devices; --" + + # Should either reject or sanitize the dangerous input + # The semicolon and comment should not appear in the final SQL + try: + sql, params = self.builder.build_safe_condition(condition) + # If it doesn't raise an error, it should sanitize the input + self.assertNotIn('DROP', sql.upper()) + self.assertNotIn(';', sql) + except ValueError: + # Rejection is also acceptable + pass + + def test_quoted_string_with_spaces(self): + """Test that quoted strings with spaces are handled correctly.""" + condition = "AND devName = 'My Device Name' AND devComments = 'Has spaces here'" + + sql, params = self.builder.build_safe_condition(condition) + + # Should have 2 parameters + self.assertEqual(len(params), 2) + + # Verify values with spaces are preserved + param_values = list(params.values()) + self.assertIn('My Device Name', param_values) + self.assertIn('Has spaces here', param_values) + + def test_compound_condition_with_not_equal(self): + """Test compound conditions with != operator.""" + condition = "AND devName != 'Device1' AND devVendor != 'Unknown'" + + sql, params = self.builder.build_safe_condition(condition) + + # Should have 2 parameters + self.assertEqual(len(params), 2) + + # Should have != operators (or converted to <>) + self.assertTrue('!=' in sql or '<>' in sql) + + # Verify values are parameterized + param_values = list(params.values()) + self.assertIn('Device1', param_values) + self.assertIn('Unknown', param_values) + + def test_very_long_compound_condition(self): + """Test handling of very long compound conditions (10+ clauses).""" + clauses = [] + for i in range(10): + clauses.append(f"AND devName != 'Device{i}'") + + condition = " ".join(clauses) + sql, params = self.builder.build_safe_condition(condition) + + # Should have 10 parameters + self.assertEqual(len(params), 10) + + # Should have 10 AND operators + self.assertEqual(sql.count('AND'), 10) + + # Verify all device names are parameterized + param_values = list(params.values()) + for i in range(10): + self.assertIn(f'Device{i}', param_values) + + +class TestParameterGeneration(unittest.TestCase): + """Test parameter generation and naming.""" + + def setUp(self): + """Create a fresh builder instance for each test.""" + self.builder = SafeConditionBuilder() + + def test_parameters_have_unique_names(self): + """Test that all parameters get unique names.""" + condition = "AND devName = 'A' AND devName = 'B' AND devName = 'C'" + + sql, params = self.builder.build_safe_condition(condition) + + # All parameter names should be unique + param_names = list(params.keys()) + self.assertEqual(len(param_names), len(set(param_names))) + + def test_parameter_values_match_condition(self): + """Test that parameter values correctly match the condition values.""" + condition = "AND devLastIP NOT LIKE '192.168.1.%' AND devLastIP NOT LIKE '10.0.0.%'" + + sql, params = self.builder.build_safe_condition(condition) + + # Should have exactly the values from the condition + param_values = sorted(params.values()) + expected_values = sorted(['192.168.1.%', '10.0.0.%']) + self.assertEqual(param_values, expected_values) + + def test_parameters_referenced_in_sql(self): + """Test that all parameters are actually referenced in the SQL.""" + condition = "AND devName = 'Device1' AND devVendor = 'Apple'" + + sql, params = self.builder.build_safe_condition(condition) + + # Every parameter should appear in the SQL + for param_name in params.keys(): + self.assertIn(f':{param_name}', sql) + + +if __name__ == '__main__': + unittest.main()