Files
NetAlertX/test/backend/test_sql_injection_prevention.py
jokob-sk ebeb7a07af BE: linting fixes 2
Signed-off-by: jokob-sk <jokob.sk@gmail.com>
2025-11-22 20:43:36 +11:00

234 lines
7.8 KiB
Python

#!/usr/bin/env python3
"""
Comprehensive SQL Injection Prevention Tests for NetAlertX
This test suite validates that all SQL injection vulnerabilities have been
properly addressed in the reporting.py module.
"""
import sys
import os
import pytest
# Add parent directory to path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'server'))
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'server', 'db'))
# Now import our module
from sql_safe_builder import SafeConditionBuilder # noqa: E402 [flake8 lint suppression]
@pytest.fixture
def builder():
"""Fixture to provide a SafeConditionBuilder instance."""
return SafeConditionBuilder()
def test_sql_injection_attempt_single_quote(builder):
"""Test that single quote injection attempts are blocked."""
malicious_input = "'; DROP TABLE users; --"
condition, params = builder.get_safe_condition_legacy(malicious_input)
# Should return empty condition when invalid
assert condition == ""
assert params == {}
def test_sql_injection_attempt_union(builder):
"""Test that UNION injection attempts are blocked."""
malicious_input = "1' UNION SELECT * FROM passwords --"
condition, params = builder.get_safe_condition_legacy(malicious_input)
# Should return empty condition when invalid
assert condition == ""
assert params == {}
def test_sql_injection_attempt_or_true(builder):
"""Test that OR 1=1 injection attempts are blocked."""
malicious_input = "' OR '1'='1"
condition, params = builder.get_safe_condition_legacy(malicious_input)
# Should return empty condition when invalid
assert condition == ""
assert params == {}
def test_valid_simple_condition(builder):
"""Test that valid simple conditions are handled correctly."""
valid_input = "AND devName = 'Test Device'"
condition, params = builder.get_safe_condition_legacy(valid_input)
# Should create parameterized query
assert "AND devName = :" in condition
assert len(params) == 1
assert 'Test Device' in list(params.values())
def test_empty_condition(builder):
"""Test that empty conditions are handled safely."""
empty_input = ""
condition, params = builder.get_safe_condition_legacy(empty_input)
# Should return empty condition
assert condition == ""
assert params == {}
def test_whitespace_only_condition(builder):
"""Test that whitespace-only conditions are handled safely."""
whitespace_input = " \n\t "
condition, params = builder.get_safe_condition_legacy(whitespace_input)
# Should return empty condition
assert condition == ""
assert params == {}
def test_multiple_conditions_valid(builder):
"""Test that single valid conditions are handled correctly."""
# Test with a single condition first (our current parser handles single conditions well)
valid_input = "AND devName = 'Device1'"
condition, params = builder.get_safe_condition_legacy(valid_input)
# Should create parameterized query
assert "devName = :" in condition
assert len(params) == 1
assert 'Device1' in list(params.values())
def test_disallowed_column_name(builder):
"""Test that non-whitelisted column names are rejected."""
invalid_input = "AND malicious_column = 'value'"
condition, params = builder.get_safe_condition_legacy(invalid_input)
# Should return empty condition when column not in whitelist
assert condition == ""
assert params == {}
def test_disallowed_operator(builder):
"""Test that non-whitelisted operators are rejected."""
invalid_input = "AND devName SOUNDS LIKE 'test'"
condition, params = builder.get_safe_condition_legacy(invalid_input)
# Should return empty condition when operator not allowed
assert condition == ""
assert params == {}
def test_nested_select_attempt(builder):
"""Test that nested SELECT attempts are blocked."""
malicious_input = "AND devName IN (SELECT password FROM users)"
condition, params = builder.get_safe_condition_legacy(malicious_input)
# Should return empty condition when nested SELECT detected
assert condition == ""
assert params == {}
def test_hex_encoding_attempt(builder):
"""Test that hex-encoded injection attempts are blocked."""
malicious_input = "AND 0x44524f50205441424c45"
condition, params = builder.get_safe_condition_legacy(malicious_input)
# Should return empty condition when hex encoding detected
assert condition == ""
assert params == {}
def test_comment_injection_attempt(builder):
"""Test that comment injection attempts are handled."""
malicious_input = "AND devName = 'test' /* comment */ --"
condition, params = builder.get_safe_condition_legacy(malicious_input)
# Comments should be stripped and condition validated
if condition:
assert "/*" not in condition
assert "--" not in condition
def test_special_placeholder_replacement(builder):
"""Test that {s-quote} placeholder is safely replaced."""
input_with_placeholder = "AND devName = {s-quote}Test{s-quote}"
condition, params = builder.get_safe_condition_legacy(input_with_placeholder)
# Should handle placeholder safely
if condition:
assert "{s-quote}" not in condition
assert "devName = :" in condition
def test_null_byte_injection(builder):
"""Test that null byte injection attempts are blocked."""
malicious_input = "AND devName = 'test\x00' DROP TABLE --"
condition, params = builder.get_safe_condition_legacy(malicious_input)
# Null bytes should be sanitized
if condition:
assert "\x00" not in condition
for value in params.values():
assert "\x00" not in str(value)
def test_build_condition_with_allowed_values(builder):
"""Test building condition with specific allowed values."""
conditions = [
{"column": "eve_EventType", "operator": "=", "value": "Connected"},
{"column": "devName", "operator": "LIKE", "value": "%test%"}
]
condition, params = builder.build_condition(conditions, "AND")
# Should create valid parameterized condition
assert "eve_EventType = :" in condition
assert "devName LIKE :" in condition
assert len(params) == 2
def test_build_condition_with_invalid_column(builder):
"""Test that invalid columns in build_condition are rejected."""
conditions = [
{"column": "invalid_column", "operator": "=", "value": "test"}
]
condition, params = builder.build_condition(conditions)
# Should return empty when invalid column
assert condition == ""
assert params == {}
def test_case_variations_injection(builder):
"""Test that case variation injection attempts are blocked."""
malicious_inputs = [
"AnD 1=1",
"oR 1=1",
"UnIoN SeLeCt * FrOm users"
]
for malicious_input in malicious_inputs:
condition, params = builder.get_safe_condition_legacy(malicious_input)
# Should handle case variations safely
if "union" in condition.lower() or "select" in condition.lower():
if "union" in condition.lower() or "select" in condition.lower():
pytest.fail(f"Injection not blocked: {malicious_input}")
def test_time_based_injection_attempt(builder):
"""Test that time-based injection attempts are blocked."""
malicious_input = "AND IF(1=1, SLEEP(5), 0)"
condition, params = builder.get_safe_condition_legacy(malicious_input)
# Should return empty condition when SQL functions detected
assert condition == ""
assert params == {}
def test_stacked_queries_attempt(builder):
"""Test that stacked query attempts are blocked."""
malicious_input = "'; INSERT INTO admin VALUES ('hacker', 'password'); --"
condition, params = builder.get_safe_condition_legacy(malicious_input)
# Should return empty condition when semicolon detected
assert condition == ""
assert params == {}