Merge pull request #1363 from adamoutler/allow-other-users

Allow other users (Non-Synology)
This commit is contained in:
Jokob @NetAlertX
2025-12-21 20:25:16 +00:00
committed by GitHub
21 changed files with 761 additions and 380 deletions

View File

@@ -17,6 +17,7 @@ import pytest
IMAGE = os.environ.get("NETALERTX_TEST_IMAGE", "netalertx-test")
GRACE_SECONDS = float(os.environ.get("NETALERTX_TEST_GRACE", "2"))
DEFAULT_CAPS = ["NET_RAW", "NET_ADMIN", "NET_BIND_SERVICE"]
SUBPROCESS_TIMEOUT_SECONDS = float(os.environ.get("NETALERTX_TEST_SUBPROCESS_TIMEOUT", "60"))
CONTAINER_TARGETS: dict[str, str] = {
"data": "/data",
@@ -45,78 +46,73 @@ def _unique_label(prefix: str) -> str:
return f"{prefix.upper()}__NETALERTX_INTENTIONAL__{uuid.uuid4().hex[:6]}"
def _create_docker_volume(prefix: str) -> str:
    """Create a uniquely named Docker volume and return its name.

    The name embeds *prefix* plus a short random suffix so parallel test
    runs never collide. Raises subprocess.CalledProcessError on failure.
    """
    name = f"netalertx-test-{prefix}-{uuid.uuid4().hex[:8]}".lower()
    subprocess.run(
        ["docker", "volume", "create", name],
        check=True,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        # Consistency fix: every other docker helper in this file bounds the
        # call with SUBPROCESS_TIMEOUT_SECONDS so a wedged daemon cannot hang
        # the test session.
        timeout=SUBPROCESS_TIMEOUT_SECONDS,
    )
    return name
def _repo_root() -> pathlib.Path:
env = os.environ.get("NETALERTX_REPO_ROOT")
if env:
return pathlib.Path(env)
cur = pathlib.Path(__file__).resolve()
for parent in cur.parents:
if any(
[
(parent / "pyproject.toml").exists(),
(parent / ".git").exists(),
(parent / "back").exists() and (parent / "db").exists(),
]
):
return parent
return cur.parents[2]
def _remove_docker_volume(name: str) -> None:
    """Best-effort removal of a Docker volume (check=False: errors ignored)."""
    subprocess.run(
        ["docker", "volume", "rm", "-f", name],
        check=False,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        # Consistency fix: bound the call like the other docker helpers so a
        # wedged daemon cannot hang test teardown.
        timeout=SUBPROCESS_TIMEOUT_SECONDS,
    )
def _docker_visible_tmp_root() -> pathlib.Path:
"""Return a docker-daemon-visible scratch directory for bind mounts.
Pytest's default tmp_path lives under /tmp inside the devcontainer, which may
not be visible to the Docker daemon that evaluates bind mount source paths.
We use /tmp/pytest-docker-mounts instead of the repo.
"""
root = pathlib.Path("/tmp/pytest-docker-mounts")
root.mkdir(parents=True, exist_ok=True)
try:
root.chmod(0o777)
except PermissionError:
# Best-effort; the directory only needs to be writable by the current user.
pass
return root
def _chown_path(host_path: pathlib.Path, uid: int, gid: int) -> None:
"""Chown a host path using the test image with host user namespace."""
if not host_path.exists():
raise RuntimeError(f"Cannot chown missing path {host_path}")
cmd = [
"docker",
"run",
"--rm",
"--userns",
"host",
"--user",
"0:0",
"--entrypoint",
"/bin/chown",
"-v",
f"{host_path}:/mnt",
IMAGE,
"-R",
f"{uid}:{gid}",
"/mnt",
]
def _docker_visible_path(path: pathlib.Path) -> pathlib.Path:
"""Map a path into `_docker_visible_tmp_root()` when it lives under /tmp."""
try:
subprocess.run(
cmd,
check=True,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
except subprocess.CalledProcessError as exc:
raise RuntimeError(f"Failed to chown {host_path} to {uid}:{gid}") from exc
if str(path).startswith("/tmp/"):
return _docker_visible_tmp_root() / path.name
except Exception:
pass
return path
def _setup_mount_tree(
tmp_path: pathlib.Path,
prefix: str,
*,
seed_config: bool = True,
seed_db: bool = True,
) -> dict[str, pathlib.Path]:
"""Create a compose-like host tree with permissive perms for arbitrary UID/GID."""
label = _unique_label(prefix)
base = tmp_path / f"{label}_MOUNT_ROOT"
base = _docker_visible_tmp_root() / f"{label}_MOUNT_ROOT"
base.mkdir()
base.chmod(0o777)
paths: dict[str, pathlib.Path] = {}
# Create unified /data mount root
data_root = base / f"{label}_DATA_INTENTIONAL_NETALERTX_TEST"
data_root.mkdir(parents=True, exist_ok=True)
data_root.chmod(0o777)
paths["data"] = data_root
# Create required data subdirectories and aliases
db_dir = data_root / "db"
db_dir.mkdir(exist_ok=True)
db_dir.chmod(0o777)
@@ -129,17 +125,12 @@ def _setup_mount_tree(
paths["app_config"] = config_dir
paths["data_config"] = config_dir
# Optional /tmp mounts that certain tests intentionally bind
for key in OPTIONAL_TMP_KEYS:
folder_name = f"{label}_{key.upper()}_INTENTIONAL_NETALERTX_TEST"
host_path = base / folder_name
host_path.mkdir(parents=True, exist_ok=True)
try:
host_path.chmod(0o777)
except PermissionError:
pass
host_path.chmod(0o777)
paths[key] = host_path
# Provide backwards-compatible aliases where helpful
if key == "app_log":
paths["log"] = host_path
elif key == "app_api":
@@ -147,54 +138,45 @@ def _setup_mount_tree(
elif key == "nginx_conf":
paths["nginx_active"] = host_path
# Determine repo root from env or by walking up from this file
repo_root_env = os.environ.get("NETALERTX_REPO_ROOT")
if repo_root_env:
repo_root = pathlib.Path(repo_root_env)
else:
repo_root = None
cur = pathlib.Path(__file__).resolve()
for parent in cur.parents:
if any([
(parent / "pyproject.toml").exists(),
(parent / ".git").exists(),
(parent / "back").exists() and (parent / "db").exists()
]):
repo_root = parent
break
if repo_root is None:
repo_root = cur.parents[2]
repo_root = _repo_root()
if seed_config:
config_file = paths["app_config"] / "app.conf"
config_src = repo_root / "back" / "app.conf"
if not config_src.exists():
print(
f"[WARN] Seed file not found: {config_src}. Set NETALERTX_REPO_ROOT or run from repo root. Skipping copy."
)
else:
shutil.copyfile(config_src, config_file)
config_file.chmod(0o600)
config_dst = paths["app_config"] / "app.conf"
if config_src.exists():
shutil.copyfile(config_src, config_dst)
config_dst.chmod(0o666)
if seed_db:
db_file = paths["app_db"] / "app.db"
db_src = repo_root / "db" / "app.db"
if not db_src.exists():
print(
f"[WARN] Seed file not found: {db_src}. Set NETALERTX_REPO_ROOT or run from repo root. Skipping copy."
)
else:
shutil.copyfile(db_src, db_file)
db_file.chmod(0o600)
db_dst = paths["app_db"] / "app.db"
if db_src.exists():
shutil.copyfile(db_src, db_dst)
db_dst.chmod(0o666)
_chown_netalertx(base)
# Ensure every mount point is world-writable so arbitrary UID/GID can write
for p in paths.values():
if p.is_dir():
p.chmod(0o777)
for child in p.iterdir():
if child.is_dir():
child.chmod(0o777)
else:
child.chmod(0o666)
else:
p.chmod(0o666)
return paths
def _setup_fixed_mount_tree(base: pathlib.Path) -> dict[str, pathlib.Path]:
base = _docker_visible_path(base)
if base.exists():
shutil.rmtree(base)
base.mkdir(parents=True)
try:
base.chmod(0o777)
except PermissionError:
pass
paths: dict[str, pathlib.Path] = {}
@@ -252,6 +234,42 @@ def _build_volume_args_for_keys(
return bindings
def _chown_path(host_path: pathlib.Path, uid: int, gid: int) -> None:
"""Chown a host path using the test image with host user namespace."""
if not host_path.exists():
raise RuntimeError(f"Cannot chown missing path {host_path}")
cmd = [
"docker",
"run",
"--rm",
"--userns",
"host",
"--user",
"0:0",
"--entrypoint",
"/bin/chown",
"-v",
f"{host_path}:/mnt",
IMAGE,
"-R",
f"{uid}:{gid}",
"/mnt",
]
try:
subprocess.run(
cmd,
check=True,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
timeout=SUBPROCESS_TIMEOUT_SECONDS,
)
except subprocess.CalledProcessError as exc:
raise RuntimeError(f"Failed to chown {host_path} to {uid}:{gid}") from exc
def _chown_root(host_path: pathlib.Path) -> None:
    """Recursively chown *host_path* to root (UID/GID 0) via `_chown_path`."""
    _chown_path(host_path, 0, 0)
@@ -260,6 +278,166 @@ def _chown_netalertx(host_path: pathlib.Path) -> None:
_chown_path(host_path, 20211, 20211)
def _docker_volume_rm(volume_name: str) -> None:
    """Force-remove a named Docker volume; failures are ignored (check=False)."""
    subprocess.run(
        ["docker", "volume", "rm", "-f", volume_name],
        check=False,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        timeout=SUBPROCESS_TIMEOUT_SECONDS,
    )
def _docker_volume_create(volume_name: str) -> None:
    """Create a named Docker volume; raises CalledProcessError on failure."""
    subprocess.run(
        ["docker", "volume", "create", volume_name],
        check=True,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        timeout=SUBPROCESS_TIMEOUT_SECONDS,
    )
def _fresh_named_volume(prefix: str) -> str:
    """Derive a docker-safe volume name from *prefix* and remove any leftover.

    The label from `_unique_label` is lowercased and its double underscores
    replaced so the result is a valid Docker volume name.
    """
    name = _unique_label(prefix).lower().replace("__", "-")
    # Ensure we're exercising Docker's fresh-volume copy-up behavior.
    _docker_volume_rm(name)
    return name
def _ensure_volume_copy_up(volume_name: str) -> None:
    """Ensure a named volume is initialized from the NetAlertX image.

    If we write into the volume first (e.g., with an Alpine helper container),
    Docker will not perform the image-to-volume copy-up and the volume root may
    stay root:root 0755, breaking arbitrary UID/GID runs.
    """
    # Running a no-op shell in the image with the volume mounted at /data is
    # enough to trigger Docker's fresh-volume copy-up.
    copy_up_cmd = [
        "docker",
        "run",
        "--rm",
        "--userns",
        "host",
        "-v",
        f"{volume_name}:/data",
        "--entrypoint",
        "/bin/sh",
        IMAGE,
        "-c",
        "true",
    ]
    subprocess.run(
        copy_up_cmd,
        check=True,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        timeout=SUBPROCESS_TIMEOUT_SECONDS,
    )
def _seed_volume_text_file(
    volume_name: str,
    container_path: str,
    content: str,
    *,
    chmod_mode: str = "644",
    user: str | None = None,
) -> None:
    """Create/overwrite a text file inside a named volume.

    Uses a tiny helper container so we don't rely on bind mounts (which are
    resolved on the Docker daemon host). *content* is piped to the container
    over stdin; *chmod_mode* is applied after the write and *user* (UID:GID),
    when given, becomes the container user and therefore the file owner.

    NOTE(review): *container_path* is interpolated into a single-quoted shell
    string, so paths containing a single quote would break the command —
    presumably fine for test-internal paths; confirm if reused elsewhere.
    """
    cmd = [
        "docker",
        "run",
        "--rm",
        "--userns",
        "host",
    ]
    if user:
        cmd.extend(["--user", user])
    cmd.extend(
        [
            "-v",
            f"{volume_name}:/data",
            "alpine:3.22",
            "sh",
            "-c",
            f"set -eu; mkdir -p \"$(dirname '{container_path}')\"; cat > '{container_path}'; chmod {chmod_mode} '{container_path}'",
        ]
    )
    subprocess.run(
        cmd,
        # The file body arrives on the container's stdin via `cat >`.
        input=content,
        text=True,
        check=True,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        timeout=SUBPROCESS_TIMEOUT_SECONDS,
    )
def _volume_has_file(volume_name: str, container_path: str) -> bool:
    """Return True when *container_path* exists as a regular file in the volume.

    A throwaway Alpine container mounts the volume at /data and runs
    `test -f`; the shell's exit status is the answer.
    """
    probe = subprocess.run(
        [
            "docker",
            "run",
            "--rm",
            "--userns",
            "host",
            "-v",
            f"{volume_name}:/data",
            "alpine:3.22",
            "sh",
            "-c",
            f"test -f '{container_path}'",
        ],
        check=False,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        timeout=SUBPROCESS_TIMEOUT_SECONDS,
    )
    return probe.returncode == 0
@pytest.mark.parametrize(
    "uid_gid",
    [
        (1001, 1001),
        (1502, 1502),
    ],
)
def test_nonroot_custom_uid_logs_note(
    tmp_path: pathlib.Path,
    uid_gid: tuple[int, int],
) -> None:
    """Ensure arbitrary non-root UID/GID can run with compose-like mounts."""
    uid, gid = uid_gid
    vol = _fresh_named_volume(f"note_uid_{uid}")
    try:
        # Fresh named volume at /data: matches default docker-compose UX.
        result = _run_container(
            f"note-uid-{uid}",
            volumes=None,
            volume_specs=[f"{vol}:/data"],
            user=f"{uid}:{gid}",
            sleep_seconds=5,
        )
    finally:
        # Always remove the volume, even when the container run fails.
        _docker_volume_rm(vol)
    # The entrypoint should log a note about the non-standard UID/GID yet
    # still start and exit cleanly.
    _assert_contains(result, f"NetAlertX note: current UID {uid} GID {gid}", result.args)
    assert "expected UID" in result.output
    assert result.returncode == 0
def _run_container(
label: str,
volumes: list[tuple[str, str, bool]] | None = None,
@@ -272,34 +450,64 @@ def _run_container(
volume_specs: list[str] | None = None,
sleep_seconds: float = GRACE_SECONDS,
wait_for_exit: bool = False,
pre_entrypoint: str | None = None,
userns_mode: str | None = "host",
image: str = IMAGE,
) -> subprocess.CompletedProcess[str]:
name = f"netalertx-test-{label}-{uuid.uuid4().hex[:8]}".lower()
tmp_uid = 20211
tmp_gid = 20211
if user:
try:
u_str, g_str = user.split(":", 1)
tmp_uid = int(u_str)
tmp_gid = int(g_str)
except Exception:
# Keep defaults if user format is unexpected.
tmp_uid = 20211
tmp_gid = 20211
# Clean up any existing container with this name
subprocess.run(
["docker", "rm", "-f", name],
check=False,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
timeout=SUBPROCESS_TIMEOUT_SECONDS,
)
cmd: list[str] = ["docker", "run", "--rm", "--name", name]
# Avoid flakiness in host-network runs when the host already uses the
# default NetAlertX ports. Tests can still override explicitly via `env`.
effective_env: dict[str, str] = dict(env or {})
if network_mode == "host":
if "PORT" not in effective_env:
effective_env["PORT"] = str(30000 + (int(uuid.uuid4().hex[:4], 16) % 20000))
if "GRAPHQL_PORT" not in effective_env:
gql = 30000 + (int(uuid.uuid4().hex[4:8], 16) % 20000)
if str(gql) == effective_env["PORT"]:
gql = 30000 + ((gql + 1) % 20000)
effective_env["GRAPHQL_PORT"] = str(gql)
if network_mode:
cmd.extend(["--network", network_mode])
cmd.extend(["--userns", "host"])
# Add default ramdisk to /tmp with permissions 777
cmd.extend(["--tmpfs", "/tmp:mode=777"])
if userns_mode:
cmd.extend(["--userns", userns_mode])
# Match docker-compose UX: /tmp is tmpfs with 1700 and owned by the runtime UID/GID.
cmd.extend(["--tmpfs", f"/tmp:mode=1700,uid={tmp_uid},gid={tmp_gid}"])
if user:
cmd.extend(["--user", user])
if drop_caps:
if drop_caps is not None:
for cap in drop_caps:
cmd.extend(["--cap-drop", cap])
else:
cmd.extend(["--cap-drop", "ALL"])
for cap in DEFAULT_CAPS:
cmd.extend(["--cap-add", cap])
if env:
for key, value in env.items():
if effective_env:
for key, value in effective_env.items():
cmd.extend(["-e", f"{key}={value}"])
if extra_args:
cmd.extend(extra_args)
@@ -323,17 +531,24 @@ def _run_container(
mounts_ls += f" {target}"
mounts_ls += " || true; echo '--- END MOUNTS ---'; \n"
setup_script = ""
if pre_entrypoint:
setup_script = pre_entrypoint
if not setup_script.endswith("\n"):
setup_script += "\n"
if wait_for_exit:
script = mounts_ls + "sh /entrypoint.sh"
script = mounts_ls + setup_script + "sh /entrypoint.sh"
else:
script = "".join([
mounts_ls,
setup_script,
"sh /entrypoint.sh & pid=$!; ",
f"sleep {sleep_seconds}; ",
"if kill -0 $pid >/dev/null 2>&1; then kill -TERM $pid >/dev/null 2>&1 || true; fi; ",
"wait $pid; code=$?; if [ $code -eq 143 ]; then exit 0; fi; exit $code"
])
cmd.extend(["--entrypoint", "/bin/sh", IMAGE, "-c", script])
cmd.extend(["--entrypoint", "/bin/sh", image, "-c", script])
# Print the full Docker command for debugging
print("\n--- DOCKER CMD ---\n", " ".join(cmd), "\n--- END CMD ---\n")
@@ -342,7 +557,7 @@ def _run_container(
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
timeout=sleep_seconds + 30,
timeout=max(SUBPROCESS_TIMEOUT_SECONDS, sleep_seconds + 30),
check=False,
)
# Combine and clean stdout and stderr
@@ -509,23 +724,17 @@ def test_missing_host_network_warns(tmp_path: pathlib.Path) -> None:
Check script: check-network-mode.sh
Sample message: "⚠️ ATTENTION: NetAlertX is not running with --network=host. Bridge networking..."
"""
base = tmp_path / "missing_host_net_base"
paths = _setup_fixed_mount_tree(base)
# Ensure directories are writable and owned by netalertx user so container can operate
for key in ["data", "app_db", "app_config"]:
paths[key].chmod(0o777)
_chown_netalertx(paths[key])
# Create a config file so the writable check passes
config_file = paths["app_config"] / "app.conf"
config_file.write_text("test config")
config_file.chmod(0o666)
_chown_netalertx(config_file)
volumes = _build_volume_args_for_keys(paths, {"data"})
result = _run_container(
"missing-host-network",
volumes,
network_mode=None,
)
vol = _fresh_named_volume("missing_host_network")
try:
result = _run_container(
"missing-host-network",
volumes=None,
volume_specs=[f"{vol}:/data"],
network_mode=None,
sleep_seconds=15,
)
finally:
_docker_volume_rm(vol)
_assert_contains(result, "not running with --network=host", result.args)
@@ -536,146 +745,146 @@ def test_missing_host_network_warns(tmp_path: pathlib.Path) -> None:
# top level.
if False: # pragma: no cover - placeholder until writable /data fixtures exist for these flows
def test_running_as_uid_1000_warns(tmp_path: pathlib.Path) -> None:
# No output assertion, just returncode check
"""Test running as wrong user - simulates using arbitrary user instead of netalertx.
7. Running as Wrong User: Simulates running as arbitrary user (UID 1000) instead
of netalertx user. Permission errors due to incorrect user context.
Expected: Permission errors, guidance to use correct user.
def test_missing_app_conf_triggers_seed(tmp_path: pathlib.Path) -> None:
"""Test missing configuration file seeding - simulates corrupted/missing app.conf.
Check script: /entrypoint.d/60-user-netalertx.sh
Sample message: "⚠️ ATTENTION: NetAlertX is running as UID 1000:1000. Hardened permissions..."
"""
paths = _setup_mount_tree(tmp_path, "run_as_1000")
volumes = _build_volume_args_for_keys(paths, {"data"})
9. Missing Configuration File: Simulates corrupted/missing app.conf.
Container automatically regenerates default configuration on startup.
Expected: Automatic regeneration of default configuration.
Check script: /entrypoint.d/15-first-run-config.sh
Sample message: "Default configuration written to"
"""
vol = _fresh_named_volume("missing_app_conf")
try:
result = _run_container(
"run-as-1000",
volumes,
user="1000:1000",
"missing-app-conf",
volumes=None,
volume_specs=[f"{vol}:/data"],
sleep_seconds=15,
)
_assert_contains(result, "NetAlertX is running as UID 1000:1000", result.args)
finally:
_docker_volume_rm(vol)
_assert_contains(result, "Default configuration written to", result.args)
assert result.returncode == 0
def test_missing_app_conf_triggers_seed(tmp_path: pathlib.Path) -> None:
"""Test missing configuration file seeding - simulates corrupted/missing app.conf.
9. Missing Configuration File: Simulates corrupted/missing app.conf.
Container automatically regenerates default configuration on startup.
Expected: Automatic regeneration of default configuration.
def test_missing_app_db_triggers_seed(tmp_path: pathlib.Path) -> None:
"""Test missing database file seeding - simulates corrupted/missing app.db.
Check script: /entrypoint.d/15-first-run-config.sh
Sample message: "Default configuration written to"
"""
base = tmp_path / "missing_app_conf_base"
paths = _setup_fixed_mount_tree(base)
for key in ["data", "app_db", "app_config"]:
paths[key].chmod(0o777)
_chown_netalertx(paths[key])
(paths["app_config"] / "testfile.txt").write_text("test")
volumes = _build_volume_args_for_keys(paths, {"data"})
result = _run_container("missing-app-conf", volumes, sleep_seconds=5)
_assert_contains(result, "Default configuration written to", result.args)
assert result.returncode == 0
10. Missing Database File: Simulates corrupted/missing app.db.
Container automatically creates initial database schema on startup.
Expected: Automatic creation of initial database schema.
def test_missing_app_db_triggers_seed(tmp_path: pathlib.Path) -> None:
"""Test missing database file seeding - simulates corrupted/missing app.db.
10. Missing Database File: Simulates corrupted/missing app.db.
Container automatically creates initial database schema on startup.
Expected: Automatic creation of initial database schema.
Check script: /entrypoint.d/20-first-run-db.sh
Sample message: "Building initial database schema"
"""
base = tmp_path / "missing_app_db_base"
paths = _setup_fixed_mount_tree(base)
_chown_netalertx(paths["app_db"])
(paths["app_db"] / "testfile.txt").write_text("test")
volumes = _build_volume_args_for_keys(paths, {"data"})
Check script: /entrypoint.d/20-first-run-db.sh
Sample message: "Building initial database schema"
"""
vol = _fresh_named_volume("missing_app_db")
try:
_ensure_volume_copy_up(vol)
# Seed only app.conf; leave app.db missing to trigger first-run DB schema creation.
_seed_volume_text_file(
vol,
"/data/config/app.conf",
"TIMEZONE='UTC'\n",
chmod_mode="644",
user="20211:20211",
)
result = _run_container(
"missing-app-db",
volumes,
volumes=None,
volume_specs=[f"{vol}:/data"],
user="20211:20211",
sleep_seconds=5,
wait_for_exit=True,
sleep_seconds=20,
)
_assert_contains(result, "Building initial database schema", result.args)
assert result.returncode != 0
assert _volume_has_file(vol, "/data/db/app.db")
finally:
_docker_volume_rm(vol)
assert result.returncode == 0
def test_custom_port_without_writable_conf(tmp_path: pathlib.Path) -> None:
"""Test custom port configuration without writable nginx config mount.
4. Custom Port Without Nginx Config Mount: Simulates setting custom LISTEN_ADDR/PORT
without mounting nginx config. Container starts but uses default address.
Expected: Container starts but uses default address, warning about missing config mount.
def test_custom_port_without_writable_conf(tmp_path: pathlib.Path) -> None:
"""Test custom port configuration without writable nginx config mount.
Check script: check-nginx-config.sh
Sample messages: "⚠️ ATTENTION: Nginx configuration mount /tmp/nginx/active-config is missing."
"⚠️ ATTENTION: Unable to write to /tmp/nginx/active-config/netalertx.conf."
"""
paths = _setup_mount_tree(tmp_path, "custom_port_ro_conf")
for key in ["app_db", "app_config", "app_log", "app_api", "services_run"]:
paths[key].chmod(0o777)
paths["nginx_conf"].chmod(0o500)
volumes = _build_volume_args_for_keys(
paths,
{"data", "app_log", "app_api", "services_run", "nginx_conf"},
4. Custom Port Without Nginx Config Mount: Simulates setting custom LISTEN_ADDR/PORT
without mounting nginx config. Container starts but uses default address.
Expected: Container starts but uses default address, warning about missing config mount.
Check script: check-nginx-config.sh
Sample messages: "⚠️ ATTENTION: Nginx configuration mount /tmp/nginx/active-config is missing."
"⚠️ ATTENTION: Unable to write to /tmp/nginx/active-config/netalertx.conf."
"""
vol = _fresh_named_volume("custom_port_ro_conf")
extra_args = [
"--tmpfs",
f"{VOLUME_MAP['nginx_conf']}:uid=20211,gid=20211,mode=500",
]
try:
result = _run_container(
"custom-port-ro-conf",
volumes=None,
volume_specs=[f"{vol}:/data"],
env={"PORT": "24444", "LISTEN_ADDR": "127.0.0.1"},
user="20211:20211",
extra_args=extra_args,
sleep_seconds=15,
)
try:
result = _run_container(
"custom-port-ro-conf",
volumes,
env={"PORT": "24444", "LISTEN_ADDR": "127.0.0.1"},
user="20211:20211",
sleep_seconds=5,
)
_assert_contains(result, "Unable to write to", result.args)
_assert_contains(
result, f"{VOLUME_MAP['nginx_conf']}/netalertx.conf", result.args
)
assert result.returncode != 0
finally:
paths["nginx_conf"].chmod(0o755)
finally:
_docker_volume_rm(vol)
_assert_contains(result, "Unable to write to", result.args)
_assert_contains(
result, f"{VOLUME_MAP['nginx_conf']}/netalertx.conf", result.args
)
assert result.returncode != 0
def test_excessive_capabilities_warning(tmp_path: pathlib.Path) -> None:
"""Test excessive capabilities detection - simulates container with extra capabilities.
def test_excessive_capabilities_warning(tmp_path: pathlib.Path) -> None:
"""Test excessive capabilities detection - simulates container with extra capabilities.
11. Excessive Capabilities: Simulates container with capabilities beyond the required
NET_ADMIN, NET_RAW, and NET_BIND_SERVICE.
Expected: Warning about excessive capabilities detected.
11. Excessive Capabilities: Simulates container with capabilities beyond the required
NET_ADMIN, NET_RAW, and NET_BIND_SERVICE.
Expected: Warning about excessive capabilities detected.
Check script: 90-excessive-capabilities.sh
Sample message: "Excessive capabilities detected"
"""
paths = _setup_mount_tree(tmp_path, "excessive_caps")
volumes = _build_volume_args_for_keys(paths, {"data"})
Check script: 90-excessive-capabilities.sh
Sample message: "Excessive capabilities detected"
"""
vol = _fresh_named_volume("excessive_caps")
try:
result = _run_container(
"excessive-caps",
volumes,
volumes=None,
volume_specs=[f"{vol}:/data"],
extra_args=["--cap-add=SYS_ADMIN", "--cap-add=NET_BROADCAST"],
sleep_seconds=5,
sleep_seconds=15,
)
_assert_contains(result, "Excessive capabilities detected", result.args)
_assert_contains(result, "bounding caps:", result.args)
finally:
_docker_volume_rm(vol)
_assert_contains(result, "Excessive capabilities detected", result.args)
_assert_contains(result, "bounding caps:", result.args)
def test_appliance_integrity_read_write_mode(tmp_path: pathlib.Path) -> None:
"""Test appliance integrity - simulates running with read-write root filesystem.
def test_appliance_integrity_read_write_mode(tmp_path: pathlib.Path) -> None:
"""Test appliance integrity - simulates running with read-write root filesystem.
12. Appliance Integrity: Simulates running container with read-write root filesystem
instead of read-only mode.
Expected: Warning about running in read-write mode instead of read-only.
12. Appliance Integrity: Simulates running container with read-write root filesystem
instead of read-only mode.
Expected: Warning about running in read-write mode instead of read-only.
Check script: 95-appliance-integrity.sh
Sample message: "Container is running as read-write, not in read-only mode"
"""
paths = _setup_mount_tree(tmp_path, "appliance_integrity")
volumes = _build_volume_args_for_keys(paths, {"data"})
result = _run_container("appliance-integrity", volumes, sleep_seconds=5)
_assert_contains(
result, "Container is running as read-write, not in read-only mode", result.args
Check script: 95-appliance-integrity.sh
Sample message: "Container is running as read-write, not in read-only mode"
"""
vol = _fresh_named_volume("appliance_integrity")
try:
result = _run_container(
"appliance-integrity",
volumes=None,
volume_specs=[f"{vol}:/data"],
sleep_seconds=15,
)
_assert_contains(result, "read-only: true", result.args)
finally:
_docker_volume_rm(vol)
_assert_contains(
result, "Container is running as read-write, not in read-only mode", result.args
)
def test_zero_permissions_app_db_dir(tmp_path: pathlib.Path) -> None:
@@ -769,19 +978,26 @@ def test_mandatory_folders_creation(tmp_path: pathlib.Path) -> None:
def test_writable_config_validation(tmp_path: pathlib.Path) -> None:
"""Test writable config validation - simulates read-only config file.
"""Test writable config validation - simulates invalid config file type.
3. Writable Config Validation: Simulates config file with read-only permissions.
3. Writable Config Validation: Simulates app.conf being a non-regular file (directory).
Container verifies it can read from and write to critical config and database files.
Expected: "Read permission denied" warning for config file.
Expected: "Path is not a regular file" warning for config file.
Check script: 30-writable-config.sh
Sample message: "Read permission denied"
Check script: 35-writable-config.sh
Sample message: "Path is not a regular file"
"""
paths = _setup_mount_tree(tmp_path, "writable_config")
# Make config file read-only but keep directories writable so container gets past mounts.py
config_file = paths["app_config"] / "app.conf"
config_file.chmod(0o400) # Read-only for owner
# Force a non-regular file for /data/config/app.conf to exercise the correct warning branch.
config_path = paths["app_config"] / "app.conf"
if config_path.exists():
if config_path.is_dir():
shutil.rmtree(config_path)
else:
config_path.unlink()
config_path.mkdir(parents=False)
config_path.chmod(0o777)
_chown_netalertx(config_path)
# Ensure directories are writable and owned by netalertx user so container gets past mounts.py
for key in [
@@ -799,7 +1015,7 @@ def test_writable_config_validation(tmp_path: pathlib.Path) -> None:
result = _run_container(
"writable-config", volumes, user="20211:20211", sleep_seconds=5.0
)
_assert_contains(result, "Read permission denied", result.args)
_assert_contains(result, "ATTENTION: Path is not a regular file.", result.args)
def test_mount_analysis_ram_disk_performance(tmp_path: pathlib.Path) -> None:
@@ -904,3 +1120,92 @@ def test_mount_analysis_dataloss_risk(tmp_path: pathlib.Path) -> None:
# Check that configuration issues are detected due to dataloss risk
_assert_contains(result, "Configuration issues detected", result.args)
assert result.returncode != 0
def test_restrictive_permissions_handling(tmp_path: pathlib.Path) -> None:
    """Test handling of restrictive permissions on bind mounts.

    Simulates a user mounting a directory with restrictive permissions (e.g., 755 root:root).
    The container should either fail gracefully or handle it if running as root (which triggers fix).
    If running as non-root (default), it should fail to write if it doesn't have access.
    """
    paths = _setup_mount_tree(tmp_path, "restrictive_perms")
    # Helper to chown without userns host (workaround for potential devcontainer hang)
    def _chown_root_safe(host_path: pathlib.Path) -> None:
        cmd = [
            "docker", "run", "--rm",
            # "--userns", "host", # Removed to avoid hang
            "--user", "0:0",
            "--entrypoint", "/bin/chown",
            "-v", f"{host_path}:/mnt",
            IMAGE,
            "-R", "0:0", "/mnt",
        ]
        subprocess.run(
            cmd,
            check=True,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            timeout=SUBPROCESS_TIMEOUT_SECONDS,
        )
    # Set up a restrictive directory (root owned, 755)
    # NOTE(review): chmod after chowning the dir to root may fail when the
    # test process itself is not root — confirm this runs as root in CI.
    target_dir = paths["app_db"]
    _chown_root_safe(target_dir)
    target_dir.chmod(0o755)
    # Mount ALL volumes to avoid 'find' errors in 0-storage-permission.sh
    keys = {"data", "app_db", "app_config", "app_log", "app_api", "services_run", "nginx_conf"}
    volumes = _build_volume_args_for_keys(paths, keys)
    # Case 1: Running as non-root (default) - Should fail to write
    # We disable host network/userns to avoid potential hangs in devcontainer environment
    result = _run_container(
        "restrictive-perms-user",
        volumes,
        user="20211:20211",
        sleep_seconds=5,
        network_mode=None,
        userns_mode=None
    )
    # Either a non-zero exit or an explicit permission complaint is acceptable.
    assert result.returncode != 0 or "Permission denied" in result.output or "Unable to write" in result.output
    # Case 2: Running as root - Should trigger the fix script
    result_root = _run_container(
        "restrictive-perms-root",
        volumes,
        user="0:0",
        sleep_seconds=5,
        network_mode=None,
        userns_mode=None
    )
    _assert_contains(result_root, "NetAlertX is running as ROOT", result_root.args)
    _assert_contains(result_root, "Permissions fixed for read-write paths", result_root.args)
    # Verify the root-run fix actually made /data/db writable for the
    # unprivileged runtime user by attempting a write in a fresh container.
    check_cmd = [
        "docker", "run", "--rm",
        "--entrypoint", "/bin/sh",
        "--user", "20211:20211",
        IMAGE,
        "-c", "ls -ldn /data/db && touch /data/db/test_write_after_fix"
    ]
    # Add all volumes to check_cmd too
    for host_path, target, _readonly in volumes:
        check_cmd.extend(["-v", f"{host_path}:{target}"])
    check_result = subprocess.run(
        check_cmd,
        capture_output=True,
        text=True,
        timeout=SUBPROCESS_TIMEOUT_SECONDS,
    )
    if check_result.returncode != 0:
        # Dump diagnostics before the assertion so CI logs show the cause.
        print(f"Check command failed. Cmd: {check_cmd}")
        print(f"Stderr: {check_result.stderr}")
        print(f"Stdout: {check_result.stdout}")
    assert check_result.returncode == 0, f"Should be able to write after root fix script runs. Stderr: {check_result.stderr}. Stdout: {check_result.stdout}"

View File

@@ -49,11 +49,11 @@ def test_skip_tests_env_var():
@pytest.mark.feature_complete
def test_app_conf_override_from_graphql_port():
# If GRAPHQL_PORT is set and APP_CONF_OVERRIDE is not set, the entrypoint should set
# APP_CONF_OVERRIDE to a JSON string containing the GRAPHQL_PORT value and print a message
# about it.
# APP_CONF_OVERRIDE to a JSON string containing the GRAPHQL_PORT value.
# The script should exit successfully.
result = _run_entrypoint(env={"GRAPHQL_PORT": "20212", "SKIP_TESTS": "1"}, check_only=True)
assert 'Setting APP_CONF_OVERRIDE to {"GRAPHQL_PORT":"20212"}' in result.stdout
assert 'Setting APP_CONF_OVERRIDE to' not in result.stdout
assert 'APP_CONF_OVERRIDE detected' in result.stderr
assert result.returncode == 0

View File

@@ -5,6 +5,14 @@ Pytest-based Mount Diagnostic Tests for NetAlertX
Tests all possible mount configurations for each path to validate the diagnostic tool.
Uses pytest framework for proper test discovery and execution.
TODO: Future Robustness & Compatibility Tests
1. Symlink Attacks: Verify behavior when a writable directory is mounted via a symlink.
Hypothesis: The tool might misidentify the mount status or path.
2. OverlayFS/Copy-up Scenarios: Investigate behavior on filesystems like Synology's OverlayFS.
Hypothesis: Files might appear writable but fail on specific operations (locking, mmap).
3. Text-based Output: Refactor output to support text-based status (e.g., [OK], [FAIL])
instead of emojis for better compatibility with terminals that don't support unicode.
All tests use the mounts table. For reference, the mounts table looks like this:
Path | Writeable | Mount | RAMDisk | Performance | DataLoss
@@ -604,3 +612,4 @@ def test_table_parsing():
performance=True,
dataloss=True,
)