* Socket access control: process ancestry check + file permissions Redesign socket control modes from (off, notifications, full) to (off, cmuxOnly, allowAll): - cmuxOnly (default): uses LOCAL_PEERPID + sysctl process tree walk to verify the connecting process is a descendant of cmux. External processes (SSH, other terminals) are rejected. - allowAll: hidden mode accessible only via CMUX_SOCKET_MODE=allowAll env var, skips ancestry check. Legacy "full"/"notifications" env values map here for backward compat. - off: disables socket entirely. Security hardening: - Server: chmod 0600 on socket after bind (owner-only access) - CLI: stat() ownership check before connect (reject fake sockets) Removes per-command allow-list (isCommandAllowed) — once a process passes the ancestry check, all commands are available. Includes migration for persisted UserDefaults values and env var aliases (cmux_only, cmux-only, allow_all, allow-all). * Add /sync-branch skill for submodule + main sync
413 lines
13 KiB
Python
413 lines
13 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Tests for socket access control (process ancestry check).
|
|
|
|
In cmuxOnly mode (default), only processes descended from the cmux
|
|
app process can connect. External processes (e.g., SSH) are rejected.
|
|
|
|
Test strategy:
|
|
Phase 1: cmuxOnly — external processes get rejected
|
|
Phase 2: cmuxOnly — internal process CAN connect (inject via shell rc)
|
|
Phase 3: allowAll env override — existing test commands still work
|
|
|
|
Usage:
|
|
python3 test_socket_access.py
|
|
"""
|
|
|
|
import os
|
|
import socket
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
import time
|
|
|
|
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
|
from cmux import cmux, cmuxError
|
|
|
|
|
|
class TestResult:
|
|
def __init__(self, name: str):
|
|
self.name = name
|
|
self.passed = False
|
|
self.message = ""
|
|
|
|
def success(self, msg: str = ""):
|
|
self.passed = True
|
|
self.message = msg
|
|
|
|
def failure(self, msg: str):
|
|
self.passed = False
|
|
self.message = msg
|
|
|
|
|
|
def _find_socket_path():
|
|
return cmux().socket_path
|
|
|
|
|
|
def _raw_connect(socket_path: str, timeout: float = 3.0):
|
|
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
|
sock.settimeout(timeout)
|
|
sock.connect(socket_path)
|
|
return sock
|
|
|
|
|
|
def _raw_send(sock, command: str, timeout: float = 3.0) -> str:
|
|
sock.sendall((command + "\n").encode())
|
|
data = b""
|
|
deadline = time.time() + timeout
|
|
while time.time() < deadline:
|
|
try:
|
|
chunk = sock.recv(4096)
|
|
if not chunk:
|
|
break
|
|
data += chunk
|
|
if b"\n" in data:
|
|
break
|
|
except socket.timeout:
|
|
break
|
|
return data.decode().strip()
|
|
|
|
|
|
def _find_app():
|
|
r = subprocess.run(
|
|
["find", "/Users/cmux/Library/Developer/Xcode/DerivedData",
|
|
"-path", "*/Build/Products/Debug/cmux DEV.app", "-print", "-quit"],
|
|
capture_output=True, text=True, timeout=10
|
|
)
|
|
return r.stdout.strip()
|
|
|
|
|
|
def _wait_for_socket(socket_path: str, timeout: float = 10.0) -> bool:
|
|
deadline = time.time() + timeout
|
|
while time.time() < deadline:
|
|
if os.path.exists(socket_path):
|
|
return True
|
|
time.sleep(0.5)
|
|
return False
|
|
|
|
|
|
def _kill_cmux():
|
|
subprocess.run(["pkill", "-x", "cmux DEV"], capture_output=True)
|
|
time.sleep(1.5)
|
|
|
|
|
|
def _launch_cmux(app_path: str, socket_path: str, mode: str = None):
|
|
env_args = []
|
|
if mode:
|
|
env_args = ["--env", f"CMUX_SOCKET_MODE={mode}"]
|
|
subprocess.Popen(["open", "-a", app_path] + env_args)
|
|
if not _wait_for_socket(socket_path):
|
|
raise RuntimeError(f"Socket {socket_path} not created after launch")
|
|
time.sleep(8)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# External rejection tests (Phase 1)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_external_rejected(socket_path: str) -> TestResult:
|
|
result = TestResult("External process rejected")
|
|
try:
|
|
sock = _raw_connect(socket_path)
|
|
try:
|
|
response = _raw_send(sock, "ping")
|
|
if "Access denied" in response:
|
|
result.success(f"Correctly rejected")
|
|
elif response == "PONG":
|
|
result.failure("External allowed — ancestry check not working")
|
|
else:
|
|
result.failure(f"Unexpected: {response!r}")
|
|
finally:
|
|
sock.close()
|
|
except Exception as e:
|
|
result.failure(f"{type(e).__name__}: {e}")
|
|
return result
|
|
|
|
|
|
def test_connection_closed_after_reject(socket_path: str) -> TestResult:
|
|
result = TestResult("Connection closed after rejection")
|
|
try:
|
|
sock = _raw_connect(socket_path)
|
|
try:
|
|
_raw_send(sock, "ping")
|
|
try:
|
|
sock.sendall(b"list_tabs\n")
|
|
time.sleep(0.3)
|
|
data = sock.recv(4096)
|
|
if data:
|
|
result.failure(f"Got response after rejection: {data.decode().strip()!r}")
|
|
else:
|
|
result.success("Connection properly closed")
|
|
except (BrokenPipeError, ConnectionResetError, OSError):
|
|
result.success("Connection properly closed")
|
|
finally:
|
|
sock.close()
|
|
except Exception as e:
|
|
result.failure(f"{type(e).__name__}: {e}")
|
|
return result
|
|
|
|
|
|
def test_rapid_reconnect(socket_path: str) -> TestResult:
|
|
result = TestResult("Rapid reconnect all rejected")
|
|
try:
|
|
for i in range(20):
|
|
try:
|
|
sock = _raw_connect(socket_path, timeout=2.0)
|
|
response = _raw_send(sock, "ping", timeout=1.0)
|
|
sock.close()
|
|
except (BrokenPipeError, ConnectionResetError, OSError):
|
|
# Server closed connection before we could read — counts as rejection
|
|
continue
|
|
if "Access denied" not in response and "ERROR" not in response:
|
|
result.failure(f"Iteration {i}: not rejected: {response!r}")
|
|
return result
|
|
result.success("All 20 rejected")
|
|
except Exception as e:
|
|
result.failure(f"{type(e).__name__}: {e}")
|
|
return result
|
|
|
|
|
|
def test_subprocess_rejected(socket_path: str) -> TestResult:
|
|
result = TestResult("Subprocess of external rejected")
|
|
try:
|
|
script = f"""
|
|
import socket, sys, time
|
|
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
|
sock.settimeout(3)
|
|
sock.connect("{socket_path}")
|
|
sock.sendall(b"ping\\n")
|
|
data = b""
|
|
deadline = time.time() + 3
|
|
while time.time() < deadline:
|
|
try:
|
|
chunk = sock.recv(4096)
|
|
if not chunk: break
|
|
data += chunk
|
|
if b"\\n" in data: break
|
|
except socket.timeout: break
|
|
sock.close()
|
|
resp = data.decode().strip()
|
|
if "Access denied" in resp or "ERROR" in resp:
|
|
print("REJECTED"); sys.exit(0)
|
|
else:
|
|
print("ALLOWED:" + resp); sys.exit(1)
|
|
"""
|
|
proc = subprocess.run(
|
|
[sys.executable, "-c", script],
|
|
capture_output=True, text=True, timeout=10
|
|
)
|
|
if proc.returncode == 0 and "REJECTED" in proc.stdout:
|
|
result.success("Child process rejected")
|
|
else:
|
|
result.failure(f"exit={proc.returncode} out={proc.stdout!r}")
|
|
except Exception as e:
|
|
result.failure(f"{type(e).__name__}: {e}")
|
|
return result
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Internal process test (Phase 2)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_internal_process_allowed(socket_path: str, app_path: str) -> TestResult:
|
|
"""
|
|
Verify a cmux-spawned terminal process CAN connect in cmuxOnly mode.
|
|
Inject a test via the shell rc file, then launch cmux in cmuxOnly mode.
|
|
The shell (a descendant of cmux) runs the test on startup.
|
|
"""
|
|
result = TestResult("Internal process can connect (cmuxOnly)")
|
|
marker = os.path.join(tempfile.gettempdir(), f"cmux_internal_{os.getpid()}")
|
|
hook_file = os.path.join(tempfile.gettempdir(), f"cmux_rc_hook_{os.getpid()}.sh")
|
|
zprofile_path = os.path.expanduser("~/.zprofile")
|
|
|
|
try:
|
|
for f in [marker, hook_file]:
|
|
if os.path.exists(f):
|
|
os.unlink(f)
|
|
|
|
# Write test script: connects to socket, sends ping, writes result
|
|
with open(hook_file, "w") as f:
|
|
f.write(f"""#!/bin/bash
|
|
# One-shot test hook — self-removes after running
|
|
RESULT=$(echo "ping" | nc -U "{socket_path}" 2>/dev/null | head -1)
|
|
if [ "$RESULT" = "PONG" ]; then
|
|
echo "OK" > "{marker}"
|
|
else
|
|
echo "FAIL:$RESULT" > "{marker}"
|
|
fi
|
|
""")
|
|
os.chmod(hook_file, 0o755)
|
|
|
|
# Append hook to .zprofile (runs on terminal startup)
|
|
zprofile_backup = None
|
|
if os.path.exists(zprofile_path):
|
|
with open(zprofile_path) as f:
|
|
zprofile_backup = f.read()
|
|
|
|
hook_line = f'\n[ -f "{hook_file}" ] && bash "{hook_file}" && rm -f "{hook_file}"\n'
|
|
with open(zprofile_path, "a") as f:
|
|
f.write(hook_line)
|
|
|
|
# Kill existing cmux, launch in cmuxOnly mode (default)
|
|
_kill_cmux()
|
|
_launch_cmux(app_path, socket_path)
|
|
|
|
# Wait for marker (the shell sources .zprofile on startup)
|
|
for _ in range(40):
|
|
if os.path.exists(marker):
|
|
break
|
|
time.sleep(0.5)
|
|
|
|
if not os.path.exists(marker):
|
|
result.failure("Marker not created — hook didn't run in terminal")
|
|
return result
|
|
|
|
with open(marker) as f:
|
|
content = f.read().strip()
|
|
|
|
if content == "OK":
|
|
result.success("Internal process pinged socket successfully in cmuxOnly mode")
|
|
else:
|
|
result.failure(f"Internal process got: {content!r}")
|
|
|
|
except Exception as e:
|
|
result.failure(f"{type(e).__name__}: {e}")
|
|
finally:
|
|
# Restore .zprofile
|
|
if zprofile_backup is not None:
|
|
with open(zprofile_path, "w") as f:
|
|
f.write(zprofile_backup)
|
|
elif os.path.exists(zprofile_path):
|
|
# Remove the hook line we added
|
|
with open(zprofile_path) as f:
|
|
content = f.read()
|
|
content = content.replace(hook_line, "")
|
|
if content.strip():
|
|
with open(zprofile_path, "w") as f:
|
|
f.write(content)
|
|
else:
|
|
os.unlink(zprofile_path)
|
|
|
|
for f in [marker, hook_file]:
|
|
try:
|
|
os.unlink(f)
|
|
except OSError:
|
|
pass
|
|
|
|
return result
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# allowAll mode test (Phase 3)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_allowall_mode_works(socket_path: str, app_path: str) -> TestResult:
|
|
"""Verify CMUX_SOCKET_MODE=allowAll bypasses ancestry check."""
|
|
result = TestResult("allowAll mode allows external")
|
|
try:
|
|
_kill_cmux()
|
|
_launch_cmux(app_path, socket_path, mode="allowAll")
|
|
|
|
sock = _raw_connect(socket_path)
|
|
response = _raw_send(sock, "ping")
|
|
sock.close()
|
|
|
|
if response == "PONG":
|
|
result.success("External process allowed in allowAll mode")
|
|
else:
|
|
result.failure(f"Unexpected response: {response!r}")
|
|
except Exception as e:
|
|
result.failure(f"{type(e).__name__}: {e}")
|
|
return result
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def run_tests():
|
|
print("=" * 60)
|
|
print("cmux Socket Access Control Tests")
|
|
print("=" * 60)
|
|
print()
|
|
|
|
app_path = _find_app()
|
|
if not app_path:
|
|
print("Error: Could not find cmux DEV.app in DerivedData")
|
|
return 1
|
|
print(f"App: {app_path}")
|
|
|
|
socket_path = _find_socket_path()
|
|
print(f"Socket: {socket_path}")
|
|
print()
|
|
|
|
results = []
|
|
|
|
def run_test(test_fn, *args):
|
|
name = test_fn.__name__.replace("test_", "").replace("_", " ").title()
|
|
print(f" Testing {name}...")
|
|
r = test_fn(*args)
|
|
results.append(r)
|
|
status = "\u2705" if r.passed else "\u274c"
|
|
print(f" {status} {r.message}")
|
|
|
|
# ── Phase 1: cmuxOnly — external rejection ──
|
|
print("Phase 1: cmuxOnly mode — external rejection")
|
|
print("-" * 50)
|
|
|
|
# Ensure cmux is running in cmuxOnly mode
|
|
_kill_cmux()
|
|
print(" Launching cmux in cmuxOnly mode...")
|
|
_launch_cmux(app_path, socket_path)
|
|
|
|
run_test(test_external_rejected, socket_path)
|
|
run_test(test_connection_closed_after_reject, socket_path)
|
|
run_test(test_rapid_reconnect, socket_path)
|
|
run_test(test_subprocess_rejected, socket_path)
|
|
print()
|
|
|
|
# ── Phase 2: cmuxOnly — internal process CAN connect ──
|
|
print("Phase 2: cmuxOnly mode — internal process allowed")
|
|
print("-" * 50)
|
|
|
|
run_test(test_internal_process_allowed, socket_path, app_path)
|
|
print()
|
|
|
|
# ── Phase 3: allowAll env override ──
|
|
print("Phase 3: allowAll mode — env override bypasses check")
|
|
print("-" * 50)
|
|
|
|
run_test(test_allowall_mode_works, socket_path, app_path)
|
|
print()
|
|
|
|
# ── Cleanup: leave cmux in cmuxOnly mode ──
|
|
_kill_cmux()
|
|
_launch_cmux(app_path, socket_path)
|
|
|
|
# ── Summary ──
|
|
print("=" * 60)
|
|
print("Summary")
|
|
print("=" * 60)
|
|
|
|
passed = sum(1 for r in results if r.passed)
|
|
total = len(results)
|
|
|
|
for r in results:
|
|
status = "\u2705 PASS" if r.passed else "\u274c FAIL"
|
|
print(f" {r.name}: {status}")
|
|
if not r.passed and r.message:
|
|
print(f" {r.message}")
|
|
|
|
print()
|
|
print(f"Passed: {passed}/{total}")
|
|
|
|
if passed == total:
|
|
print("\n\U0001f389 All tests passed!")
|
|
return 0
|
|
else:
|
|
print(f"\n\u26a0\ufe0f {total - passed} test(s) failed")
|
|
return 1
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(run_tests())
|