Strengthen socket access integration coverage

Make tests/test_socket_access.py deterministic across environments and add password-mode auth integration checks (v1 and v2).
This commit is contained in:
Lawrence Chen 2026-02-22 01:08:25 -08:00
parent a168182f54
commit a205028b2e

View file

@ -20,6 +20,8 @@ import subprocess
import sys
import tempfile
import time
import json
import glob
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from cmux import cmux, cmuxError
@ -69,32 +71,66 @@ def _raw_send(sock, command: str, timeout: float = 3.0) -> str:
def _find_app():
r = subprocess.run(
["find", "/Users/cmux/Library/Developer/Xcode/DerivedData",
"-path", "*/Build/Products/Debug/cmux DEV.app", "-print", "-quit"],
capture_output=True, text=True, timeout=10
)
return r.stdout.strip()
explicit = os.environ.get("CMUX_APP_PATH")
if explicit and os.path.exists(explicit):
return explicit
candidates = []
home = os.path.expanduser("~")
candidates.extend(glob.glob(os.path.join(
home, "Library/Developer/Xcode/DerivedData/*/Build/Products/Debug/cmux DEV.app"
)))
candidates.extend(glob.glob("/tmp/cmux-*/Build/Products/Debug/cmux DEV*.app"))
candidates.extend(glob.glob("/private/tmp/cmux-*/Build/Products/Debug/cmux DEV*.app"))
candidates = [p for p in candidates if os.path.exists(p)]
if not candidates:
return ""
candidates.sort(key=os.path.getmtime, reverse=True)
return candidates[0]
def _wait_for_socket(socket_path: str, timeout: float = 10.0) -> bool:
deadline = time.time() + timeout
while time.time() < deadline:
if os.path.exists(socket_path):
return True
try:
sock = _raw_connect(socket_path, timeout=0.3)
sock.close()
return True
except Exception:
pass
time.sleep(0.5)
return False
def _kill_cmux():
subprocess.run(["pkill", "-x", "cmux DEV"], capture_output=True)
def _kill_cmux(app_path: str = None):
if app_path:
exe = os.path.join(app_path, "Contents/MacOS/cmux DEV")
subprocess.run(["pkill", "-f", exe], capture_output=True)
else:
subprocess.run(["pkill", "-x", "cmux DEV"], capture_output=True)
time.sleep(1.5)
def _launch_cmux(app_path: str, socket_path: str, mode: str = None):
def _launch_cmux(app_path: str, socket_path: str, mode: str = None, extra_env: dict = None):
if os.path.exists(socket_path):
try:
os.unlink(socket_path)
except OSError:
pass
env_args = []
if mode:
env_args = ["--env", f"CMUX_SOCKET_MODE={mode}"]
launch_env = {
"CMUX_SOCKET_PATH": socket_path,
"CMUX_ALLOW_SOCKET_OVERRIDE": "1",
}
if extra_env:
launch_env.update(extra_env)
for key, value in launch_env.items():
env_args.extend(["--env", f"{key}={value}"])
subprocess.Popen(["open", "-a", app_path] + env_args)
if not _wait_for_socket(socket_path):
raise RuntimeError(f"Socket {socket_path} not created after launch")
@ -249,8 +285,8 @@ fi
f.write(hook_line)
# Kill existing cmux, launch in cmuxOnly mode (default)
_kill_cmux()
_launch_cmux(app_path, socket_path)
_kill_cmux(app_path)
_launch_cmux(app_path, socket_path, mode="cmuxOnly")
# Wait for marker (the shell sources .zprofile on startup)
for _ in range(40):
@ -305,7 +341,7 @@ def test_allowall_mode_works(socket_path: str, app_path: str) -> TestResult:
"""Verify CMUX_SOCKET_MODE=allowAll bypasses ancestry check."""
result = TestResult("allowAll mode allows external")
try:
_kill_cmux()
_kill_cmux(app_path)
_launch_cmux(app_path, socket_path, mode="allowAll")
sock = _raw_connect(socket_path)
@ -321,6 +357,124 @@ def test_allowall_mode_works(socket_path: str, app_path: str) -> TestResult:
return result
def test_password_mode_requires_auth(socket_path: str, app_path: str) -> TestResult:
"""Verify password mode rejects unauthenticated commands."""
result = TestResult("Password mode requires auth")
password = f"cmux-pass-{os.getpid()}"
try:
_kill_cmux(app_path)
_launch_cmux(
app_path,
socket_path,
mode="password",
extra_env={"CMUX_SOCKET_PASSWORD": password}
)
sock = _raw_connect(socket_path)
response = _raw_send(sock, "ping")
sock.close()
if "Authentication required" in response:
result.success("Unauthenticated command rejected in password mode")
else:
result.failure(f"Unexpected response without auth: {response!r}")
except Exception as e:
result.failure(f"{type(e).__name__}: {e}")
return result
def test_password_mode_v1_auth_flow(socket_path: str, app_path: str) -> TestResult:
"""Verify v1 auth command unlocks the connection only with correct password."""
result = TestResult("Password mode v1 auth flow")
password = f"cmux-pass-{os.getpid()}"
try:
_kill_cmux(app_path)
_launch_cmux(
app_path,
socket_path,
mode="password",
extra_env={"CMUX_SOCKET_PASSWORD": password}
)
sock = _raw_connect(socket_path)
try:
wrong = _raw_send(sock, "auth wrong-password")
if "Invalid password" not in wrong:
result.failure(f"Expected invalid password error, got: {wrong!r}")
return result
ok = _raw_send(sock, f"auth {password}")
if "OK: Authenticated" not in ok:
result.failure(f"Expected auth success, got: {ok!r}")
return result
pong = _raw_send(sock, "ping")
if pong != "PONG":
result.failure(f"Expected PONG after auth, got: {pong!r}")
return result
finally:
sock.close()
result.success("v1 auth gate works")
except Exception as e:
result.failure(f"{type(e).__name__}: {e}")
return result
def test_password_mode_v2_auth_flow(socket_path: str, app_path: str) -> TestResult:
"""Verify v2 auth.login unlocks subsequent v2 requests."""
result = TestResult("Password mode v2 auth flow")
password = f"cmux-pass-{os.getpid()}"
try:
_kill_cmux(app_path)
_launch_cmux(
app_path,
socket_path,
mode="password",
extra_env={"CMUX_SOCKET_PASSWORD": password}
)
sock = _raw_connect(socket_path)
try:
unauth = _raw_send(sock, json.dumps({
"id": "1",
"method": "system.ping",
"params": {}
}))
unauth_obj = json.loads(unauth)
if unauth_obj.get("error", {}).get("code") != "auth_required":
result.failure(f"Expected auth_required, got: {unauth!r}")
return result
login = _raw_send(sock, json.dumps({
"id": "2",
"method": "auth.login",
"params": {"password": password}
}))
login_obj = json.loads(login)
if not login_obj.get("ok"):
result.failure(f"Expected auth.login success, got: {login!r}")
return result
pong = _raw_send(sock, json.dumps({
"id": "3",
"method": "system.ping",
"params": {}
}))
pong_obj = json.loads(pong)
pong_value = pong_obj.get("result", {}).get("pong")
if pong_value is not True:
result.failure(f"Expected pong=true after auth.login, got: {pong!r}")
return result
finally:
sock.close()
result.success("v2 auth.login gate works")
except Exception as e:
result.failure(f"{type(e).__name__}: {e}")
return result
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
@ -337,7 +491,11 @@ def run_tests():
return 1
print(f"App: {app_path}")
socket_path = _find_socket_path()
socket_path = f"/tmp/cmux-test-socket-access-{os.getpid()}.sock"
try:
os.unlink(socket_path)
except OSError:
pass
print(f"Socket: {socket_path}")
print()
@ -356,9 +514,9 @@ def run_tests():
print("-" * 50)
# Ensure cmux is running in cmuxOnly mode
_kill_cmux()
_kill_cmux(app_path)
print(" Launching cmux in cmuxOnly mode...")
_launch_cmux(app_path, socket_path)
_launch_cmux(app_path, socket_path, mode="cmuxOnly")
run_test(test_external_rejected, socket_path)
run_test(test_connection_closed_after_reject, socket_path)
@ -380,9 +538,18 @@ def run_tests():
run_test(test_allowall_mode_works, socket_path, app_path)
print()
# ── Phase 4: password mode auth gate ──
print("Phase 4: password mode — auth required + login flow")
print("-" * 50)
run_test(test_password_mode_requires_auth, socket_path, app_path)
run_test(test_password_mode_v1_auth_flow, socket_path, app_path)
run_test(test_password_mode_v2_auth_flow, socket_path, app_path)
print()
# ── Cleanup: leave cmux in cmuxOnly mode ──
_kill_cmux()
_launch_cmux(app_path, socket_path)
_kill_cmux(app_path)
_launch_cmux(app_path, socket_path, mode="cmuxOnly")
# ── Summary ──
print("=" * 60)