Merge pull request #1364 from adamoutler/improve-mount-built-in-test

Improving mount diagnostics
This commit is contained in:
Jokob @NetAlertX
2025-12-22 18:57:58 +00:00
committed by GitHub
9 changed files with 1925 additions and 359 deletions

View File

@@ -31,6 +31,7 @@ class MountCheckResult:
var_name: str
path: str = ""
is_writeable: bool = False
is_readable: bool = False
is_mounted: bool = False
is_mount_point: bool = False
is_ramdisk: bool = False
@@ -38,6 +39,7 @@ class MountCheckResult:
fstype: str = "N/A"
error: bool = False
write_error: bool = False
read_error: bool = False
performance_issue: bool = False
dataloss_risk: bool = False
category: str = ""
@@ -97,7 +99,7 @@ def _resolve_writeable_state(target_path: str) -> bool:
if os.path.exists(current):
if not os.access(current, os.W_OK):
return False
# OverlayFS/Copy-up check: Try to actually write a file to verify
if os.path.isdir(current):
test_file = os.path.join(current, f".netalertx_write_test_{os.getpid()}")
@@ -108,7 +110,7 @@ def _resolve_writeable_state(target_path: str) -> bool:
return True
except OSError:
return False
return True
parent_dir = os.path.dirname(current)
@@ -119,6 +121,27 @@ def _resolve_writeable_state(target_path: str) -> bool:
return False
def _resolve_readable_state(target_path: str) -> bool:
"""Determine if a path is readable, ascending to the first existing parent."""
current = target_path
seen: set[str] = set()
while True:
if current in seen:
break
seen.add(current)
if os.path.exists(current):
return os.access(current, os.R_OK)
parent_dir = os.path.dirname(current)
if not parent_dir or parent_dir == current:
break
current = parent_dir
return False
def analyze_path(
spec: PathSpec,
mounted_filesystems,
@@ -142,14 +165,20 @@ def analyze_path(
result.path = target_path
# --- 1. Check Write Permissions ---
# --- 1. Check Read/Write Permissions ---
result.is_writeable = _resolve_writeable_state(target_path)
result.is_readable = _resolve_readable_state(target_path)
if not result.is_writeable:
result.error = True
if spec.role != "secondary":
result.write_error = True
if not result.is_readable:
result.error = True
if spec.role != "secondary":
result.read_error = True
# --- 2. Check Filesystem Type (Parent and Self) ---
parent_mount_fstype = ""
longest_mount = ""
@@ -184,6 +213,8 @@ def analyze_path(
result.is_ramdisk = parent_mount_fstype in non_persistent_fstypes
# --- 4. Apply Risk Logic ---
# Keep risk flags about persistence/performance properties of the mount itself.
# Read/write permission problems are surfaced via the R/W columns and error flags.
if spec.category == "persist":
if result.underlying_fs_is_ramdisk or result.is_ramdisk:
result.dataloss_risk = True
@@ -198,17 +229,32 @@ def analyze_path(
return result
def print_warning_message():
def print_warning_message(results: list[MountCheckResult]):
"""Prints a formatted warning to stderr."""
YELLOW = "\033[1;33m"
RESET = "\033[0m"
print(f"{YELLOW}══════════════════════════════════════════════════════════════════════════════", file=sys.stderr)
print("⚠️ ATTENTION: Configuration issues detected (marked with ❌).\n", file=sys.stderr)
for r in results:
issues = []
if not r.is_writeable:
issues.append("error writing")
if not r.is_readable:
issues.append("error reading")
if not r.is_mounted and (r.category == "persist" or r.category == "ramdisk"):
issues.append("not mounted")
if r.dataloss_risk:
issues.append("risk of dataloss")
if r.performance_issue:
issues.append("performance issue")
if issues:
print(f" * {r.path} {', '.join(issues)}", file=sys.stderr)
message = (
"══════════════════════════════════════════════════════════════════════════════\n"
"⚠️ ATTENTION: Configuration issues detected (marked with ❌).\n\n"
" Your configuration has write permission, dataloss, or performance issues\n"
" as shown in the table above.\n\n"
" We recommend starting with the default docker-compose.yml as the\n"
"\n We recommend starting with the default docker-compose.yml as the\n"
" configuration can be quite complex.\n\n"
" Review the documentation for a correct setup:\n"
" https://github.com/jokob-sk/NetAlertX/blob/main/docs/DOCKER_COMPOSE.md\n"
@@ -216,7 +262,7 @@ def print_warning_message():
"══════════════════════════════════════════════════════════════════════════════\n"
)
print(f"{YELLOW}{message}{RESET}", file=sys.stderr)
print(f"{message}{RESET}", file=sys.stderr)
def _get_active_specs() -> list[PathSpec]:
@@ -231,14 +277,14 @@ def _sub_result_is_healthy(result: MountCheckResult) -> bool:
if result.category == "persist":
if not result.is_mounted:
return False
if result.dataloss_risk or result.write_error or result.error:
if result.dataloss_risk or result.write_error or result.read_error or result.error:
return False
return True
if result.category == "ramdisk":
if not result.is_mounted or not result.is_ramdisk:
return False
if result.performance_issue or result.write_error or result.error:
if result.performance_issue or result.write_error or result.read_error or result.error:
return False
return True
@@ -278,19 +324,9 @@ def _apply_primary_rules(specs: list[PathSpec], results_map: dict[str, MountChec
)
all_core_subs_are_mounts = bool(core_sub_results) and len(core_mount_points) == len(core_sub_results)
if all_core_subs_healthy:
if result.write_error:
result.write_error = False
if not result.is_writeable:
result.is_writeable = True
if spec.category == "persist" and result.dataloss_risk:
result.dataloss_risk = False
if result.error and not (result.performance_issue or result.dataloss_risk or result.write_error):
result.error = False
suppress_primary = False
if all_core_subs_healthy and all_core_subs_are_mounts:
if not result.is_mount_point and not result.error and not result.write_error:
if not result.is_mount_point and not result.error and not result.write_error and not result.read_error:
suppress_primary = True
if suppress_primary:
@@ -329,14 +365,14 @@ def main():
results = _apply_primary_rules(active_specs, results_map)
has_issues = any(
r.dataloss_risk or r.error or r.write_error or r.performance_issue
r.dataloss_risk or r.error or r.write_error or r.read_error or r.performance_issue
for r in results
)
has_write_errors = any(r.write_error for r in results)
has_rw_errors = any(r.write_error or r.read_error for r in results)
if has_issues or True: # Always print table for diagnostic purposes
# --- Print Table ---
headers = ["Path", "Writeable", "Mount", "RAMDisk", "Performance", "DataLoss"]
headers = ["Path", "R", "W", "Mount", "RAMDisk", "Performance", "DataLoss"]
CHECK_SYMBOL = ""
CROSS_SYMBOL = ""
@@ -355,7 +391,8 @@ def main():
f" {{:^{col_widths[2]}}} |"
f" {{:^{col_widths[3]}}} |"
f" {{:^{col_widths[4]}}} |"
f" {{:^{col_widths[5]}}} "
f" {{:^{col_widths[5]}}} |"
f" {{:^{col_widths[6]}}} "
)
row_fmt = (
@@ -364,7 +401,8 @@ def main():
f" {{:^{col_widths[2]}}}|" # No space
f" {{:^{col_widths[3]}}}|" # No space
f" {{:^{col_widths[4]}}}|" # No space
f" {{:^{col_widths[5]}}} " # DataLoss is last, needs space
f" {{:^{col_widths[5]}}}|" # No space
f" {{:^{col_widths[6]}}} " # DataLoss is last, needs space
)
separator = "".join([
@@ -378,13 +416,16 @@ def main():
"+",
"-" * (col_widths[4] + 2),
"+",
"-" * (col_widths[5] + 2)
"-" * (col_widths[5] + 2),
"+",
"-" * (col_widths[6] + 2)
])
print(header_fmt.format(*headers))
print(separator)
print(header_fmt.format(*headers), file=sys.stderr)
print(separator, file=sys.stderr)
for r in results:
# Symbol Logic
read_symbol = bool_to_check(r.is_readable)
write_symbol = bool_to_check(r.is_writeable)
mount_symbol = CHECK_SYMBOL if r.is_mounted else CROSS_SYMBOL
@@ -407,21 +448,23 @@ def main():
print(
row_fmt.format(
r.path,
read_symbol,
write_symbol,
mount_symbol,
ramdisk_symbol,
perf_symbol,
dataloss_symbol,
)
),
file=sys.stderr
)
# --- Print Warning ---
if has_issues:
print("\n", file=sys.stderr)
print_warning_message()
print_warning_message(results)
# Exit with error only if there are write permission issues
if has_write_errors and os.environ.get("NETALERTX_DEBUG") != "1":
# Exit with error only if there are read/write permission issues
if has_rw_errors and os.environ.get("NETALERTX_DEBUG") != "1":
sys.exit(1)

View File

@@ -0,0 +1,39 @@
# Expected outcome: Mounts table shows /tmp/api is mounted and writable but NOT readable (R=❌, W=✅)
# Note: This is a diagnostic-only container (entrypoint sleeps); the test chmods/chowns /tmp/api to mode 0300.
services:
netalertx:
network_mode: host
build:
context: ../../../
dockerfile: Dockerfile
image: netalertx-test
container_name: netalertx-test-mount-api_noread
entrypoint: ["sh", "-lc", "sleep infinity"]
cap_drop:
- ALL
cap_add:
- NET_ADMIN
- NET_RAW
- NET_BIND_SERVICE
environment:
NETALERTX_DEBUG: 0
NETALERTX_DATA: /data
NETALERTX_DB: /data/db
NETALERTX_CONFIG: /data/config
SYSTEM_SERVICES_RUN_TMP: /tmp
NETALERTX_API: /tmp/api
NETALERTX_LOG: /tmp/log
SYSTEM_SERVICES_RUN: /tmp/run
SYSTEM_SERVICES_ACTIVE_CONFIG: /tmp/nginx/active-config
volumes:
- type: volume
source: test_netalertx_data
target: /data
read_only: false
tmpfs:
- "/tmp:uid=20211,gid=20211,mode=1700,rw,noexec,nosuid,nodev,async,noatime,nodiratime"
volumes:
test_netalertx_data:

View File

@@ -0,0 +1,39 @@
# Expected outcome: Mounts table shows /data is mounted and writable but NOT readable (R=❌, W=✅)
# Note: This is a diagnostic-only container (entrypoint sleeps); the test chmods/chowns /data to mode 0300.
services:
netalertx:
network_mode: host
build:
context: ../../../
dockerfile: Dockerfile
image: netalertx-test
container_name: netalertx-test-mount-data_noread
entrypoint: ["sh", "-lc", "sleep infinity"]
cap_drop:
- ALL
cap_add:
- NET_ADMIN
- NET_RAW
- NET_BIND_SERVICE
environment:
NETALERTX_DEBUG: 0
NETALERTX_DATA: /data
NETALERTX_DB: /data/db
NETALERTX_CONFIG: /data/config
SYSTEM_SERVICES_RUN_TMP: /tmp
NETALERTX_API: /tmp/api
NETALERTX_LOG: /tmp/log
SYSTEM_SERVICES_RUN: /tmp/run
SYSTEM_SERVICES_ACTIVE_CONFIG: /tmp/nginx/active-config
volumes:
- type: volume
source: test_netalertx_data
target: /data
read_only: false
tmpfs:
- "/tmp:uid=20211,gid=20211,mode=1700,rw,noexec,nosuid,nodev,async,noatime,nodiratime"
volumes:
test_netalertx_data:

View File

@@ -0,0 +1,39 @@
# Expected outcome: Mounts table shows /data/db is mounted and writable but NOT readable (R=❌, W=✅)
# Note: This is a diagnostic-only container (entrypoint sleeps); the test chmods/chowns /data/db to mode 0300.
services:
netalertx:
network_mode: host
build:
context: ../../../
dockerfile: Dockerfile
image: netalertx-test
container_name: netalertx-test-mount-db_noread
entrypoint: ["sh", "-lc", "sleep infinity"]
cap_drop:
- ALL
cap_add:
- NET_ADMIN
- NET_RAW
- NET_BIND_SERVICE
environment:
NETALERTX_DEBUG: 0
NETALERTX_DATA: /data
NETALERTX_DB: /data/db
NETALERTX_CONFIG: /data/config
SYSTEM_SERVICES_RUN_TMP: /tmp
NETALERTX_API: /tmp/api
NETALERTX_LOG: /tmp/log
SYSTEM_SERVICES_RUN: /tmp/run
SYSTEM_SERVICES_ACTIVE_CONFIG: /tmp/nginx/active-config
volumes:
- type: volume
source: test_netalertx_data
target: /data
read_only: false
tmpfs:
- "/tmp:uid=20211,gid=20211,mode=1700,rw,noexec,nosuid,nodev,async,noatime,nodiratime"
volumes:
test_netalertx_data:

View File

@@ -0,0 +1,39 @@
# Expected outcome: Mounts table shows /tmp is mounted and writable but NOT readable (R=❌, W=✅)
# Note: This is a diagnostic-only container (entrypoint sleeps); the test chmods/chowns /tmp to mode 0300.
services:
netalertx:
network_mode: host
build:
context: ../../../
dockerfile: Dockerfile
image: netalertx-test
container_name: netalertx-test-mount-tmp_noread
entrypoint: ["sh", "-lc", "sleep infinity"]
cap_drop:
- ALL
cap_add:
- NET_ADMIN
- NET_RAW
- NET_BIND_SERVICE
environment:
NETALERTX_DEBUG: 0
NETALERTX_DATA: /data
NETALERTX_DB: /data/db
NETALERTX_CONFIG: /data/config
SYSTEM_SERVICES_RUN_TMP: /tmp
NETALERTX_API: /tmp/api
NETALERTX_LOG: /tmp/log
SYSTEM_SERVICES_RUN: /tmp/run
SYSTEM_SERVICES_ACTIVE_CONFIG: /tmp/nginx/active-config
volumes:
- type: volume
source: test_netalertx_data
target: /data
read_only: false
tmpfs:
- "/tmp:uid=20211,gid=20211,mode=1700,rw,noexec,nosuid,nodev,async,noatime,nodiratime"
volumes:
test_netalertx_data:

File diff suppressed because it is too large Load Diff

View File

@@ -18,6 +18,7 @@ def build_netalertx_test_image(request: pytest.FixtureRequest) -> None:
"""Build the docker test image before running any docker-based tests."""
image = os.environ.get("NETALERTX_TEST_IMAGE", "netalertx-test")
project_root = pathlib.Path(__file__).resolve().parents[2]
cmd = [

View File

@@ -589,23 +589,36 @@ def _assert_contains(result, snippet: str, cmd: list[str] = None) -> None:
def _extract_mount_rows(output: str) -> dict[str, list[str]]:
rows: dict[str, list[str]] = {}
in_table = False
expected_cols = 0
for raw_line in (output or "").splitlines():
line = raw_line.rstrip()
if not in_table:
if line.startswith(" Path") and "Writeable" in line:
# Legacy format: Path | Writeable | Mount | RAMDisk | Performance | DataLoss
in_table = True
expected_cols = 5
elif line.startswith(" Path") and "| R" in line and "| W" in line:
# Current format: Path | R | W | Mount | RAMDisk | Performance | DataLoss
in_table = True
expected_cols = 6
continue
if not line.strip():
break
if line.lstrip().startswith("Path"):
continue
if set(line.strip()) <= {"-", "+"}:
continue
parts = [part.strip() for part in line.split("|")]
if len(parts) < 6:
if len(parts) < 1 + expected_cols:
continue
path = parts[0].strip()
rows[path] = parts[1:6]
if not path:
continue
rows[path] = parts[1 : 1 + expected_cols]
return rows
@@ -625,16 +638,49 @@ def _assert_mount_row(
f"Mount table row for {path} not found. Rows: {sorted(rows)}\nOutput:\n{result.output}"
)
columns = rows[path]
labels = ["Writeable", "Mount", "RAMDisk", "Performance", "DataLoss"]
expectations = [write, mount, ramdisk, performance, dataloss]
for idx, expected in enumerate(expectations):
# Legacy: [Writeable, Mount, RAMDisk, Performance, DataLoss]
# Current: [R, W, Mount, RAMDisk, Performance, DataLoss]
if len(columns) == 5:
label_to_value = {
"Writeable": columns[0],
"Mount": columns[1],
"RAMDisk": columns[2],
"Performance": columns[3],
"DataLoss": columns[4],
}
write_label = "Writeable"
elif len(columns) == 6:
label_to_value = {
"R": columns[0],
"W": columns[1],
"Mount": columns[2],
"RAMDisk": columns[3],
"Performance": columns[4],
"DataLoss": columns[5],
}
write_label = "W"
else:
raise AssertionError(
f"Unexpected mount table column count for {path}: {len(columns)}. Columns: {columns}\nOutput:\n{result.output}"
)
checks = [
(write_label, write),
("Mount", mount),
("RAMDisk", ramdisk),
("Performance", performance),
("DataLoss", dataloss),
]
for label, expected in checks:
if expected is None:
continue
actual = columns[idx]
actual = label_to_value.get(label)
if actual != expected:
raise AssertionError(
f"{path} {labels[idx]} expected {expected}, got {actual}.\n"
f"Rows: {rows}\nOutput:\n{result.output}"
f"{path} {label} expected {expected}, got {actual}.\n"
f"Row: {label_to_value}\nOutput:\n{result.output}"
)
@@ -958,6 +1004,7 @@ def test_mandatory_folders_creation(tmp_path: pathlib.Path) -> None:
# Ensure other directories are writable and owned by netalertx user so container gets past mounts.py
for key in [
"data",
"app_db",
"app_config",
"app_log",
@@ -1001,6 +1048,7 @@ def test_writable_config_validation(tmp_path: pathlib.Path) -> None:
# Ensure directories are writable and owned by netalertx user so container gets past mounts.py
for key in [
"data",
"app_db",
"app_config",
"app_log",

View File

@@ -55,6 +55,7 @@ class MountTableRow:
"""Represents a parsed row from the mount diagnostic table."""
path: str
readable: bool
writeable: bool
mount: bool
ramdisk: Optional[bool] # None for
@@ -103,7 +104,7 @@ def parse_mount_table(output: str) -> List[MountTableRow]:
# Split by | and clean up
parts = [part.strip() for part in line.split("|")]
if len(parts) < 6:
if len(parts) < 7:
continue
path = parts[0]
@@ -124,11 +125,12 @@ def parse_mount_table(output: str) -> List[MountTableRow]:
try:
row = MountTableRow(
path=path,
writeable=emoji_to_bool(parts[1]),
mount=emoji_to_bool(parts[2]),
ramdisk=emoji_to_bool(parts[3]),
performance=emoji_to_bool(parts[4]),
dataloss=emoji_to_bool(parts[5]),
readable=emoji_to_bool(parts[1]),
writeable=emoji_to_bool(parts[2]),
mount=emoji_to_bool(parts[3]),
ramdisk=emoji_to_bool(parts[4]),
performance=emoji_to_bool(parts[5]),
dataloss=emoji_to_bool(parts[6]),
)
rows.append(row)
except (IndexError, ValueError):
@@ -140,6 +142,7 @@ def parse_mount_table(output: str) -> List[MountTableRow]:
def assert_table_row(
output: str,
expected_path: str,
readable: Expectation = UNSET,
writeable: Expectation = UNSET,
mount: Expectation = UNSET,
ramdisk: Expectation = UNSET,
@@ -169,7 +172,7 @@ def assert_table_row(
assert raw_line is not None, f"Raw table line for '{expected_path}' not found in output."
raw_parts = [part.strip() for part in raw_line.split("|")]
assert len(raw_parts) >= 6, f"Malformed table row for '{expected_path}': {raw_line}"
assert len(raw_parts) >= 7, f"Malformed table row for '{expected_path}': {raw_line}"
def _check(field_name: str, expected: Expectation, actual: Optional[bool], column_index: int) -> None:
if expected is UNSET:
@@ -183,11 +186,12 @@ def assert_table_row(
f"got '{raw_parts[column_index]}' in row: {raw_line}"
)
_check("writeable", writeable, matching_row.writeable, 1)
_check("mount", mount, matching_row.mount, 2)
_check("ramdisk", ramdisk, matching_row.ramdisk, 3)
_check("performance", performance, matching_row.performance, 4)
_check("dataloss", dataloss, matching_row.dataloss, 5)
_check("readable", readable, matching_row.readable, 1)
_check("writeable", writeable, matching_row.writeable, 2)
_check("mount", mount, matching_row.mount, 3)
_check("ramdisk", ramdisk, matching_row.ramdisk, 4)
_check("performance", performance, matching_row.performance, 5)
_check("dataloss", dataloss, matching_row.dataloss, 6)
return matching_row
@@ -212,9 +216,23 @@ def netalertx_test_image():
image_name = os.environ.get("NETALERTX_TEST_IMAGE", "netalertx-test")
# Check if image exists
result = subprocess.run(
["docker", "images", "-q", image_name], capture_output=True, text=True
)
try:
result = subprocess.run(
["docker", "images", "-q", image_name],
capture_output=True,
text=True,
timeout=10,
check=False,
)
except FileNotFoundError:
pytest.skip("Docker CLI not found; skipping docker-based mount diagnostics tests.")
except subprocess.TimeoutExpired:
pytest.skip("Docker is not responding; skipping docker-based mount diagnostics tests.")
if result.returncode != 0:
pytest.skip(
f"Docker returned error while checking images (rc={result.returncode}): {result.stderr.strip() or '<no stderr>'}"
)
if not result.stdout.strip():
pytest.skip(f"NetAlertX test image '{image_name}' not found. Build it first.")
@@ -293,6 +311,49 @@ def create_test_scenarios() -> List[TestScenario]:
)
)
# Focused coverage: mounted-but-unreadable (-wx) scenarios.
# These are intentionally not part of the full matrix to avoid runtime bloat.
scenarios.extend(
[
TestScenario(
name="data_noread",
path_var="NETALERTX_DATA",
container_path="/data",
is_persistent=True,
docker_compose="docker-compose.mount-test.data_noread.yml",
expected_issues=["table_issues", "warning_message"],
expected_exit_code=0,
),
TestScenario(
name="db_noread",
path_var="NETALERTX_DB",
container_path="/data/db",
is_persistent=True,
docker_compose="docker-compose.mount-test.db_noread.yml",
expected_issues=["table_issues", "warning_message"],
expected_exit_code=0,
),
TestScenario(
name="tmp_noread",
path_var="SYSTEM_SERVICES_RUN_TMP",
container_path="/tmp",
is_persistent=False,
docker_compose="docker-compose.mount-test.tmp_noread.yml",
expected_issues=["table_issues", "warning_message"],
expected_exit_code=0,
),
TestScenario(
name="api_noread",
path_var="NETALERTX_API",
container_path=CONTAINER_PATHS["api"],
is_persistent=False,
docker_compose="docker-compose.mount-test.api_noread.yml",
expected_issues=["table_issues", "warning_message"],
expected_exit_code=0,
),
]
)
return scenarios
@@ -343,6 +404,35 @@ def validate_scenario_table_output(output: str, test_scenario: TestScenario) ->
return
try:
if test_scenario.name.endswith("_noread"):
# Mounted but unreadable: R should fail, W should succeed, and the mount itself
# should otherwise be correctly configured.
if test_scenario.container_path.startswith("/data"):
# Persistent paths: mounted, not a ramdisk, no dataloss flag.
assert_table_row(
output,
test_scenario.container_path,
readable=False,
writeable=True,
mount=True,
ramdisk=None,
performance=None,
dataloss=True,
)
else:
# Ramdisk paths: mounted tmpfs, ramdisk ok, performance ok.
assert_table_row(
output,
test_scenario.container_path,
readable=False,
writeable=True,
mount=True,
ramdisk=True,
performance=True,
dataloss=True,
)
return
if test_scenario.name.startswith("db_"):
if test_scenario.name == "db_ramdisk":
assert_table_row(
@@ -474,6 +564,17 @@ def test_mount_diagnostic(netalertx_test_image, test_scenario):
base_cmd + ["down", "-v"], capture_output=True, timeout=30, env=compose_env
)
# The compose files use a fixed container name; ensure no stale container blocks the run.
container_name = f"netalertx-test-mount-{test_scenario.name}"
subprocess.run(
["docker", "rm", "-f", container_name],
capture_output=True,
text=True,
timeout=30,
check=False,
env=compose_env,
)
cmd_up = base_cmd + ["up", "-d"]
try:
@@ -493,7 +594,6 @@ def test_mount_diagnostic(netalertx_test_image, test_scenario):
time.sleep(1)
# Check if container is still running
container_name = f"netalertx-test-mount-{test_scenario.name}"
result_ps = subprocess.run(
["docker", "ps", "-q", "-f", f"name={container_name}"],
capture_output=True,
@@ -529,6 +629,68 @@ def test_mount_diagnostic(netalertx_test_image, test_scenario):
return # Test passed - container correctly detected issues and exited
# Container is still running - run diagnostic tool
if test_scenario.name.endswith("_noread"):
# Craft a mounted-but-unreadable (-wx) directory owned by uid 20211.
# Do this after container start so entrypoint scripts cannot overwrite it.
prep_cmd = [
"docker",
"exec",
"--user",
"netalertx",
container_name,
"/bin/sh",
"-c",
" ".join(
[
# Baseline structure for stable diagnostics (best-effort).
"mkdir -p /data/db /data/config /tmp/api /tmp/log /tmp/run /tmp/nginx/active-config || true;",
"chmod 0700 /data/db /data/config /tmp/api /tmp/log /tmp/run /tmp/nginx/active-config 2>/dev/null || true;",
# Target path: remove read permission but keep write+execute.
f"chmod 0300 '{test_scenario.container_path}';",
]
),
]
result_prep = subprocess.run(
prep_cmd, capture_output=True, text=True, timeout=30, check=False
)
if result_prep.returncode != 0:
ensure_logs("failed to prepare noread permissions")
pytest.fail(
f"Failed to prepare noread permissions: {result_prep.stderr}\nSTDOUT: {result_prep.stdout}"
)
# Verify as the effective app user: not readable, but writable+executable.
verify_cmd = [
"docker",
"exec",
"--user",
"netalertx",
container_name,
"python3",
"-c",
"".join(
[
"import os, sys; ",
f"p={test_scenario.container_path!r}; ",
"r=os.access(p, os.R_OK); ",
"w=os.access(p, os.W_OK); ",
"x=os.access(p, os.X_OK); ",
"sys.exit(0 if (not r and w and x) else 1)",
]
),
]
result_verify = subprocess.run(
verify_cmd, capture_output=True, text=True, timeout=30, check=False
)
if result_verify.returncode != 0:
ensure_logs("noread verification failed")
pytest.fail(
"noread verification failed for "
f"{test_scenario.container_path}:\n"
f"stdout: {result_verify.stdout}\n"
f"stderr: {result_verify.stderr}"
)
cmd_exec = [
"docker",
"exec",
@@ -543,10 +705,10 @@ def test_mount_diagnostic(netalertx_test_image, test_scenario):
)
diagnostic_output = result_exec.stdout + result_exec.stderr
# The diagnostic tool returns 1 for unwritable paths except active_config, which only warns
# The diagnostic tool returns 1 for rw permission issues except active_config, which only warns
if (test_scenario.name.startswith("active_config_") and "unwritable" in test_scenario.name):
expected_tool_exit = 0
elif "unwritable" in test_scenario.name:
elif "unwritable" in test_scenario.name or test_scenario.name.endswith("_noread"):
expected_tool_exit = 1
else:
expected_tool_exit = 0
@@ -564,8 +726,8 @@ def test_mount_diagnostic(netalertx_test_image, test_scenario):
)
else:
# Should have table output but no warning message
assert "Path" in result_exec.stdout, (
f"Good config {test_scenario.name} should show table, got: {result_exec.stdout}"
assert "Path" in diagnostic_output, (
f"Good config {test_scenario.name} should show table, got: {diagnostic_output}"
)
assert "⚠️" not in diagnostic_output, (
f"Good config {test_scenario.name} should not show warning, got stderr: {result_exec.stderr}"
@@ -583,10 +745,10 @@ def test_table_parsing():
"""Test the table parsing and assertion functions."""
sample_output = """
Path | Writeable | Mount | RAMDisk | Performance | DataLoss
---------------------+-----------+-------+---------+-------------+----------
/data/db | | ❌ | | | ❌
/tmp/api | | ✅ | ✅ | ✅ | ✅
Path | R | W | Mount | RAMDisk | Performance | DataLoss
---------------------+---+---+-------+---------+-------------+----------
/data/db | ✅ | ✅ | ❌ | | | ❌
/tmp/api | ✅ | ✅ | ✅ | ✅ | ✅ | ✅
"""
# Test parsing
@@ -597,6 +759,7 @@ def test_table_parsing():
assert_table_row(
sample_output,
"/data/db",
readable=True,
writeable=True,
mount=False,
ramdisk=None,
@@ -606,6 +769,7 @@ def test_table_parsing():
assert_table_row(
sample_output,
CONTAINER_PATHS["api"],
readable=True,
writeable=True,
mount=True,
ramdisk=True,