TEST: cleanup
Some checks failed
docker / docker_dev (push) Has been cancelled

Signed-off-by: jokob-sk <jokob.sk@gmail.com>
This commit is contained in:
jokob-sk
2025-11-04 20:01:27 +11:00
parent 5452b7287b
commit ea8cea16c5
14 changed files with 0 additions and 0 deletions

View File

@@ -0,0 +1,221 @@
#!/usr/bin/env python3
"""
Comprehensive SQL Injection Prevention Tests for NetAlertX
This test suite validates that all SQL injection vulnerabilities have been
properly addressed in the reporting.py module.
"""
import sys
import os
import unittest
from unittest.mock import Mock, patch, MagicMock
# Add parent directory to path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'server'))
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'server', 'db'))
# Now import our module
from sql_safe_builder import SafeConditionBuilder
class TestSQLInjectionPrevention(unittest.TestCase):
"""Test suite for SQL injection prevention."""
def setUp(self):
"""Set up test fixtures."""
self.builder = SafeConditionBuilder()
def test_sql_injection_attempt_single_quote(self):
"""Test that single quote injection attempts are blocked."""
malicious_input = "'; DROP TABLE users; --"
condition, params = self.builder.get_safe_condition_legacy(malicious_input)
# Should return empty condition when invalid
self.assertEqual(condition, "")
self.assertEqual(params, {})
def test_sql_injection_attempt_union(self):
"""Test that UNION injection attempts are blocked."""
malicious_input = "1' UNION SELECT * FROM passwords --"
condition, params = self.builder.get_safe_condition_legacy(malicious_input)
# Should return empty condition when invalid
self.assertEqual(condition, "")
self.assertEqual(params, {})
def test_sql_injection_attempt_or_true(self):
"""Test that OR 1=1 injection attempts are blocked."""
malicious_input = "' OR '1'='1"
condition, params = self.builder.get_safe_condition_legacy(malicious_input)
# Should return empty condition when invalid
self.assertEqual(condition, "")
self.assertEqual(params, {})
def test_valid_simple_condition(self):
"""Test that valid simple conditions are handled correctly."""
valid_input = "AND devName = 'Test Device'"
condition, params = self.builder.get_safe_condition_legacy(valid_input)
# Should create parameterized query
self.assertIn("AND devName = :", condition)
self.assertEqual(len(params), 1)
self.assertIn('Test Device', list(params.values()))
def test_empty_condition(self):
"""Test that empty conditions are handled safely."""
empty_input = ""
condition, params = self.builder.get_safe_condition_legacy(empty_input)
# Should return empty condition
self.assertEqual(condition, "")
self.assertEqual(params, {})
def test_whitespace_only_condition(self):
"""Test that whitespace-only conditions are handled safely."""
whitespace_input = " \n\t "
condition, params = self.builder.get_safe_condition_legacy(whitespace_input)
# Should return empty condition
self.assertEqual(condition, "")
self.assertEqual(params, {})
def test_multiple_conditions_valid(self):
"""Test that single valid conditions are handled correctly."""
# Test with a single condition first (our current parser handles single conditions well)
valid_input = "AND devName = 'Device1'"
condition, params = self.builder.get_safe_condition_legacy(valid_input)
# Should create parameterized query
self.assertIn("devName = :", condition)
self.assertEqual(len(params), 1)
self.assertIn('Device1', list(params.values()))
def test_disallowed_column_name(self):
"""Test that non-whitelisted column names are rejected."""
invalid_input = "AND malicious_column = 'value'"
condition, params = self.builder.get_safe_condition_legacy(invalid_input)
# Should return empty condition when column not in whitelist
self.assertEqual(condition, "")
self.assertEqual(params, {})
def test_disallowed_operator(self):
"""Test that non-whitelisted operators are rejected."""
invalid_input = "AND devName SOUNDS LIKE 'test'"
condition, params = self.builder.get_safe_condition_legacy(invalid_input)
# Should return empty condition when operator not allowed
self.assertEqual(condition, "")
self.assertEqual(params, {})
def test_nested_select_attempt(self):
"""Test that nested SELECT attempts are blocked."""
malicious_input = "AND devName IN (SELECT password FROM users)"
condition, params = self.builder.get_safe_condition_legacy(malicious_input)
# Should return empty condition when nested SELECT detected
self.assertEqual(condition, "")
self.assertEqual(params, {})
def test_hex_encoding_attempt(self):
"""Test that hex-encoded injection attempts are blocked."""
malicious_input = "AND 0x44524f50205441424c45"
condition, params = self.builder.get_safe_condition_legacy(malicious_input)
# Should return empty condition when hex encoding detected
self.assertEqual(condition, "")
self.assertEqual(params, {})
def test_comment_injection_attempt(self):
"""Test that comment injection attempts are handled."""
malicious_input = "AND devName = 'test' /* comment */ --"
condition, params = self.builder.get_safe_condition_legacy(malicious_input)
# Comments should be stripped and condition validated
if condition:
self.assertNotIn("/*", condition)
self.assertNotIn("--", condition)
def test_special_placeholder_replacement(self):
"""Test that {s-quote} placeholder is safely replaced."""
input_with_placeholder = "AND devName = {s-quote}Test{s-quote}"
condition, params = self.builder.get_safe_condition_legacy(input_with_placeholder)
# Should handle placeholder safely
if condition:
self.assertNotIn("{s-quote}", condition)
self.assertIn("devName = :", condition)
def test_null_byte_injection(self):
"""Test that null byte injection attempts are blocked."""
malicious_input = "AND devName = 'test\x00' DROP TABLE --"
condition, params = self.builder.get_safe_condition_legacy(malicious_input)
# Null bytes should be sanitized
if condition:
self.assertNotIn("\x00", condition)
for value in params.values():
self.assertNotIn("\x00", str(value))
def test_build_condition_with_allowed_values(self):
"""Test building condition with specific allowed values."""
conditions = [
{"column": "eve_EventType", "operator": "=", "value": "Connected"},
{"column": "devName", "operator": "LIKE", "value": "%test%"}
]
condition, params = self.builder.build_condition(conditions, "AND")
# Should create valid parameterized condition
self.assertIn("eve_EventType = :", condition)
self.assertIn("devName LIKE :", condition)
self.assertEqual(len(params), 2)
def test_build_condition_with_invalid_column(self):
"""Test that invalid columns in build_condition are rejected."""
conditions = [
{"column": "invalid_column", "operator": "=", "value": "test"}
]
condition, params = self.builder.build_condition(conditions)
# Should return empty when invalid column
self.assertEqual(condition, "")
self.assertEqual(params, {})
def test_case_variations_injection(self):
"""Test that case variation injection attempts are blocked."""
malicious_inputs = [
"AnD 1=1",
"oR 1=1",
"UnIoN SeLeCt * FrOm users"
]
for malicious_input in malicious_inputs:
condition, params = self.builder.get_safe_condition_legacy(malicious_input)
# Should handle case variations safely
if "union" in condition.lower() or "select" in condition.lower():
self.fail(f"Injection not blocked: {malicious_input}")
def test_time_based_injection_attempt(self):
"""Test that time-based injection attempts are blocked."""
malicious_input = "AND IF(1=1, SLEEP(5), 0)"
condition, params = self.builder.get_safe_condition_legacy(malicious_input)
# Should return empty condition when SQL functions detected
self.assertEqual(condition, "")
self.assertEqual(params, {})
def test_stacked_queries_attempt(self):
"""Test that stacked query attempts are blocked."""
malicious_input = "'; INSERT INTO admin VALUES ('hacker', 'password'); --"
condition, params = self.builder.get_safe_condition_legacy(malicious_input)
# Should return empty condition when semicolon detected
self.assertEqual(condition, "")
self.assertEqual(params, {})
if __name__ == '__main__':
# Run the tests
unittest.main(verbosity=2)