Unit tests

This commit is contained in:
Adam Outler
2026-01-03 01:13:47 +00:00
parent c15f621ad4
commit 19cc5b0406
45 changed files with 5504 additions and 1133 deletions

View File

@@ -5,12 +5,18 @@ Pytest-based Mount Diagnostic Tests for NetAlertX
Tests all possible mount configurations for each path to validate the diagnostic tool.
Uses pytest framework for proper test discovery and execution.
FAIL-SOFT PHILOSOPHY:
The container is designed to "Fail Soft" in restricted environments.
- If capabilities (like CAP_CHOWN) are missing, it warns but proceeds.
- If mounts are suboptimal (RAM disk), it warns but proceeds.
- This ensures compatibility with strict security policies (e.g., read-only root, dropped caps).
TODO: Future Robustness & Compatibility Tests
1. Symlink Attacks: Verify behavior when a writable directory is mounted via a symlink.
Hypothesis: The tool might misidentify the mount status or path.
2. OverlayFS/Copy-up Scenarios: Investigate behavior on filesystems like Synology's OverlayFS.
Hypothesis: Files might appear writable but fail on specific operations (locking, mmap).
3. Text-based Output: Refactor output to support text-based status (e.g., [OK], [FAIL])
instead of emojis for better compatibility with terminals that don't support unicode.
All tests use the mounts table. For reference, the mounts table looks like this:
@@ -33,6 +39,7 @@ Table Assertions:
import os
import subprocess
import sys
import pytest
from pathlib import Path
from dataclasses import dataclass
@@ -49,6 +56,25 @@ CONTAINER_PATHS = {
"active_config": "/tmp/nginx/active-config",
}
# Known troubleshooting documents; diagnostic output is expected to contain at
# least one of these links so users can self-serve a fix (checked by
# assert_has_troubleshooting_url).
TROUBLESHOOTING_URLS = [
    "https://github.com/jokob-sk/NetAlertX/blob/main/docs/docker-troubleshooting/file-permissions.md",
    "https://github.com/jokob-sk/NetAlertX/blob/main/docs/docker-troubleshooting/mount-configuration-issues.md",
    "https://github.com/jokob-sk/NetAlertX/blob/main/docs/docker-troubleshooting/incorrect-user.md",
    "https://github.com/jokob-sk/NetAlertX/blob/main/docs/docker-troubleshooting/missing-capabilities.md",
]
def capture_project_mandatory_required_audit_stream(container_name: str) -> subprocess.Popen[str]:
    """Start a background ``docker logs -f`` follower for *container_name*.

    The child process inherits this process's stdout/stderr so container
    output is interleaved live with the test output. The caller owns the
    returned handle and is responsible for terminating it.
    """
    follow_cmd = ["docker", "logs", "-f", container_name]
    return subprocess.Popen(
        follow_cmd,
        stdout=sys.stdout,
        stderr=sys.stderr,
        text=True,
    )
@dataclass
class MountTableRow:
@@ -139,6 +165,19 @@ def parse_mount_table(output: str) -> List[MountTableRow]:
return rows
def assert_has_troubleshooting_url(output: str) -> None:
    """Fail the test unless *output* contains at least one known troubleshooting link."""
    if any(url in output for url in TROUBLESHOOTING_URLS):
        return
    pytest.fail(
        "Expected troubleshooting URL in output; got none of "
        f"{TROUBLESHOOTING_URLS}"
    )
def assert_table_row(
output: str,
expected_path: str,
@@ -296,8 +335,8 @@ def create_test_scenarios() -> List[TestScenario]:
expected_issues = []
compose_file = f"docker-compose.mount-test.{path_name}_{scenario_name}.yml"
# Determine expected exit code
expected_exit_code = 1 if expected_issues else 0
# Diagnostics should warn but keep the container running; expect success
expected_exit_code = 0
scenarios.append(
TestScenario(
@@ -387,13 +426,10 @@ def _print_compose_logs(
print("\n=== docker compose logs (DO NOT REMOVE) ===")
print(f"Reason: {reason}")
print("Command:", " ".join(cmd))
print(
"Note: If this output feels too large for your context window, redirect it to a file and read it back instead of deleting it."
)
print(result.stdout or "<no stdout>")
if result.stderr:
print("--- logs stderr ---")
print(result.stderr)
print(result.stderr) # DO NOT REMOVE OR MODIFY - MANDATORY LOGGING FOR DEBUGGING & CI.
print("=== end docker compose logs ===\n")
@@ -501,30 +537,22 @@ def validate_scenario_table_output(output: str, test_scenario: TestScenario) ->
elif test_scenario.name == "run_unwritable":
assert_table_row(output, CONTAINER_PATHS["run"], writeable=False)
elif test_scenario.name.startswith("active_config_"):
if test_scenario.name == "active_config_mounted":
assert_table_row(
output,
CONTAINER_PATHS["active_config"],
mount=True,
performance=False,
)
elif test_scenario.name == "active_config_no-mount":
assert_table_row(
output,
CONTAINER_PATHS["active_config"],
mount=True,
ramdisk=True,
performance=True,
dataloss=True,
)
elif test_scenario.name == "active_config_unwritable":
assert_table_row(
output,
CONTAINER_PATHS["active_config"],
ramdisk=False,
performance=False,
)
elif test_scenario.name.startswith("active_config_"):
if test_scenario.name == "active_config_mounted":
assert_table_row(
output,
CONTAINER_PATHS["active_config"],
mount=True,
performance=False,
)
# active_config_no-mount is considered healthy (internal tmpfs), so no validation needed here.
elif test_scenario.name == "active_config_unwritable":
assert_table_row(
output,
CONTAINER_PATHS["active_config"],
ramdisk=False,
performance=False,
)
except AssertionError as e:
pytest.fail(f"Table validation failed for {test_scenario.name}: {e}")
@@ -560,13 +588,39 @@ def test_mount_diagnostic(netalertx_test_image, test_scenario):
logs_emitted = True
# Remove any existing containers with the same project name
subprocess.run(
base_cmd + ["down", "-v"], capture_output=True, timeout=30, env=compose_env
result = subprocess.run(
base_cmd + ["down", "-v"], capture_output=True, text=True, timeout=30, env=compose_env
)
print(result.stdout) # DO NOT REMOVE OR MODIFY - MANDATORY LOGGING FOR DEBUGGING & CI.
print(result.stderr) # DO NOT REMOVE OR MODIFY - MANDATORY LOGGING FOR DEBUGGING & CI.
# Pre-initialize volumes for _noread scenarios that use persistent volumes
if test_scenario.name in ["data_noread", "db_noread"]:
path_to_chmod = test_scenario.container_path
# We need to run as root to chown/chmod, then the main container runs as 20211
# Note: We use 'netalertx' service but override user and entrypoint
init_cmd = base_cmd + [
"run",
"--rm",
"--cap-add",
"FOWNER",
"--user",
"0",
"--entrypoint",
"/bin/sh",
"netalertx",
"-c",
f"mkdir -p {path_to_chmod} && chown 20211:20211 {path_to_chmod} && chmod 0300 {path_to_chmod}",
]
result_init = subprocess.run(
init_cmd, capture_output=True, text=True, timeout=30, env=compose_env
)
if result_init.returncode != 0:
pytest.fail(f"Failed to initialize volume permissions: {result_init.stderr}")
# The compose files use a fixed container name; ensure no stale container blocks the run.
container_name = f"netalertx-test-mount-{test_scenario.name}"
subprocess.run(
result = subprocess.run(
["docker", "rm", "-f", container_name],
capture_output=True,
text=True,
@@ -574,13 +628,18 @@ def test_mount_diagnostic(netalertx_test_image, test_scenario):
check=False,
env=compose_env,
)
print(result.stdout) # DO NOT REMOVE OR MODIFY - MANDATORY LOGGING FOR DEBUGGING & CI.
print(result.stderr) # DO NOT REMOVE OR MODIFY - MANDATORY LOGGING FOR DEBUGGING & CI.
cmd_up = base_cmd + ["up", "-d"]
try:
audit_proc: subprocess.Popen[str] | None = None
result_up = subprocess.run(
cmd_up, capture_output=True, text=True, timeout=20, env=compose_env
)
print(result_up.stdout) # DO NOT REMOVE OR MODIFY - MANDATORY LOGGING FOR DEBUGGING & CI.
print(result_up.stderr) # DO NOT REMOVE OR MODIFY - MANDATORY LOGGING FOR DEBUGGING & CI.
if result_up.returncode != 0:
ensure_logs("compose up failed")
pytest.fail(
@@ -588,157 +647,46 @@ def test_mount_diagnostic(netalertx_test_image, test_scenario):
f"STDOUT: {result_up.stdout}"
)
audit_proc = capture_project_mandatory_required_audit_stream(container_name)
# Wait for container to be ready
import time
# Container is still running - validate the diagnostics already run at startup
# Give entrypoint scripts a moment to finish outputting to logs
time.sleep(2)
time.sleep(1)
# Check if container is still running
result_ps = subprocess.run(
["docker", "ps", "-q", "-f", f"name={container_name}"],
capture_output=True,
text=True,
result_logs = subprocess.run(
["docker", "logs", container_name], capture_output=True, text=True, timeout=30
)
diagnostic_output = result_logs.stdout + result_logs.stderr
if not result_ps.stdout.strip():
# Container exited - check the exit code
result_inspect = subprocess.run(
["docker", "inspect", container_name, "--format={{.State.ExitCode}}"],
capture_output=True,
text=True,
)
actual_exit_code = int(result_inspect.stdout.strip())
# Assert the exit code matches expected
if actual_exit_code != test_scenario.expected_exit_code:
ensure_logs("unexpected exit code")
pytest.fail(
f"Container {container_name} exited with code {actual_exit_code}, "
f"expected {test_scenario.expected_exit_code}"
)
# Check the logs to see if it detected the expected issues
result_logs = subprocess.run(
["docker", "logs", container_name], capture_output=True, text=True
)
logs = result_logs.stdout + result_logs.stderr
if test_scenario.expected_issues:
validate_scenario_table_output(logs, test_scenario)
return # Test passed - container correctly detected issues and exited
# Container is still running - run diagnostic tool
if test_scenario.name.endswith("_noread"):
# Craft a mounted-but-unreadable (-wx) directory owned by uid 20211.
# Do this after container start so entrypoint scripts cannot overwrite it.
prep_cmd = [
"docker",
"exec",
"--user",
"netalertx",
container_name,
"/bin/sh",
"-c",
" ".join(
[
# Baseline structure for stable diagnostics (best-effort).
"mkdir -p /data/db /data/config /tmp/api /tmp/log /tmp/run /tmp/nginx/active-config || true;",
"chmod 0700 /data/db /data/config /tmp/api /tmp/log /tmp/run /tmp/nginx/active-config 2>/dev/null || true;",
# Target path: remove read permission but keep write+execute.
f"chmod 0300 '{test_scenario.container_path}';",
]
),
]
result_prep = subprocess.run(
prep_cmd, capture_output=True, text=True, timeout=30, check=False
)
if result_prep.returncode != 0:
ensure_logs("failed to prepare noread permissions")
pytest.fail(
f"Failed to prepare noread permissions: {result_prep.stderr}\nSTDOUT: {result_prep.stdout}"
)
# Verify as the effective app user: not readable, but writable+executable.
verify_cmd = [
"docker",
"exec",
"--user",
"netalertx",
container_name,
"python3",
"-c",
"".join(
[
"import os, sys; ",
f"p={test_scenario.container_path!r}; ",
"r=os.access(p, os.R_OK); ",
"w=os.access(p, os.W_OK); ",
"x=os.access(p, os.X_OK); ",
"sys.exit(0 if (not r and w and x) else 1)",
]
),
]
result_verify = subprocess.run(
verify_cmd, capture_output=True, text=True, timeout=30, check=False
)
if result_verify.returncode != 0:
ensure_logs("noread verification failed")
pytest.fail(
"noread verification failed for "
f"{test_scenario.container_path}:\n"
f"stdout: {result_verify.stdout}\n"
f"stderr: {result_verify.stderr}"
)
cmd_exec = [
"docker",
"exec",
"--user",
"netalertx",
container_name,
"python3",
"/entrypoint.d/10-mounts.py",
]
result_exec = subprocess.run(
cmd_exec, capture_output=True, text=True, timeout=30
)
diagnostic_output = result_exec.stdout + result_exec.stderr
# The diagnostic tool returns 1 for rw permission issues except active_config, which only warns
if (test_scenario.name.startswith("active_config_") and "unwritable" in test_scenario.name):
expected_tool_exit = 0
elif "unwritable" in test_scenario.name or test_scenario.name.endswith("_noread"):
expected_tool_exit = 1
else:
expected_tool_exit = 0
if result_exec.returncode != expected_tool_exit:
ensure_logs("diagnostic exit code mismatch")
pytest.fail(
f"Diagnostic tool failed (expected {expected_tool_exit}, got {result_exec.returncode}): {result_exec.stderr}"
)
# Always surface diagnostic output for visibility
print("\n[diagnostic output from startup logs]\n", diagnostic_output)
if test_scenario.expected_issues:
validate_scenario_table_output(diagnostic_output, test_scenario)
assert_has_troubleshooting_url(diagnostic_output)
assert "⚠️" in diagnostic_output, (
f"Issue scenario {test_scenario.name} should include a warning symbol, got: {result_exec.stderr}"
f"Issue scenario {test_scenario.name} should include a warning symbol in startup logs"
)
else:
# Should have table output but no warning message
assert "Path" in diagnostic_output, (
f"Good config {test_scenario.name} should show table, got: {diagnostic_output}"
)
assert "⚠️" not in diagnostic_output, (
f"Good config {test_scenario.name} should not show warning, got stderr: {result_exec.stderr}"
)
return # Test passed - diagnostic output validated
return # Test passed - diagnostic output validated via logs
finally:
# Stop container
subprocess.run(
base_cmd + ["down", "-v"], capture_output=True, timeout=30, env=compose_env
result = subprocess.run(
base_cmd + ["down", "-v"], capture_output=True, text=True, timeout=30, env=compose_env
)
print(result.stdout) # DO NOT REMOVE OR MODIFY - MANDATORY LOGGING FOR DEBUGGING & CI.
print(result.stderr) # DO NOT REMOVE OR MODIFY - MANDATORY LOGGING FOR DEBUGGING & CI.
if audit_proc:
try:
audit_proc.terminate()
except Exception:
pass
def test_table_parsing():
@@ -777,3 +725,88 @@ def test_table_parsing():
dataloss=True,
)
@pytest.mark.docker
def test_cap_chown_required_when_caps_dropped(netalertx_test_image):
    """Ensure startup warns (but keeps running) when the CHOWN capability is removed.

    Fail-soft contract: with CAP_CHOWN dropped the container must still come
    up (exit code 0 / still running), emit a chown/CAP_CHOWN warning in its
    logs, and point at a troubleshooting document instead of aborting.
    """
    compose_file = CONFIG_DIR / "mount-tests" / "docker-compose.mount-test.cap_chown_missing.yml"
    assert compose_file.exists(), "CAP_CHOWN test compose file missing"

    project_name = "mount-test-cap-chown-missing"
    compose_env = os.environ.copy()
    base_cmd = [
        "docker",
        "compose",
        "-f",
        str(compose_file),
        "-p",
        project_name,
    ]
    container_name = "netalertx-test-mount-cap_chown_missing"

    # Clean up any leftovers from a previous run; print the output so CI
    # failures remain diagnosable from the pytest log.
    result = subprocess.run(
        base_cmd + ["down", "-v"], capture_output=True, text=True, timeout=30, env=compose_env
    )
    print(result.stdout)
    print(result.stderr)
    result = subprocess.run(
        ["docker", "rm", "-f", container_name],
        capture_output=True,
        text=True,
        timeout=30,
        check=False,
        env=compose_env,
    )
    print(result.stdout)
    print(result.stderr)

    cmd_up = base_cmd + ["up", "-d"]
    try:
        result_up = subprocess.run(
            cmd_up, capture_output=True, text=True, timeout=20, env=compose_env
        )
        if result_up.returncode != 0:
            _print_compose_logs(compose_file, project_name, "compose up failed", env=compose_env)
            pytest.fail(
                f"Failed to start container: {result_up.stderr}\nSTDOUT: {result_up.stdout}"
            )

        # Give the entrypoint a moment to run its startup diagnostics.
        import time

        time.sleep(1)

        result_inspect = subprocess.run(
            ["docker", "inspect", container_name, "--format={{.State.ExitCode}}"],
            capture_output=True,
            text=True,
            timeout=15,
        )
        # A still-running container reports exit code 0; an empty inspect
        # result (race with startup) is treated the same way.
        exit_code = int(result_inspect.stdout.strip() or "0")

        logs_result = subprocess.run(
            ["docker", "logs", container_name],
            capture_output=True,
            text=True,
            timeout=15,
        )
        print(logs_result.stdout)
        print(logs_result.stderr)
        logs = logs_result.stdout + logs_result.stderr

        assert exit_code == 0, f"Container should continue with warnings; got exit {exit_code}"
        # Wording may vary; require a chown-related warning that also names the
        # capability. BUGFIX: dropped the redundant case-sensitive
        # `"cap_chown" in logs` disjunct — it is already implied by the
        # case-insensitive check on logs.lower().
        assert "chown" in logs.lower()
        assert (
            "cap_chown" in logs.lower()
            or "cap chown" in logs.lower()
            or "capabilities (chown" in logs.lower()
        )
        assert_has_troubleshooting_url(logs)
    finally:
        # Always tear the compose project down, even on assertion failure.
        result = subprocess.run(
            base_cmd + ["down", "-v"], capture_output=True, text=True, timeout=30, env=compose_env
        )
        print(result.stdout)
        print(result.stderr)