Fix for ports

This commit is contained in:
Adam Outler
2025-11-11 22:54:07 +00:00
parent 9d56e13818
commit 9c366881f1
9 changed files with 271 additions and 203 deletions

View File

@@ -9,7 +9,13 @@ import copy
import os
import pathlib
import re
import shutil
import socket
import subprocess
import time
from collections.abc import Callable, Iterable
from _pytest.outcomes import Skipped
import pytest
import yaml
@@ -29,6 +35,55 @@ CONTAINER_PATHS = {
TMPFS_ROOT = "/tmp:uid=20211,gid=20211,mode=1700,rw,noexec,nosuid,nodev,async,noatime,nodiratime"
DEFAULT_HTTP_PORT = int(os.environ.get("NETALERTX_DEFAULT_HTTP_PORT", "20211"))
COMPOSE_PORT_WAIT_TIMEOUT = int(os.environ.get("NETALERTX_COMPOSE_PORT_WAIT_TIMEOUT", "180"))
COMPOSE_SETTLE_WAIT_SECONDS = int(os.environ.get("NETALERTX_COMPOSE_SETTLE_WAIT", "15"))
PREFERRED_CUSTOM_PORTS = (22111, 22112)
HOST_ADDR_ENV = os.environ.get("NETALERTX_HOST_ADDRS", "")
def _discover_host_addresses() -> tuple[str, ...]:
"""Return candidate loopback addresses for reaching host-mode containers."""
candidates: list[str] = ["127.0.0.1"]
if HOST_ADDR_ENV:
env_addrs = [addr.strip() for addr in HOST_ADDR_ENV.split(",") if addr.strip()]
candidates.extend(env_addrs)
ip_cmd = shutil.which("ip")
if ip_cmd:
try:
route_proc = subprocess.run(
[ip_cmd, "-4", "route", "show", "default"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
check=False,
timeout=5,
)
except (OSError, subprocess.TimeoutExpired):
route_proc = None
if route_proc and route_proc.returncode == 0 and route_proc.stdout:
match = re.search(r"default\s+via\s+(?P<gateway>\S+)", route_proc.stdout)
if match:
gateway = match.group("gateway")
candidates.append(gateway)
# Deduplicate while preserving order
seen: set[str] = set()
deduped: list[str] = []
for addr in candidates:
if addr not in seen:
deduped.append(addr)
seen.add(addr)
return tuple(deduped)
HOST_ADDRESS_CANDIDATES = _discover_host_addresses()
LAST_PORT_SUCCESSES: dict[int, str] = {}
pytestmark = [pytest.mark.docker, pytest.mark.compose]
IMAGE = os.environ.get("NETALERTX_TEST_IMAGE", "netalertx-test")
@@ -151,12 +206,142 @@ def _extract_conflict_container_name(output: str) -> str | None:
return None
def _port_is_free(port: int) -> bool:
"""Return True if a TCP port is available on localhost."""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
sock.bind(("127.0.0.1", port))
except OSError:
return False
return True
def _wait_for_ports(ports: Iterable[int], timeout: int = COMPOSE_PORT_WAIT_TIMEOUT) -> None:
"""Block until every port in the iterable accepts TCP connections or timeout expires."""
remaining = set(ports)
deadline = time.time() + timeout
last_errors: dict[int, dict[str, BaseException]] = {port: {} for port in remaining}
while remaining and time.time() < deadline:
ready: list[int] = []
for port in list(remaining):
for addr in HOST_ADDRESS_CANDIDATES:
try:
with socket.create_connection((addr, port), timeout=2):
ready.append(port)
LAST_PORT_SUCCESSES[port] = addr
break
except OSError as exc:
last_errors.setdefault(port, {})[addr] = exc
else:
continue
for port in ready:
remaining.discard(port)
if remaining:
time.sleep(1)
if remaining:
details: list[str] = []
for port in sorted(remaining):
addr_errors = last_errors.get(port, {})
if addr_errors:
error_summary = ", ".join(f"{addr}: {err}" for addr, err in addr_errors.items())
else:
error_summary = "no connection attempts recorded"
details.append(f"{port} -> {error_summary}")
raise TimeoutError(
"Ports did not become ready before timeout: " + "; ".join(details)
)
def _select_custom_ports() -> tuple[int, int]:
"""Choose a pair of non-default ports, preferring the standard high test pair when free."""
preferred_http, preferred_graphql = PREFERRED_CUSTOM_PORTS
if _port_is_free(preferred_http) and _port_is_free(preferred_graphql):
return preferred_http, preferred_graphql
# Fall back to scanning ephemeral range for the first free consecutive pair.
for port in range(30000, 60000, 2):
if _port_is_free(port) and _port_is_free(port + 1):
return port, port + 1
raise RuntimeError("Unable to locate two free high ports for compose testing")
def _make_port_check_hook(ports: tuple[int, ...]) -> Callable[[], None]:
"""Return a callback that waits for the provided ports to accept TCP connections."""
def _hook() -> None:
for port in ports:
LAST_PORT_SUCCESSES.pop(port, None)
time.sleep(COMPOSE_SETTLE_WAIT_SECONDS)
_wait_for_ports(ports, timeout=COMPOSE_PORT_WAIT_TIMEOUT)
return _hook
def _write_normal_startup_compose(
base_dir: pathlib.Path,
project_name: str,
env_overrides: dict[str, str] | None,
) -> pathlib.Path:
"""Generate a compose file for the normal startup scenario with optional environment overrides."""
compose_config = copy.deepcopy(COMPOSE_CONFIGS["normal_startup"])
service = compose_config["services"]["netalertx"]
data_volume_name = f"{project_name}_data"
service["volumes"][0]["source"] = data_volume_name
if env_overrides:
service_env = service.setdefault("environment", {})
service_env.update(env_overrides)
compose_config["volumes"] = {data_volume_name: {}}
compose_file = base_dir / "docker-compose.yml"
with open(compose_file, "w") as f:
yaml.dump(compose_config, f)
return compose_file
def _assert_ports_ready(
result: subprocess.CompletedProcess,
project_name: str,
ports: tuple[int, ...],
) -> str:
"""Validate the post-up hook succeeded and return sanitized compose logs for further assertions."""
post_error = getattr(result, "post_up_error", None)
clean_output = ANSI_ESCAPE.sub("", result.output)
port_hosts = {port: LAST_PORT_SUCCESSES.get(port) for port in ports}
result.port_hosts = port_hosts # type: ignore[attr-defined]
if post_error:
pytest.fail(
"Port readiness check failed for project"
f" {project_name} on ports {ports}: {post_error}\n"
f"Compose logs:\n{clean_output}"
)
port_summary = ", ".join(
f"{port}@{addr if addr else 'unresolved'}" for port, addr in port_hosts.items()
)
print(f"[compose port hosts] {project_name}: {port_summary}")
return clean_output
def _run_docker_compose(
compose_file: pathlib.Path,
project_name: str,
timeout: int = 5,
env_vars: dict | None = None,
detached: bool = False,
post_up: Callable[[], None] | None = None,
) -> subprocess.CompletedProcess:
"""Run docker compose up and capture output."""
cmd = [
@@ -219,10 +404,21 @@ def _run_docker_compose(
continue
return proc
post_up_exc: BaseException | None = None
skip_exc: Skipped | None = None
try:
if detached:
up_result = _run_with_conflict_retry(up_cmd, timeout)
if post_up:
try:
post_up()
except Skipped as exc:
skip_exc = exc
except BaseException as exc: # noqa: BLE001 - bubble the root cause through the result payload
post_up_exc = exc
logs_cmd = cmd + ["logs"]
logs_result = subprocess.run(
logs_cmd,
@@ -255,6 +451,9 @@ def _run_docker_compose(
# Combine stdout and stderr
result.output = result.stdout + result.stderr
result.post_up_error = post_up_exc # type: ignore[attr-defined]
if skip_exc is not None:
raise skip_exc
# Surface command context and IO for any caller to aid debugging
print("\n[compose command]", " ".join(up_cmd))
@@ -339,43 +538,34 @@ def test_normal_startup_no_warnings_compose(tmp_path: pathlib.Path) -> None:
"""
base_dir = tmp_path / "normal_startup"
base_dir.mkdir()
default_http_port = DEFAULT_HTTP_PORT
default_ports = (default_http_port,)
if not _port_is_free(default_http_port):
pytest.skip(
"Default NetAlertX ports are already bound on this host; "
"skipping compose normal-startup validation."
)
project_name = "netalertx-normal"
default_dir = base_dir / "default"
default_dir.mkdir()
default_project = "netalertx-normal-default"
# Create compose file mirroring production docker-compose.yml
compose_config = copy.deepcopy(COMPOSE_CONFIGS["normal_startup"])
service = compose_config["services"]["netalertx"]
default_compose_file = _write_normal_startup_compose(default_dir, default_project, None)
default_result = _run_docker_compose(
default_compose_file,
default_project,
timeout=60,
detached=True,
post_up=_make_port_check_hook(default_ports),
)
default_output = _assert_ports_ready(default_result, default_project, default_ports)
data_volume_name = f"{project_name}_data"
service["volumes"][0]["source"] = data_volume_name
service.setdefault("environment", {})
service["environment"].update({
"PORT": "22111",
"GRAPHQL_PORT": "22112",
})
compose_config["volumes"] = {
data_volume_name: {},
}
compose_file = base_dir / "docker-compose.yml"
with open(compose_file, 'w') as f:
yaml.dump(compose_config, f)
# Run docker compose
result = _run_docker_compose(compose_file, project_name, detached=True)
clean_output = ANSI_ESCAPE.sub("", result.output)
# Check that startup completed without critical issues and mounts table shows success
assert "Startup pre-checks" in clean_output
assert "" not in clean_output
assert "Startup pre-checks" in default_output
assert "" not in default_output
data_line = ""
data_parts: list[str] = []
for line in clean_output.splitlines():
for line in default_output.splitlines():
if CONTAINER_PATHS['data'] not in line or '|' not in line:
continue
parts = [segment.strip() for segment in line.split('|')]
@@ -387,15 +577,46 @@ def test_normal_startup_no_warnings_compose(tmp_path: pathlib.Path) -> None:
break
assert data_line, "Expected /data row in mounts table"
assert data_parts[1] == CONTAINER_PATHS['data'], f"Unexpected path column in /data row: {data_parts}"
assert data_parts[2] == "" and data_parts[3] == "", (
f"Unexpected mount row values for /data: {data_parts[2:4]}"
)
parts = data_parts
assert parts[1] == CONTAINER_PATHS['data'], f"Unexpected path column in /data row: {parts}"
assert parts[2] == "" and parts[3] == "", f"Unexpected mount row values for /data: {parts[2:4]}"
assert "Write permission denied" not in default_output
assert "CRITICAL" not in default_output
assert "⚠️" not in default_output
# Ensure no critical errors or permission problems surfaced
assert "Write permission denied" not in clean_output
assert "CRITICAL" not in clean_output
assert "⚠️" not in clean_output
custom_http, custom_graphql = _select_custom_ports()
assert custom_http != default_http_port
custom_ports = (custom_http,)
custom_dir = base_dir / "custom"
custom_dir.mkdir()
custom_project = "netalertx-normal-custom"
custom_compose_file = _write_normal_startup_compose(
custom_dir,
custom_project,
{
"PORT": str(custom_http),
"GRAPHQL_PORT": str(custom_graphql),
},
)
custom_result = _run_docker_compose(
custom_compose_file,
custom_project,
timeout=60,
detached=True,
post_up=_make_port_check_hook(custom_ports),
)
custom_output = _assert_ports_ready(custom_result, custom_project, custom_ports)
assert "Startup pre-checks" in custom_output
assert "" not in custom_output
assert "Write permission denied" not in custom_output
assert "CRITICAL" not in custom_output
assert "⚠️" not in custom_output
def test_ram_disk_mount_analysis_compose(tmp_path: pathlib.Path) -> None: