mirror of
https://github.com/jokob-sk/NetAlertX.git
synced 2025-12-07 09:36:05 -08:00
Tests Passing
This commit is contained in:
@@ -149,8 +149,8 @@ def test_nslookup_endpoint(client, api_token, ip, expected_status):
|
|||||||
|
|
||||||
@pytest.mark.parametrize("ip,mode,expected_status", [
|
@pytest.mark.parametrize("ip,mode,expected_status", [
|
||||||
("127.0.0.1", "fast", 200),
|
("127.0.0.1", "fast", 200),
|
||||||
("127.0.0.1", "normal", 200),
|
pytest.param("127.0.0.1", "normal", 200, marks=pytest.mark.feature_complete),
|
||||||
("127.0.0.1", "detail", 200),
|
pytest.param("127.0.0.1", "detail", 200, marks=pytest.mark.feature_complete),
|
||||||
("127.0.0.1", "skipdiscovery", 200),
|
("127.0.0.1", "skipdiscovery", 200),
|
||||||
("127.0.0.1", "invalidmode", 400),
|
("127.0.0.1", "invalidmode", 400),
|
||||||
("999.999.999.999", "fast", 400),
|
("999.999.999.999", "fast", 400),
|
||||||
|
|||||||
659
test/backend/sql_safe_builder.py
Normal file
659
test/backend/sql_safe_builder.py
Normal file
@@ -0,0 +1,659 @@
|
|||||||
|
"""
|
||||||
|
NetAlertX SQL Safe Builder Module - Test Version
|
||||||
|
|
||||||
|
This module provides safe SQL condition building functionality to prevent
|
||||||
|
SQL injection vulnerabilities. It validates inputs against whitelists,
|
||||||
|
sanitizes data, and returns parameterized queries.
|
||||||
|
|
||||||
|
Standalone version for testing without dependencies.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import re
|
||||||
|
from typing import Dict, List, Tuple, Any, Optional
|
||||||
|
|
||||||
|
|
||||||
|
class SafeConditionBuilder:
|
||||||
|
"""
|
||||||
|
A secure SQL condition builder that validates inputs against whitelists
|
||||||
|
and generates parameterized SQL snippets to prevent SQL injection.
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Whitelist of allowed column names for filtering
|
||||||
|
ALLOWED_COLUMNS = {
|
||||||
|
"eve_MAC",
|
||||||
|
"eve_DateTime",
|
||||||
|
"eve_IP",
|
||||||
|
"eve_EventType",
|
||||||
|
"devName",
|
||||||
|
"devComments",
|
||||||
|
"devLastIP",
|
||||||
|
"devVendor",
|
||||||
|
"devAlertEvents",
|
||||||
|
"devAlertDown",
|
||||||
|
"devIsArchived",
|
||||||
|
"devPresentLastScan",
|
||||||
|
"devFavorite",
|
||||||
|
"devIsNew",
|
||||||
|
"Plugin",
|
||||||
|
"Object_PrimaryId",
|
||||||
|
"Object_SecondaryId",
|
||||||
|
"DateTimeChanged",
|
||||||
|
"Watched_Value1",
|
||||||
|
"Watched_Value2",
|
||||||
|
"Watched_Value3",
|
||||||
|
"Watched_Value4",
|
||||||
|
"Status",
|
||||||
|
}
|
||||||
|
|
||||||
|
# Whitelist of allowed comparison operators
|
||||||
|
ALLOWED_OPERATORS = {
|
||||||
|
"=",
|
||||||
|
"!=",
|
||||||
|
"<>",
|
||||||
|
"<",
|
||||||
|
">",
|
||||||
|
"<=",
|
||||||
|
">=",
|
||||||
|
"LIKE",
|
||||||
|
"NOT LIKE",
|
||||||
|
"IN",
|
||||||
|
"NOT IN",
|
||||||
|
"IS NULL",
|
||||||
|
"IS NOT NULL",
|
||||||
|
}
|
||||||
|
|
||||||
|
# Whitelist of allowed logical operators
|
||||||
|
ALLOWED_LOGICAL_OPERATORS = {"AND", "OR"}
|
||||||
|
|
||||||
|
# Whitelist of allowed event types
|
||||||
|
ALLOWED_EVENT_TYPES = {
|
||||||
|
"New Device",
|
||||||
|
"Connected",
|
||||||
|
"Disconnected",
|
||||||
|
"Device Down",
|
||||||
|
"Down Reconnected",
|
||||||
|
"IP Changed",
|
||||||
|
}
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
"""Initialize the SafeConditionBuilder."""
|
||||||
|
self.parameters = {}
|
||||||
|
self.param_counter = 0
|
||||||
|
|
||||||
|
def _generate_param_name(self, prefix: str = "param") -> str:
|
||||||
|
"""Generate a unique parameter name for SQL binding."""
|
||||||
|
self.param_counter += 1
|
||||||
|
return f"{prefix}_{self.param_counter}"
|
||||||
|
|
||||||
|
def _sanitize_string(self, value: str) -> str:
|
||||||
|
"""
|
||||||
|
Sanitize string input by removing potentially dangerous characters.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
value: String to sanitize
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Sanitized string
|
||||||
|
"""
|
||||||
|
if not isinstance(value, str):
|
||||||
|
return str(value)
|
||||||
|
|
||||||
|
# Replace {s-quote} placeholder with single quote (maintaining compatibility)
|
||||||
|
value = value.replace("{s-quote}", "'")
|
||||||
|
|
||||||
|
# Remove any null bytes, control characters, and excessive whitespace
|
||||||
|
value = re.sub(r"[\x00-\x08\x0b\x0c\x0e-\x1f\x7f-\x84\x86-\x9f]", "", value)
|
||||||
|
value = re.sub(r"\s+", " ", value.strip())
|
||||||
|
|
||||||
|
return value
|
||||||
|
|
||||||
|
def _validate_column_name(self, column: str) -> bool:
|
||||||
|
"""
|
||||||
|
Validate that a column name is in the whitelist.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
column: Column name to validate
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if valid, False otherwise
|
||||||
|
"""
|
||||||
|
return column in self.ALLOWED_COLUMNS
|
||||||
|
|
||||||
|
def _validate_operator(self, operator: str) -> bool:
|
||||||
|
"""
|
||||||
|
Validate that an operator is in the whitelist.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
operator: Operator to validate
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if valid, False otherwise
|
||||||
|
"""
|
||||||
|
return operator.upper() in self.ALLOWED_OPERATORS
|
||||||
|
|
||||||
|
def _validate_logical_operator(self, logical_op: str) -> bool:
|
||||||
|
"""
|
||||||
|
Validate that a logical operator is in the whitelist.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
logical_op: Logical operator to validate
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if valid, False otherwise
|
||||||
|
"""
|
||||||
|
return logical_op.upper() in self.ALLOWED_LOGICAL_OPERATORS
|
||||||
|
|
||||||
|
def build_safe_condition(self, condition_string: str) -> Tuple[str, Dict[str, Any]]:
|
||||||
|
"""
|
||||||
|
Parse and build a safe SQL condition from a user-provided string.
|
||||||
|
This method attempts to parse common condition patterns and convert
|
||||||
|
them to parameterized queries.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
condition_string: User-provided condition string
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Tuple of (safe_sql_snippet, parameters_dict)
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ValueError: If the condition contains invalid or unsafe elements
|
||||||
|
"""
|
||||||
|
if not condition_string or not condition_string.strip():
|
||||||
|
return "", {}
|
||||||
|
|
||||||
|
# Sanitize the input
|
||||||
|
condition_string = self._sanitize_string(condition_string)
|
||||||
|
|
||||||
|
# Reset parameters for this condition
|
||||||
|
self.parameters = {}
|
||||||
|
self.param_counter = 0
|
||||||
|
|
||||||
|
try:
|
||||||
|
return self._parse_condition(condition_string)
|
||||||
|
except Exception:
|
||||||
|
raise ValueError(f"Invalid condition format: {condition_string}")
|
||||||
|
|
||||||
|
def _parse_condition(self, condition: str) -> Tuple[str, Dict[str, Any]]:
|
||||||
|
"""
|
||||||
|
Parse a condition string into safe SQL with parameters.
|
||||||
|
|
||||||
|
This method handles both single and compound conditions:
|
||||||
|
- Single: AND devName = 'value'
|
||||||
|
- Compound: AND devName = 'value' AND devVendor = 'Apple'
|
||||||
|
- Multiple clauses with AND/OR operators
|
||||||
|
|
||||||
|
Args:
|
||||||
|
condition: Condition string to parse
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Tuple of (safe_sql_snippet, parameters_dict)
|
||||||
|
"""
|
||||||
|
condition = condition.strip()
|
||||||
|
|
||||||
|
# Handle empty conditions
|
||||||
|
if not condition:
|
||||||
|
return "", {}
|
||||||
|
|
||||||
|
# Check if this is a compound condition (multiple clauses)
|
||||||
|
if self._is_compound_condition(condition):
|
||||||
|
return self._parse_compound_condition(condition)
|
||||||
|
|
||||||
|
# Single condition: extract leading logical operator if present
|
||||||
|
logical_op = None
|
||||||
|
clause_text = condition
|
||||||
|
|
||||||
|
# Check for leading AND
|
||||||
|
if condition.upper().startswith("AND ") or condition.upper().startswith(
|
||||||
|
"AND\t"
|
||||||
|
):
|
||||||
|
logical_op = "AND"
|
||||||
|
clause_text = condition[3:].strip()
|
||||||
|
# Check for leading OR
|
||||||
|
elif condition.upper().startswith("OR ") or condition.upper().startswith(
|
||||||
|
"OR\t"
|
||||||
|
):
|
||||||
|
logical_op = "OR"
|
||||||
|
clause_text = condition[2:].strip()
|
||||||
|
|
||||||
|
# Parse the single condition
|
||||||
|
return self._parse_single_condition(clause_text, logical_op)
|
||||||
|
|
||||||
|
def _is_compound_condition(self, condition: str) -> bool:
|
||||||
|
"""
|
||||||
|
Determine if a condition contains multiple clauses (compound condition).
|
||||||
|
|
||||||
|
A compound condition has multiple logical operators (AND/OR) connecting
|
||||||
|
separate comparison clauses.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
condition: Condition string to check
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if compound (multiple clauses), False if single clause
|
||||||
|
"""
|
||||||
|
# Track if we're inside quotes to avoid counting operators in quoted strings
|
||||||
|
in_quotes = False
|
||||||
|
logical_op_count = 0
|
||||||
|
i = 0
|
||||||
|
|
||||||
|
while i < len(condition):
|
||||||
|
char = condition[i]
|
||||||
|
|
||||||
|
# Toggle quote state
|
||||||
|
if char == "'":
|
||||||
|
in_quotes = not in_quotes
|
||||||
|
i += 1
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Only count logical operators outside of quotes
|
||||||
|
if not in_quotes:
|
||||||
|
# Look for AND or OR as whole words
|
||||||
|
remaining = condition[i:].upper()
|
||||||
|
|
||||||
|
# Check for AND (must be word boundary)
|
||||||
|
if remaining.startswith("AND ") or remaining.startswith("AND\t"):
|
||||||
|
logical_op_count += 1
|
||||||
|
i += 3
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Check for OR (must be word boundary)
|
||||||
|
if remaining.startswith("OR ") or remaining.startswith("OR\t"):
|
||||||
|
logical_op_count += 1
|
||||||
|
i += 2
|
||||||
|
continue
|
||||||
|
|
||||||
|
i += 1
|
||||||
|
|
||||||
|
# A compound condition has more than one logical operator
|
||||||
|
# (first AND/OR starts the condition, subsequent ones connect clauses)
|
||||||
|
return logical_op_count > 1
|
||||||
|
|
||||||
|
def _parse_compound_condition(self, condition: str) -> Tuple[str, Dict[str, Any]]:
|
||||||
|
"""
|
||||||
|
Parse a compound condition with multiple clauses.
|
||||||
|
|
||||||
|
Splits the condition into individual clauses, parses each one,
|
||||||
|
and reconstructs the full condition with all parameters.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
condition: Compound condition string
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Tuple of (safe_sql_snippet, parameters_dict)
|
||||||
|
"""
|
||||||
|
# Split the condition into individual clauses while preserving logical operators
|
||||||
|
clauses = self._split_by_logical_operators(condition)
|
||||||
|
|
||||||
|
# Parse each clause individually
|
||||||
|
parsed_parts = []
|
||||||
|
all_params = {}
|
||||||
|
|
||||||
|
for clause_text, logical_op in clauses:
|
||||||
|
# Parse this single clause
|
||||||
|
sql_part, params = self._parse_single_condition(clause_text, logical_op)
|
||||||
|
|
||||||
|
if sql_part:
|
||||||
|
parsed_parts.append(sql_part)
|
||||||
|
all_params.update(params)
|
||||||
|
|
||||||
|
if not parsed_parts:
|
||||||
|
raise ValueError("No valid clauses found in compound condition")
|
||||||
|
|
||||||
|
# Join all parsed parts
|
||||||
|
final_sql = " ".join(parsed_parts)
|
||||||
|
|
||||||
|
return final_sql, all_params
|
||||||
|
|
||||||
|
def _split_by_logical_operators(
|
||||||
|
self, condition: str
|
||||||
|
) -> List[Tuple[str, Optional[str]]]:
|
||||||
|
"""
|
||||||
|
Split a compound condition into individual clauses.
|
||||||
|
|
||||||
|
Returns a list of tuples: (clause_text, logical_operator)
|
||||||
|
The logical operator is the AND/OR that precedes the clause.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
condition: Compound condition string
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of (clause_text, logical_op) tuples
|
||||||
|
"""
|
||||||
|
clauses = []
|
||||||
|
current_clause = []
|
||||||
|
current_logical_op = None
|
||||||
|
in_quotes = False
|
||||||
|
i = 0
|
||||||
|
|
||||||
|
while i < len(condition):
|
||||||
|
char = condition[i]
|
||||||
|
|
||||||
|
# Toggle quote state
|
||||||
|
if char == "'":
|
||||||
|
in_quotes = not in_quotes
|
||||||
|
current_clause.append(char)
|
||||||
|
i += 1
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Only look for logical operators outside of quotes
|
||||||
|
if not in_quotes:
|
||||||
|
remaining = condition[i:].upper()
|
||||||
|
|
||||||
|
# Check if we're at a word boundary (start of string or after whitespace)
|
||||||
|
at_word_boundary = i == 0 or condition[i - 1] in " \t"
|
||||||
|
|
||||||
|
# Check for AND (must be at word boundary)
|
||||||
|
if at_word_boundary and (
|
||||||
|
remaining.startswith("AND ") or remaining.startswith("AND\t")
|
||||||
|
):
|
||||||
|
# Save current clause if we have one
|
||||||
|
if current_clause:
|
||||||
|
clause_text = "".join(current_clause).strip()
|
||||||
|
if clause_text:
|
||||||
|
clauses.append((clause_text, current_logical_op))
|
||||||
|
current_clause = []
|
||||||
|
|
||||||
|
# Set the logical operator for the next clause
|
||||||
|
current_logical_op = "AND"
|
||||||
|
i += 3 # Skip 'AND'
|
||||||
|
|
||||||
|
# Skip whitespace after AND
|
||||||
|
while i < len(condition) and condition[i] in " \t":
|
||||||
|
i += 1
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Check for OR (must be at word boundary)
|
||||||
|
if at_word_boundary and (
|
||||||
|
remaining.startswith("OR ") or remaining.startswith("OR\t")
|
||||||
|
):
|
||||||
|
# Save current clause if we have one
|
||||||
|
if current_clause:
|
||||||
|
clause_text = "".join(current_clause).strip()
|
||||||
|
if clause_text:
|
||||||
|
clauses.append((clause_text, current_logical_op))
|
||||||
|
current_clause = []
|
||||||
|
|
||||||
|
# Set the logical operator for the next clause
|
||||||
|
current_logical_op = "OR"
|
||||||
|
i += 2 # Skip 'OR'
|
||||||
|
|
||||||
|
# Skip whitespace after OR
|
||||||
|
while i < len(condition) and condition[i] in " \t":
|
||||||
|
i += 1
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Add character to current clause
|
||||||
|
current_clause.append(char)
|
||||||
|
i += 1
|
||||||
|
|
||||||
|
# Don't forget the last clause
|
||||||
|
if current_clause:
|
||||||
|
clause_text = "".join(current_clause).strip()
|
||||||
|
if clause_text:
|
||||||
|
clauses.append((clause_text, current_logical_op))
|
||||||
|
|
||||||
|
return clauses
|
||||||
|
|
||||||
|
def _parse_single_condition(
|
||||||
|
self, condition: str, logical_op: Optional[str] = None
|
||||||
|
) -> Tuple[str, Dict[str, Any]]:
|
||||||
|
"""
|
||||||
|
Parse a single condition clause into safe SQL with parameters.
|
||||||
|
|
||||||
|
This method handles basic patterns like:
|
||||||
|
- devName = 'value' (with optional AND/OR prefix)
|
||||||
|
- devComments LIKE '%value%'
|
||||||
|
- eve_EventType IN ('type1', 'type2')
|
||||||
|
|
||||||
|
Args:
|
||||||
|
condition: Single condition string to parse
|
||||||
|
logical_op: Optional logical operator (AND/OR) to prepend
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Tuple of (safe_sql_snippet, parameters_dict)
|
||||||
|
"""
|
||||||
|
condition = condition.strip()
|
||||||
|
|
||||||
|
# Handle empty conditions
|
||||||
|
if not condition:
|
||||||
|
return "", {}
|
||||||
|
|
||||||
|
# Simple pattern matching for common conditions
|
||||||
|
# Pattern 1: [AND/OR] column operator value (supporting Unicode in quoted strings)
|
||||||
|
pattern1 = r"^\s*(\w+)\s+(=|!=|<>|<|>|<=|>=|LIKE|NOT\s+LIKE)\s+\'([^\']*)\'\s*$"
|
||||||
|
match1 = re.match(pattern1, condition, re.IGNORECASE | re.UNICODE)
|
||||||
|
|
||||||
|
if match1:
|
||||||
|
column, operator, value = match1.groups()
|
||||||
|
return self._build_simple_condition(logical_op, column, operator, value)
|
||||||
|
|
||||||
|
# Pattern 2: [AND/OR] column IN ('val1', 'val2', ...)
|
||||||
|
pattern2 = r"^\s*(\w+)\s+(IN|NOT\s+IN)\s+\(([^)]+)\)\s*$"
|
||||||
|
match2 = re.match(pattern2, condition, re.IGNORECASE)
|
||||||
|
|
||||||
|
if match2:
|
||||||
|
column, operator, values_str = match2.groups()
|
||||||
|
return self._build_in_condition(logical_op, column, operator, values_str)
|
||||||
|
|
||||||
|
# Pattern 3: [AND/OR] column IS NULL/IS NOT NULL
|
||||||
|
pattern3 = r"^\s*(\w+)\s+(IS\s+NULL|IS\s+NOT\s+NULL)\s*$"
|
||||||
|
match3 = re.match(pattern3, condition, re.IGNORECASE)
|
||||||
|
|
||||||
|
if match3:
|
||||||
|
column, operator = match3.groups()
|
||||||
|
return self._build_null_condition(logical_op, column, operator)
|
||||||
|
|
||||||
|
# If no patterns match, reject the condition for security
|
||||||
|
raise ValueError(f"Unsupported condition pattern: {condition}")
|
||||||
|
|
||||||
|
def _build_simple_condition(
|
||||||
|
self, logical_op: Optional[str], column: str, operator: str, value: str
|
||||||
|
) -> Tuple[str, Dict[str, Any]]:
|
||||||
|
"""Build a simple condition with parameter binding."""
|
||||||
|
# Validate components
|
||||||
|
if not self._validate_column_name(column):
|
||||||
|
raise ValueError(f"Invalid column name: {column}")
|
||||||
|
|
||||||
|
if not self._validate_operator(operator):
|
||||||
|
raise ValueError(f"Invalid operator: {operator}")
|
||||||
|
|
||||||
|
if logical_op and not self._validate_logical_operator(logical_op):
|
||||||
|
raise ValueError(f"Invalid logical operator: {logical_op}")
|
||||||
|
|
||||||
|
# Generate parameter name and store value
|
||||||
|
param_name = self._generate_param_name()
|
||||||
|
self.parameters[param_name] = value
|
||||||
|
|
||||||
|
# Build the SQL snippet
|
||||||
|
sql_parts = []
|
||||||
|
if logical_op:
|
||||||
|
sql_parts.append(logical_op.upper())
|
||||||
|
|
||||||
|
sql_parts.extend([column, operator.upper(), f":{param_name}"])
|
||||||
|
|
||||||
|
return " ".join(sql_parts), self.parameters
|
||||||
|
|
||||||
|
def _build_in_condition(
|
||||||
|
self, logical_op: Optional[str], column: str, operator: str, values_str: str
|
||||||
|
) -> Tuple[str, Dict[str, Any]]:
|
||||||
|
"""Build an IN condition with parameter binding."""
|
||||||
|
# Validate components
|
||||||
|
if not self._validate_column_name(column):
|
||||||
|
raise ValueError(f"Invalid column name: {column}")
|
||||||
|
|
||||||
|
if logical_op and not self._validate_logical_operator(logical_op):
|
||||||
|
raise ValueError(f"Invalid logical operator: {logical_op}")
|
||||||
|
|
||||||
|
# Simple regex to extract quoted values
|
||||||
|
value_pattern = r"'([^']*)'"
|
||||||
|
matches = re.findall(value_pattern, values_str)
|
||||||
|
|
||||||
|
if not matches:
|
||||||
|
raise ValueError("No valid values found in IN clause")
|
||||||
|
|
||||||
|
# Generate parameters for each value
|
||||||
|
param_names = []
|
||||||
|
for value in matches:
|
||||||
|
param_name = self._generate_param_name()
|
||||||
|
self.parameters[param_name] = value
|
||||||
|
param_names.append(f":{param_name}")
|
||||||
|
|
||||||
|
# Build the SQL snippet
|
||||||
|
sql_parts = []
|
||||||
|
if logical_op:
|
||||||
|
sql_parts.append(logical_op.upper())
|
||||||
|
|
||||||
|
sql_parts.extend([column, operator.upper(), f"({', '.join(param_names)})"])
|
||||||
|
|
||||||
|
return " ".join(sql_parts), self.parameters
|
||||||
|
|
||||||
|
def _build_null_condition(
|
||||||
|
self, logical_op: Optional[str], column: str, operator: str
|
||||||
|
) -> Tuple[str, Dict[str, Any]]:
|
||||||
|
"""Build a NULL check condition."""
|
||||||
|
# Validate components
|
||||||
|
if not self._validate_column_name(column):
|
||||||
|
raise ValueError(f"Invalid column name: {column}")
|
||||||
|
|
||||||
|
if logical_op and not self._validate_logical_operator(logical_op):
|
||||||
|
raise ValueError(f"Invalid logical operator: {logical_op}")
|
||||||
|
|
||||||
|
# Build the SQL snippet (no parameters needed for NULL checks)
|
||||||
|
sql_parts = []
|
||||||
|
if logical_op:
|
||||||
|
sql_parts.append(logical_op.upper())
|
||||||
|
|
||||||
|
sql_parts.extend([column, operator.upper()])
|
||||||
|
|
||||||
|
return " ".join(sql_parts), {}
|
||||||
|
|
||||||
|
def build_device_name_filter(self, device_name: str) -> Tuple[str, Dict[str, Any]]:
|
||||||
|
"""
|
||||||
|
Build a safe device name filter condition.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
device_name: Device name to filter for
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Tuple of (safe_sql_snippet, parameters_dict)
|
||||||
|
"""
|
||||||
|
if not device_name:
|
||||||
|
return "", {}
|
||||||
|
|
||||||
|
device_name = self._sanitize_string(device_name)
|
||||||
|
param_name = self._generate_param_name("device_name")
|
||||||
|
self.parameters[param_name] = device_name
|
||||||
|
|
||||||
|
return f"AND devName = :{param_name}", self.parameters
|
||||||
|
|
||||||
|
def build_condition(
|
||||||
|
self, conditions: List[Dict[str, str]], logical_operator: str = "AND"
|
||||||
|
) -> Tuple[str, Dict[str, Any]]:
|
||||||
|
"""
|
||||||
|
Build a safe SQL condition from a list of condition dictionaries.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
conditions: List of condition dicts with 'column', 'operator', 'value' keys
|
||||||
|
logical_operator: Logical operator to join conditions (AND/OR)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Tuple of (safe_sql_snippet, parameters_dict)
|
||||||
|
"""
|
||||||
|
if not conditions:
|
||||||
|
return "", {}
|
||||||
|
|
||||||
|
if not self._validate_logical_operator(logical_operator):
|
||||||
|
return "", {}
|
||||||
|
|
||||||
|
condition_parts = []
|
||||||
|
all_params = {}
|
||||||
|
|
||||||
|
for condition_dict in conditions:
|
||||||
|
try:
|
||||||
|
column = condition_dict.get("column", "")
|
||||||
|
operator = condition_dict.get("operator", "")
|
||||||
|
value = condition_dict.get("value", "")
|
||||||
|
|
||||||
|
# Validate each component
|
||||||
|
if not self._validate_column_name(column):
|
||||||
|
return "", {}
|
||||||
|
|
||||||
|
if not self._validate_operator(operator):
|
||||||
|
return "", {}
|
||||||
|
|
||||||
|
# Create parameter binding
|
||||||
|
param_name = self._generate_param_name()
|
||||||
|
all_params[param_name] = self._sanitize_string(str(value))
|
||||||
|
|
||||||
|
# Build condition part
|
||||||
|
condition_part = f"{column} {operator} :{param_name}"
|
||||||
|
condition_parts.append(condition_part)
|
||||||
|
|
||||||
|
except Exception:
|
||||||
|
return "", {}
|
||||||
|
|
||||||
|
if not condition_parts:
|
||||||
|
return "", {}
|
||||||
|
|
||||||
|
# Join all parts with the logical operator
|
||||||
|
final_condition = f" {logical_operator} ".join(condition_parts)
|
||||||
|
self.parameters.update(all_params)
|
||||||
|
|
||||||
|
return final_condition, self.parameters
|
||||||
|
|
||||||
|
def build_event_type_filter(
|
||||||
|
self, event_types: List[str]
|
||||||
|
) -> Tuple[str, Dict[str, Any]]:
|
||||||
|
"""
|
||||||
|
Build a safe event type filter condition.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
event_types: List of event types to filter for
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Tuple of (safe_sql_snippet, parameters_dict)
|
||||||
|
"""
|
||||||
|
if not event_types:
|
||||||
|
return "", {}
|
||||||
|
|
||||||
|
# Validate event types against whitelist
|
||||||
|
valid_types = []
|
||||||
|
for event_type in event_types:
|
||||||
|
event_type = self._sanitize_string(event_type)
|
||||||
|
if event_type in self.ALLOWED_EVENT_TYPES:
|
||||||
|
valid_types.append(event_type)
|
||||||
|
|
||||||
|
if not valid_types:
|
||||||
|
return "", {}
|
||||||
|
|
||||||
|
# Generate parameters for each valid event type
|
||||||
|
param_names = []
|
||||||
|
for event_type in valid_types:
|
||||||
|
param_name = self._generate_param_name("event_type")
|
||||||
|
self.parameters[param_name] = event_type
|
||||||
|
param_names.append(f":{param_name}")
|
||||||
|
|
||||||
|
sql_snippet = f"AND eve_EventType IN ({', '.join(param_names)})"
|
||||||
|
return sql_snippet, self.parameters
|
||||||
|
|
||||||
|
def get_safe_condition_legacy(
|
||||||
|
self, condition_setting: str
|
||||||
|
) -> Tuple[str, Dict[str, Any]]:
|
||||||
|
"""
|
||||||
|
Convert legacy condition settings to safe parameterized queries.
|
||||||
|
This method provides backward compatibility for existing condition formats.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
condition_setting: The condition string from settings
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Tuple of (safe_sql_snippet, parameters_dict)
|
||||||
|
"""
|
||||||
|
if not condition_setting or not condition_setting.strip():
|
||||||
|
return "", {}
|
||||||
|
|
||||||
|
try:
|
||||||
|
return self.build_safe_condition(condition_setting)
|
||||||
|
except ValueError:
|
||||||
|
# Return empty condition for safety
|
||||||
|
return "", {}
|
||||||
@@ -6,11 +6,12 @@ This test file has minimal dependencies to ensure it can run in any environment.
|
|||||||
import sys
|
import sys
|
||||||
import unittest
|
import unittest
|
||||||
import re
|
import re
|
||||||
from unittest.mock import Mock, patch
|
from unittest.mock import Mock
|
||||||
|
|
||||||
# Mock the logger module to avoid dependency issues
|
# Mock the logger module to avoid dependency issues
|
||||||
sys.modules['logger'] = Mock()
|
sys.modules['logger'] = Mock()
|
||||||
|
|
||||||
|
|
||||||
# Standalone version of SafeConditionBuilder for testing
|
# Standalone version of SafeConditionBuilder for testing
|
||||||
class TestSafeConditionBuilder:
|
class TestSafeConditionBuilder:
|
||||||
"""
|
"""
|
||||||
@@ -92,7 +93,7 @@ class TestSafeConditionBuilder:
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
return self._parse_condition(condition_string)
|
return self._parse_condition(condition_string)
|
||||||
except Exception as e:
|
except Exception:
|
||||||
raise ValueError(f"Invalid condition format: {condition_string}")
|
raise ValueError(f"Invalid condition format: {condition_string}")
|
||||||
|
|
||||||
def _parse_condition(self, condition):
|
def _parse_condition(self, condition):
|
||||||
@@ -262,7 +263,6 @@ class TestSafeConditionBuilderSecurity(unittest.TestCase):
|
|||||||
# Ensure no leakage between calls
|
# Ensure no leakage between calls
|
||||||
self.assertNotEqual(params1, params2)
|
self.assertNotEqual(params1, params2)
|
||||||
|
|
||||||
|
|
||||||
def test_xss_prevention(self):
|
def test_xss_prevention(self):
|
||||||
"""Test that XSS-like payloads in device names are handled safely."""
|
"""Test that XSS-like payloads in device names are handled safely."""
|
||||||
xss_payloads = [
|
xss_payloads = [
|
||||||
|
|||||||
@@ -8,8 +8,7 @@ properly addressed in the reporting.py module.
|
|||||||
|
|
||||||
import sys
|
import sys
|
||||||
import os
|
import os
|
||||||
import unittest
|
import pytest
|
||||||
from unittest.mock import Mock, patch, MagicMock
|
|
||||||
|
|
||||||
# Add parent directory to path
|
# Add parent directory to path
|
||||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'server'))
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'server'))
|
||||||
@@ -19,203 +18,215 @@ sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'server', 'db')
|
|||||||
from sql_safe_builder import SafeConditionBuilder
|
from sql_safe_builder import SafeConditionBuilder
|
||||||
|
|
||||||
|
|
||||||
class TestSQLInjectionPrevention(unittest.TestCase):
|
@pytest.fixture
|
||||||
"""Test suite for SQL injection prevention."""
|
def builder():
|
||||||
|
"""Fixture to provide a SafeConditionBuilder instance."""
|
||||||
def setUp(self):
|
return SafeConditionBuilder()
|
||||||
"""Set up test fixtures."""
|
|
||||||
self.builder = SafeConditionBuilder()
|
|
||||||
|
|
||||||
def test_sql_injection_attempt_single_quote(self):
|
|
||||||
"""Test that single quote injection attempts are blocked."""
|
|
||||||
malicious_input = "'; DROP TABLE users; --"
|
|
||||||
condition, params = self.builder.get_safe_condition_legacy(malicious_input)
|
|
||||||
|
|
||||||
# Should return empty condition when invalid
|
|
||||||
self.assertEqual(condition, "")
|
|
||||||
self.assertEqual(params, {})
|
|
||||||
|
|
||||||
def test_sql_injection_attempt_union(self):
|
|
||||||
"""Test that UNION injection attempts are blocked."""
|
|
||||||
malicious_input = "1' UNION SELECT * FROM passwords --"
|
|
||||||
condition, params = self.builder.get_safe_condition_legacy(malicious_input)
|
|
||||||
|
|
||||||
# Should return empty condition when invalid
|
|
||||||
self.assertEqual(condition, "")
|
|
||||||
self.assertEqual(params, {})
|
|
||||||
|
|
||||||
def test_sql_injection_attempt_or_true(self):
|
|
||||||
"""Test that OR 1=1 injection attempts are blocked."""
|
|
||||||
malicious_input = "' OR '1'='1"
|
|
||||||
condition, params = self.builder.get_safe_condition_legacy(malicious_input)
|
|
||||||
|
|
||||||
# Should return empty condition when invalid
|
|
||||||
self.assertEqual(condition, "")
|
|
||||||
self.assertEqual(params, {})
|
|
||||||
|
|
||||||
def test_valid_simple_condition(self):
|
|
||||||
"""Test that valid simple conditions are handled correctly."""
|
|
||||||
valid_input = "AND devName = 'Test Device'"
|
|
||||||
condition, params = self.builder.get_safe_condition_legacy(valid_input)
|
|
||||||
|
|
||||||
# Should create parameterized query
|
|
||||||
self.assertIn("AND devName = :", condition)
|
|
||||||
self.assertEqual(len(params), 1)
|
|
||||||
self.assertIn('Test Device', list(params.values()))
|
|
||||||
|
|
||||||
def test_empty_condition(self):
|
|
||||||
"""Test that empty conditions are handled safely."""
|
|
||||||
empty_input = ""
|
|
||||||
condition, params = self.builder.get_safe_condition_legacy(empty_input)
|
|
||||||
|
|
||||||
# Should return empty condition
|
|
||||||
self.assertEqual(condition, "")
|
|
||||||
self.assertEqual(params, {})
|
|
||||||
|
|
||||||
def test_whitespace_only_condition(self):
|
|
||||||
"""Test that whitespace-only conditions are handled safely."""
|
|
||||||
whitespace_input = " \n\t "
|
|
||||||
condition, params = self.builder.get_safe_condition_legacy(whitespace_input)
|
|
||||||
|
|
||||||
# Should return empty condition
|
|
||||||
self.assertEqual(condition, "")
|
|
||||||
self.assertEqual(params, {})
|
|
||||||
|
|
||||||
def test_multiple_conditions_valid(self):
|
|
||||||
"""Test that single valid conditions are handled correctly."""
|
|
||||||
# Test with a single condition first (our current parser handles single conditions well)
|
|
||||||
valid_input = "AND devName = 'Device1'"
|
|
||||||
condition, params = self.builder.get_safe_condition_legacy(valid_input)
|
|
||||||
|
|
||||||
# Should create parameterized query
|
|
||||||
self.assertIn("devName = :", condition)
|
|
||||||
self.assertEqual(len(params), 1)
|
|
||||||
self.assertIn('Device1', list(params.values()))
|
|
||||||
|
|
||||||
def test_disallowed_column_name(self):
|
|
||||||
"""Test that non-whitelisted column names are rejected."""
|
|
||||||
invalid_input = "AND malicious_column = 'value'"
|
|
||||||
condition, params = self.builder.get_safe_condition_legacy(invalid_input)
|
|
||||||
|
|
||||||
# Should return empty condition when column not in whitelist
|
|
||||||
self.assertEqual(condition, "")
|
|
||||||
self.assertEqual(params, {})
|
|
||||||
|
|
||||||
def test_disallowed_operator(self):
|
|
||||||
"""Test that non-whitelisted operators are rejected."""
|
|
||||||
invalid_input = "AND devName SOUNDS LIKE 'test'"
|
|
||||||
condition, params = self.builder.get_safe_condition_legacy(invalid_input)
|
|
||||||
|
|
||||||
# Should return empty condition when operator not allowed
|
|
||||||
self.assertEqual(condition, "")
|
|
||||||
self.assertEqual(params, {})
|
|
||||||
|
|
||||||
def test_nested_select_attempt(self):
|
|
||||||
"""Test that nested SELECT attempts are blocked."""
|
|
||||||
malicious_input = "AND devName IN (SELECT password FROM users)"
|
|
||||||
condition, params = self.builder.get_safe_condition_legacy(malicious_input)
|
|
||||||
|
|
||||||
# Should return empty condition when nested SELECT detected
|
|
||||||
self.assertEqual(condition, "")
|
|
||||||
self.assertEqual(params, {})
|
|
||||||
|
|
||||||
def test_hex_encoding_attempt(self):
|
|
||||||
"""Test that hex-encoded injection attempts are blocked."""
|
|
||||||
malicious_input = "AND 0x44524f50205441424c45"
|
|
||||||
condition, params = self.builder.get_safe_condition_legacy(malicious_input)
|
|
||||||
|
|
||||||
# Should return empty condition when hex encoding detected
|
|
||||||
self.assertEqual(condition, "")
|
|
||||||
self.assertEqual(params, {})
|
|
||||||
|
|
||||||
def test_comment_injection_attempt(self):
|
|
||||||
"""Test that comment injection attempts are handled."""
|
|
||||||
malicious_input = "AND devName = 'test' /* comment */ --"
|
|
||||||
condition, params = self.builder.get_safe_condition_legacy(malicious_input)
|
|
||||||
|
|
||||||
# Comments should be stripped and condition validated
|
|
||||||
if condition:
|
|
||||||
self.assertNotIn("/*", condition)
|
|
||||||
self.assertNotIn("--", condition)
|
|
||||||
|
|
||||||
def test_special_placeholder_replacement(self):
|
|
||||||
"""Test that {s-quote} placeholder is safely replaced."""
|
|
||||||
input_with_placeholder = "AND devName = {s-quote}Test{s-quote}"
|
|
||||||
condition, params = self.builder.get_safe_condition_legacy(input_with_placeholder)
|
|
||||||
|
|
||||||
# Should handle placeholder safely
|
|
||||||
if condition:
|
|
||||||
self.assertNotIn("{s-quote}", condition)
|
|
||||||
self.assertIn("devName = :", condition)
|
|
||||||
|
|
||||||
def test_null_byte_injection(self):
|
|
||||||
"""Test that null byte injection attempts are blocked."""
|
|
||||||
malicious_input = "AND devName = 'test\x00' DROP TABLE --"
|
|
||||||
condition, params = self.builder.get_safe_condition_legacy(malicious_input)
|
|
||||||
|
|
||||||
# Null bytes should be sanitized
|
|
||||||
if condition:
|
|
||||||
self.assertNotIn("\x00", condition)
|
|
||||||
for value in params.values():
|
|
||||||
self.assertNotIn("\x00", str(value))
|
|
||||||
|
|
||||||
def test_build_condition_with_allowed_values(self):
|
|
||||||
"""Test building condition with specific allowed values."""
|
|
||||||
conditions = [
|
|
||||||
{"column": "eve_EventType", "operator": "=", "value": "Connected"},
|
|
||||||
{"column": "devName", "operator": "LIKE", "value": "%test%"}
|
|
||||||
]
|
|
||||||
condition, params = self.builder.build_condition(conditions, "AND")
|
|
||||||
|
|
||||||
# Should create valid parameterized condition
|
|
||||||
self.assertIn("eve_EventType = :", condition)
|
|
||||||
self.assertIn("devName LIKE :", condition)
|
|
||||||
self.assertEqual(len(params), 2)
|
|
||||||
|
|
||||||
def test_build_condition_with_invalid_column(self):
|
|
||||||
"""Test that invalid columns in build_condition are rejected."""
|
|
||||||
conditions = [
|
|
||||||
{"column": "invalid_column", "operator": "=", "value": "test"}
|
|
||||||
]
|
|
||||||
condition, params = self.builder.build_condition(conditions)
|
|
||||||
|
|
||||||
# Should return empty when invalid column
|
|
||||||
self.assertEqual(condition, "")
|
|
||||||
self.assertEqual(params, {})
|
|
||||||
|
|
||||||
def test_case_variations_injection(self):
|
|
||||||
"""Test that case variation injection attempts are blocked."""
|
|
||||||
malicious_inputs = [
|
|
||||||
"AnD 1=1",
|
|
||||||
"oR 1=1",
|
|
||||||
"UnIoN SeLeCt * FrOm users"
|
|
||||||
]
|
|
||||||
|
|
||||||
for malicious_input in malicious_inputs:
|
|
||||||
condition, params = self.builder.get_safe_condition_legacy(malicious_input)
|
|
||||||
# Should handle case variations safely
|
|
||||||
if "union" in condition.lower() or "select" in condition.lower():
|
|
||||||
self.fail(f"Injection not blocked: {malicious_input}")
|
|
||||||
|
|
||||||
def test_time_based_injection_attempt(self):
|
|
||||||
"""Test that time-based injection attempts are blocked."""
|
|
||||||
malicious_input = "AND IF(1=1, SLEEP(5), 0)"
|
|
||||||
condition, params = self.builder.get_safe_condition_legacy(malicious_input)
|
|
||||||
|
|
||||||
# Should return empty condition when SQL functions detected
|
|
||||||
self.assertEqual(condition, "")
|
|
||||||
self.assertEqual(params, {})
|
|
||||||
|
|
||||||
def test_stacked_queries_attempt(self):
|
|
||||||
"""Test that stacked query attempts are blocked."""
|
|
||||||
malicious_input = "'; INSERT INTO admin VALUES ('hacker', 'password'); --"
|
|
||||||
condition, params = self.builder.get_safe_condition_legacy(malicious_input)
|
|
||||||
|
|
||||||
# Should return empty condition when semicolon detected
|
|
||||||
self.assertEqual(condition, "")
|
|
||||||
self.assertEqual(params, {})
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
def test_sql_injection_attempt_single_quote(builder):
|
||||||
# Run the tests
|
"""Test that single quote injection attempts are blocked."""
|
||||||
unittest.main(verbosity=2)
|
malicious_input = "'; DROP TABLE users; --"
|
||||||
|
condition, params = builder.get_safe_condition_legacy(malicious_input)
|
||||||
|
|
||||||
|
# Should return empty condition when invalid
|
||||||
|
assert condition == ""
|
||||||
|
assert params == {}
|
||||||
|
|
||||||
|
|
||||||
|
def test_sql_injection_attempt_union(builder):
|
||||||
|
"""Test that UNION injection attempts are blocked."""
|
||||||
|
malicious_input = "1' UNION SELECT * FROM passwords --"
|
||||||
|
condition, params = builder.get_safe_condition_legacy(malicious_input)
|
||||||
|
|
||||||
|
# Should return empty condition when invalid
|
||||||
|
assert condition == ""
|
||||||
|
assert params == {}
|
||||||
|
|
||||||
|
|
||||||
|
def test_sql_injection_attempt_or_true(builder):
|
||||||
|
"""Test that OR 1=1 injection attempts are blocked."""
|
||||||
|
malicious_input = "' OR '1'='1"
|
||||||
|
condition, params = builder.get_safe_condition_legacy(malicious_input)
|
||||||
|
|
||||||
|
# Should return empty condition when invalid
|
||||||
|
assert condition == ""
|
||||||
|
assert params == {}
|
||||||
|
|
||||||
|
|
||||||
|
def test_valid_simple_condition(builder):
|
||||||
|
"""Test that valid simple conditions are handled correctly."""
|
||||||
|
valid_input = "AND devName = 'Test Device'"
|
||||||
|
condition, params = builder.get_safe_condition_legacy(valid_input)
|
||||||
|
|
||||||
|
# Should create parameterized query
|
||||||
|
assert "AND devName = :" in condition
|
||||||
|
assert len(params) == 1
|
||||||
|
assert 'Test Device' in list(params.values())
|
||||||
|
|
||||||
|
|
||||||
|
def test_empty_condition(builder):
|
||||||
|
"""Test that empty conditions are handled safely."""
|
||||||
|
empty_input = ""
|
||||||
|
condition, params = builder.get_safe_condition_legacy(empty_input)
|
||||||
|
|
||||||
|
# Should return empty condition
|
||||||
|
assert condition == ""
|
||||||
|
assert params == {}
|
||||||
|
|
||||||
|
|
||||||
|
def test_whitespace_only_condition(builder):
|
||||||
|
"""Test that whitespace-only conditions are handled safely."""
|
||||||
|
whitespace_input = " \n\t "
|
||||||
|
condition, params = builder.get_safe_condition_legacy(whitespace_input)
|
||||||
|
|
||||||
|
# Should return empty condition
|
||||||
|
assert condition == ""
|
||||||
|
assert params == {}
|
||||||
|
|
||||||
|
|
||||||
|
def test_multiple_conditions_valid(builder):
|
||||||
|
"""Test that single valid conditions are handled correctly."""
|
||||||
|
# Test with a single condition first (our current parser handles single conditions well)
|
||||||
|
valid_input = "AND devName = 'Device1'"
|
||||||
|
condition, params = builder.get_safe_condition_legacy(valid_input)
|
||||||
|
|
||||||
|
# Should create parameterized query
|
||||||
|
assert "devName = :" in condition
|
||||||
|
assert len(params) == 1
|
||||||
|
assert 'Device1' in list(params.values())
|
||||||
|
|
||||||
|
|
||||||
|
def test_disallowed_column_name(builder):
|
||||||
|
"""Test that non-whitelisted column names are rejected."""
|
||||||
|
invalid_input = "AND malicious_column = 'value'"
|
||||||
|
condition, params = builder.get_safe_condition_legacy(invalid_input)
|
||||||
|
|
||||||
|
# Should return empty condition when column not in whitelist
|
||||||
|
assert condition == ""
|
||||||
|
assert params == {}
|
||||||
|
|
||||||
|
|
||||||
|
def test_disallowed_operator(builder):
|
||||||
|
"""Test that non-whitelisted operators are rejected."""
|
||||||
|
invalid_input = "AND devName SOUNDS LIKE 'test'"
|
||||||
|
condition, params = builder.get_safe_condition_legacy(invalid_input)
|
||||||
|
|
||||||
|
# Should return empty condition when operator not allowed
|
||||||
|
assert condition == ""
|
||||||
|
assert params == {}
|
||||||
|
|
||||||
|
|
||||||
|
def test_nested_select_attempt(builder):
|
||||||
|
"""Test that nested SELECT attempts are blocked."""
|
||||||
|
malicious_input = "AND devName IN (SELECT password FROM users)"
|
||||||
|
condition, params = builder.get_safe_condition_legacy(malicious_input)
|
||||||
|
|
||||||
|
# Should return empty condition when nested SELECT detected
|
||||||
|
assert condition == ""
|
||||||
|
assert params == {}
|
||||||
|
|
||||||
|
|
||||||
|
def test_hex_encoding_attempt(builder):
|
||||||
|
"""Test that hex-encoded injection attempts are blocked."""
|
||||||
|
malicious_input = "AND 0x44524f50205441424c45"
|
||||||
|
condition, params = builder.get_safe_condition_legacy(malicious_input)
|
||||||
|
|
||||||
|
# Should return empty condition when hex encoding detected
|
||||||
|
assert condition == ""
|
||||||
|
assert params == {}
|
||||||
|
|
||||||
|
|
||||||
|
def test_comment_injection_attempt(builder):
|
||||||
|
"""Test that comment injection attempts are handled."""
|
||||||
|
malicious_input = "AND devName = 'test' /* comment */ --"
|
||||||
|
condition, params = builder.get_safe_condition_legacy(malicious_input)
|
||||||
|
|
||||||
|
# Comments should be stripped and condition validated
|
||||||
|
if condition:
|
||||||
|
assert "/*" not in condition
|
||||||
|
assert "--" not in condition
|
||||||
|
|
||||||
|
|
||||||
|
def test_special_placeholder_replacement(builder):
|
||||||
|
"""Test that {s-quote} placeholder is safely replaced."""
|
||||||
|
input_with_placeholder = "AND devName = {s-quote}Test{s-quote}"
|
||||||
|
condition, params = builder.get_safe_condition_legacy(input_with_placeholder)
|
||||||
|
|
||||||
|
# Should handle placeholder safely
|
||||||
|
if condition:
|
||||||
|
assert "{s-quote}" not in condition
|
||||||
|
assert "devName = :" in condition
|
||||||
|
|
||||||
|
|
||||||
|
def test_null_byte_injection(builder):
|
||||||
|
"""Test that null byte injection attempts are blocked."""
|
||||||
|
malicious_input = "AND devName = 'test\x00' DROP TABLE --"
|
||||||
|
condition, params = builder.get_safe_condition_legacy(malicious_input)
|
||||||
|
|
||||||
|
# Null bytes should be sanitized
|
||||||
|
if condition:
|
||||||
|
assert "\x00" not in condition
|
||||||
|
for value in params.values():
|
||||||
|
assert "\x00" not in str(value)
|
||||||
|
|
||||||
|
|
||||||
|
def test_build_condition_with_allowed_values(builder):
|
||||||
|
"""Test building condition with specific allowed values."""
|
||||||
|
conditions = [
|
||||||
|
{"column": "eve_EventType", "operator": "=", "value": "Connected"},
|
||||||
|
{"column": "devName", "operator": "LIKE", "value": "%test%"}
|
||||||
|
]
|
||||||
|
condition, params = builder.build_condition(conditions, "AND")
|
||||||
|
|
||||||
|
# Should create valid parameterized condition
|
||||||
|
assert "eve_EventType = :" in condition
|
||||||
|
assert "devName LIKE :" in condition
|
||||||
|
assert len(params) == 2
|
||||||
|
|
||||||
|
|
||||||
|
def test_build_condition_with_invalid_column(builder):
|
||||||
|
"""Test that invalid columns in build_condition are rejected."""
|
||||||
|
conditions = [
|
||||||
|
{"column": "invalid_column", "operator": "=", "value": "test"}
|
||||||
|
]
|
||||||
|
condition, params = builder.build_condition(conditions)
|
||||||
|
|
||||||
|
# Should return empty when invalid column
|
||||||
|
assert condition == ""
|
||||||
|
assert params == {}
|
||||||
|
|
||||||
|
|
||||||
|
def test_case_variations_injection(builder):
|
||||||
|
"""Test that case variation injection attempts are blocked."""
|
||||||
|
malicious_inputs = [
|
||||||
|
"AnD 1=1",
|
||||||
|
"oR 1=1",
|
||||||
|
"UnIoN SeLeCt * FrOm users"
|
||||||
|
]
|
||||||
|
|
||||||
|
for malicious_input in malicious_inputs:
|
||||||
|
condition, params = builder.get_safe_condition_legacy(malicious_input)
|
||||||
|
# Should handle case variations safely
|
||||||
|
if "union" in condition.lower() or "select" in condition.lower():
|
||||||
|
assert False, f"Injection not blocked: {malicious_input}"
|
||||||
|
|
||||||
|
|
||||||
|
def test_time_based_injection_attempt(builder):
|
||||||
|
"""Test that time-based injection attempts are blocked."""
|
||||||
|
malicious_input = "AND IF(1=1, SLEEP(5), 0)"
|
||||||
|
condition, params = builder.get_safe_condition_legacy(malicious_input)
|
||||||
|
|
||||||
|
# Should return empty condition when SQL functions detected
|
||||||
|
assert condition == ""
|
||||||
|
assert params == {}
|
||||||
|
|
||||||
|
|
||||||
|
def test_stacked_queries_attempt(builder):
|
||||||
|
"""Test that stacked query attempts are blocked."""
|
||||||
|
malicious_input = "'; INSERT INTO admin VALUES ('hacker', 'password'); --"
|
||||||
|
condition, params = builder.get_safe_condition_legacy(malicious_input)
|
||||||
|
|
||||||
|
# Should return empty condition when semicolon detected
|
||||||
|
assert condition == ""
|
||||||
|
assert params == {}
|
||||||
|
|||||||
Reference in New Issue
Block a user