Fix ssh stack review regressions

This commit is contained in:
Lawrence Chen 2026-03-13 04:14:52 -07:00
parent 19b59cae37
commit 2e6856ff2f
27 changed files with 1270 additions and 506 deletions

View file

@ -0,0 +1,124 @@
from __future__ import annotations
import re
import secrets
import time
from cmux import cmux, cmuxError
ANSI_ESCAPE_RE = re.compile(r"\x1b\[[0-?]*[ -/]*[@-~]")
OSC_ESCAPE_RE = re.compile(r"\x1b\][^\x07\x1b]*(?:\x07|\x1b\\)")
def must(cond: bool, msg: str) -> None:
if not cond:
raise cmuxError(msg)
def wait_for(pred, timeout_s: float = 5.0, step_s: float = 0.05) -> None:
deadline = time.time() + timeout_s
while time.time() < deadline:
if pred():
return
time.sleep(step_s)
raise cmuxError("Timed out waiting for condition")
def clean_line(raw: str) -> str:
line = OSC_ESCAPE_RE.sub("", raw)
line = ANSI_ESCAPE_RE.sub("", line)
line = line.replace("\r", "")
return line.strip()
def layout_panes(client: cmux) -> list[dict]:
layout_payload = client.layout_debug() or {}
layout = layout_payload.get("layout") or {}
return list(layout.get("panes") or [])
def pane_extent(client: cmux, pane_id: str, axis: str) -> float:
panes = layout_panes(client)
for pane in panes:
pid = str(pane.get("paneId") or pane.get("pane_id") or "")
if pid != pane_id:
continue
frame = pane.get("frame") or {}
return float(frame.get(axis) or 0.0)
raise cmuxError(f"Pane {pane_id} missing from debug layout panes: {panes}")
def workspace_panes(client: cmux, workspace_id: str) -> list[tuple[str, bool, int]]:
payload = client._call("pane.list", {"workspace_id": workspace_id}) or {}
out: list[tuple[str, bool, int]] = []
for row in payload.get("panes") or []:
out.append((
str(row.get("id") or ""),
bool(row.get("focused")),
int(row.get("surface_count") or 0),
))
return out
def focused_pane_id(client: cmux, workspace_id: str) -> str:
for pane_id, focused, _surface_count in workspace_panes(client, workspace_id):
if focused:
return pane_id
raise cmuxError("No focused pane found")
def surface_scrollback_text(client: cmux, workspace_id: str, surface_id: str) -> str:
payload = client._call(
"surface.read_text",
{"workspace_id": workspace_id, "surface_id": surface_id, "scrollback": True},
) or {}
return str(payload.get("text") or "")
def surface_scrollback_lines(client: cmux, workspace_id: str, surface_id: str) -> list[str]:
text = surface_scrollback_text(client, workspace_id, surface_id)
return [clean_line(raw) for raw in text.splitlines()]
def scrollback_has_exact_line(client: cmux, workspace_id: str, surface_id: str, token: str) -> bool:
return token in surface_scrollback_lines(client, workspace_id, surface_id)
def wait_for_surface_command_roundtrip(client: cmux, workspace_id: str, surface_id: str) -> None:
for _attempt in range(1, 5):
token = f"CMUX_READY_{secrets.token_hex(4)}"
client.send_surface(surface_id, f"echo {token}\n")
try:
wait_for(
lambda: scrollback_has_exact_line(client, workspace_id, surface_id, token),
timeout_s=2.5,
)
return
except cmuxError:
time.sleep(0.1)
raise cmuxError("Timed out waiting for surface command roundtrip")
def pick_resize_direction_for_pane(client: cmux, pane_ids: list[str], target_pane: str) -> tuple[str, str]:
panes = [p for p in layout_panes(client) if str(p.get("paneId") or p.get("pane_id") or "") in pane_ids]
if len(panes) < 2:
raise cmuxError(f"Need >=2 panes for resize test, got {panes}")
def x_of(p: dict) -> float:
return float((p.get("frame") or {}).get("x") or 0.0)
def y_of(p: dict) -> float:
return float((p.get("frame") or {}).get("y") or 0.0)
x_span = max(x_of(p) for p in panes) - min(x_of(p) for p in panes)
y_span = max(y_of(p) for p in panes) - min(y_of(p) for p in panes)
if x_span >= y_span:
left_pane = min(panes, key=x_of)
left_id = str(left_pane.get("paneId") or left_pane.get("pane_id") or "")
return ("right" if target_pane == left_id else "left"), "width"
top_pane = min(panes, key=y_of)
top_id = str(top_pane.get("paneId") or top_pane.get("pane_id") or "")
return ("down" if target_pane == top_id else "up"), "height"

View file

@ -67,6 +67,7 @@ def main() -> int:
LAST_SOCKET_HINT_PATH.write_text(f"{SOCKET_PATH}\n", encoding="utf-8")
auto_env = dict(os.environ)
auto_env.pop("CMUX_SOCKET_PATH", None)
auto_env.pop("CMUX_SOCKET", None)
auto_ping = _run([cli, "ping"], env=auto_env)
auto_ping_out = _merged_output(auto_ping).lower()
_must(auto_ping.returncode == 0, f"debug auto socket resolution should succeed: {auto_ping.returncode} {auto_ping_out!r}")

View file

@ -0,0 +1,113 @@
#!/usr/bin/env python3
"""Regression: sidebar metadata CLI commands still dispatch through the public cmux CLI."""
from __future__ import annotations
import glob
import os
import subprocess
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from cmux import cmux, cmuxError
SOCKET_PATH = os.environ.get("CMUX_SOCKET", "/tmp/cmux-debug.sock")
def _must(cond: bool, msg: str) -> None:
if not cond:
raise cmuxError(msg)
def _find_cli_binary() -> str:
env_cli = os.environ.get("CMUXTERM_CLI")
if env_cli and os.path.isfile(env_cli) and os.access(env_cli, os.X_OK):
return env_cli
fixed = os.path.expanduser("~/Library/Developer/Xcode/DerivedData/cmux-tests-v2/Build/Products/Debug/cmux")
if os.path.isfile(fixed) and os.access(fixed, os.X_OK):
return fixed
candidates = glob.glob(os.path.expanduser("~/Library/Developer/Xcode/DerivedData/**/Build/Products/Debug/cmux"), recursive=True)
candidates += glob.glob("/tmp/cmux-*/Build/Products/Debug/cmux")
candidates = [p for p in candidates if os.path.isfile(p) and os.access(p, os.X_OK)]
if not candidates:
raise cmuxError("Could not locate cmux CLI binary; set CMUXTERM_CLI")
candidates.sort(key=lambda p: os.path.getmtime(p), reverse=True)
return candidates[0]
def _run_cli(cli: str, args: list[str]) -> str:
proc = subprocess.run(
[cli, "--socket", SOCKET_PATH, *args],
capture_output=True,
text=True,
check=False,
env=dict(os.environ),
)
if proc.returncode != 0:
merged = f"{proc.stdout}\n{proc.stderr}".strip()
raise cmuxError(f"CLI failed ({' '.join(args)}): {merged}")
return proc.stdout.strip()
def main() -> int:
cli = _find_cli_binary()
workspace_id = ""
try:
with cmux(SOCKET_PATH) as client:
workspace_id = client.new_workspace()
status_response = _run_cli(cli, ["set-status", "build", "compiling", "--workspace", workspace_id])
_must(status_response.startswith("OK"), f"set-status should succeed, got {status_response!r}")
status_list = _run_cli(cli, ["list-status", "--workspace", workspace_id])
_must("build=compiling" in status_list, f"list-status should include the inserted status entry: {status_list!r}")
progress_response = _run_cli(cli, ["set-progress", "0.5", "--workspace", workspace_id, "--label", "Building"])
_must(progress_response.startswith("OK"), f"set-progress should succeed, got {progress_response!r}")
log_response = _run_cli(cli, ["log", "--workspace", workspace_id, "--", "ship it"])
_must(log_response.startswith("OK"), f"log should succeed, got {log_response!r}")
log_list = _run_cli(cli, ["list-log", "--workspace", workspace_id, "--limit", "5"])
_must("ship it" in log_list, f"list-log should include the appended log entry: {log_list!r}")
sidebar_state = _run_cli(cli, ["sidebar-state", "--workspace", workspace_id])
_must("status_count=1" in sidebar_state, f"sidebar-state should include the status entry count: {sidebar_state!r}")
_must("progress=0.50 Building" in sidebar_state, f"sidebar-state should include the progress label: {sidebar_state!r}")
_must("[info] ship it" in sidebar_state, f"sidebar-state should include the recent log entry: {sidebar_state!r}")
clear_status_response = _run_cli(cli, ["clear-status", "build", "--workspace", workspace_id])
_must(clear_status_response.startswith("OK"), f"clear-status should succeed, got {clear_status_response!r}")
clear_progress_response = _run_cli(cli, ["clear-progress", "--workspace", workspace_id])
_must(clear_progress_response.startswith("OK"), f"clear-progress should succeed, got {clear_progress_response!r}")
clear_log_response = _run_cli(cli, ["clear-log", "--workspace", workspace_id])
_must(clear_log_response.startswith("OK"), f"clear-log should succeed, got {clear_log_response!r}")
cleared_sidebar_state = _run_cli(cli, ["sidebar-state", "--workspace", workspace_id])
_must("status_count=0" in cleared_sidebar_state, f"sidebar-state should clear status entries: {cleared_sidebar_state!r}")
_must("progress=none" in cleared_sidebar_state, f"sidebar-state should clear progress: {cleared_sidebar_state!r}")
_must("log_count=0" in cleared_sidebar_state, f"sidebar-state should clear log entries: {cleared_sidebar_state!r}")
client.close_workspace(workspace_id)
workspace_id = ""
finally:
if workspace_id:
try:
with cmux(SOCKET_PATH) as cleanup_client:
cleanup_client.close_workspace(workspace_id)
except Exception:
pass
print("PASS: sidebar metadata CLI commands dispatch and update workspace state")
return 0
if __name__ == "__main__":
raise SystemExit(main())

View file

@ -4,7 +4,6 @@
from __future__ import annotations
import os
import re
import secrets
import shlex
import shutil
@ -15,97 +14,20 @@ from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from cmux import cmux, cmuxError
from pane_resize_test_support import (
clean_line as _clean_line,
focused_pane_id as _focused_pane_id,
pane_extent as _pane_extent,
pick_resize_direction_for_pane as _pick_resize_direction_for_pane,
scrollback_has_exact_line as _scrollback_has_exact_line,
surface_scrollback_text as _surface_scrollback_text,
wait_for as _wait_for,
wait_for_surface_command_roundtrip as _wait_for_surface_command_roundtrip,
workspace_panes as _workspace_panes,
)
DEFAULT_SOCKET_PATHS = ["/tmp/cmux-debug.sock", "/tmp/cmux.sock"]
ANSI_ESCAPE_RE = re.compile(r"\x1b\[[0-?]*[ -/]*[@-~]")
OSC_ESCAPE_RE = re.compile(r"\x1b\][^\x07\x1b]*(?:\x07|\x1b\\)")
def _must(cond: bool, msg: str) -> None:
if not cond:
raise cmuxError(msg)
def _wait_for(pred, timeout_s: float = 5.0, step_s: float = 0.05) -> None:
deadline = time.time() + timeout_s
while time.time() < deadline:
if pred():
return
time.sleep(step_s)
raise cmuxError("Timed out waiting for condition")
def _clean_line(raw: str) -> str:
line = OSC_ESCAPE_RE.sub("", raw)
line = ANSI_ESCAPE_RE.sub("", line)
line = line.replace("\r", "")
return line.strip()
def _layout_panes(client: cmux) -> list[dict]:
layout_payload = client.layout_debug() or {}
layout = layout_payload.get("layout") or {}
return list(layout.get("panes") or [])
def _pane_extent(client: cmux, pane_id: str, axis: str) -> float:
panes = _layout_panes(client)
for pane in panes:
pid = str(pane.get("paneId") or pane.get("pane_id") or "")
if pid != pane_id:
continue
frame = pane.get("frame") or {}
return float(frame.get(axis) or 0.0)
raise cmuxError(f"Pane {pane_id} missing from debug layout panes: {panes}")
def _workspace_panes(client: cmux, workspace_id: str) -> list[tuple[str, bool, int]]:
payload = client._call("pane.list", {"workspace_id": workspace_id}) or {}
out: list[tuple[str, bool, int]] = []
for row in payload.get("panes") or []:
out.append((
str(row.get("id") or ""),
bool(row.get("focused")),
int(row.get("surface_count") or 0),
))
return out
def _focused_pane_id(client: cmux, workspace_id: str) -> str:
for pane_id, focused, _surface_count in _workspace_panes(client, workspace_id):
if focused:
return pane_id
raise cmuxError("No focused pane found")
def _surface_scrollback_text(client: cmux, workspace_id: str, surface_id: str) -> str:
payload = client._call(
"surface.read_text",
{"workspace_id": workspace_id, "surface_id": surface_id, "scrollback": True},
) or {}
return str(payload.get("text") or "")
def _scrollback_has_exact_line(client: cmux, workspace_id: str, surface_id: str, token: str) -> bool:
text = _surface_scrollback_text(client, workspace_id, surface_id)
lines = [_clean_line(raw) for raw in text.splitlines()]
return token in lines
def _wait_for_surface_command_roundtrip(client: cmux, workspace_id: str, surface_id: str) -> None:
for _attempt in range(1, 5):
token = f"CMUX_READY_{secrets.token_hex(4)}"
client.send_surface(surface_id, f"echo {token}\n")
try:
_wait_for(
lambda: _scrollback_has_exact_line(client, workspace_id, surface_id, token),
timeout_s=2.5,
)
return
except cmuxError:
time.sleep(0.1)
raise cmuxError("Timed out waiting for surface command roundtrip")
def _has_exact_marker_lines(
@ -120,30 +42,6 @@ def _has_exact_marker_lines(
return start_marker in lines and end_marker in lines
def _pick_resize_direction_for_pane(client: cmux, pane_ids: list[str], target_pane: str) -> tuple[str, str]:
panes = [p for p in _layout_panes(client) if str(p.get("paneId") or p.get("pane_id") or "") in pane_ids]
if len(panes) < 2:
raise cmuxError(f"Need >=2 panes for resize test, got {panes}")
def x_of(p: dict) -> float:
return float((p.get("frame") or {}).get("x") or 0.0)
def y_of(p: dict) -> float:
return float((p.get("frame") or {}).get("y") or 0.0)
x_span = max(x_of(p) for p in panes) - min(x_of(p) for p in panes)
y_span = max(y_of(p) for p in panes) - min(y_of(p) for p in panes)
if x_span >= y_span:
left_pane = min(panes, key=x_of)
left_id = str(left_pane.get("paneId") or left_pane.get("pane_id") or "")
return ("right" if target_pane == left_id else "left"), "width"
top_pane = min(panes, key=y_of)
top_id = str(top_pane.get("paneId") or top_pane.get("pane_id") or "")
return ("down" if target_pane == top_id else "up"), "height"
def _extract_segment_lines(
text: str,
start_marker: str,

View file

@ -4,132 +4,26 @@
from __future__ import annotations
import os
import re
import secrets
import sys
import time
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from cmux import cmux, cmuxError
from pane_resize_test_support import (
focused_pane_id as _focused_pane_id,
pane_extent as _pane_extent,
pick_resize_direction_for_pane as _pick_resize_direction_for_pane,
scrollback_has_exact_line as _scrollback_has_exact_line,
surface_scrollback_lines as _surface_scrollback_lines,
wait_for as _wait_for,
wait_for_surface_command_roundtrip as _wait_for_surface_command_roundtrip,
workspace_panes as _workspace_panes,
must as _must,
)
DEFAULT_SOCKET_PATHS = ["/tmp/cmux-debug.sock", "/tmp/cmux.sock"]
ANSI_ESCAPE_RE = re.compile(r"\x1b\[[0-?]*[ -/]*[@-~]")
OSC_ESCAPE_RE = re.compile(r"\x1b\][^\x07\x1b]*(?:\x07|\x1b\\)")
def _must(cond: bool, msg: str) -> None:
if not cond:
raise cmuxError(msg)
def _wait_for(pred, timeout_s: float = 5.0, step_s: float = 0.05) -> None:
deadline = time.time() + timeout_s
while time.time() < deadline:
if pred():
return
time.sleep(step_s)
raise cmuxError("Timed out waiting for condition")
def _clean_line(raw: str) -> str:
line = OSC_ESCAPE_RE.sub("", raw)
line = ANSI_ESCAPE_RE.sub("", line)
line = line.replace("\r", "")
return line.strip()
def _layout_panes(client: cmux) -> list[dict]:
layout_payload = client.layout_debug() or {}
layout = layout_payload.get("layout") or {}
return list(layout.get("panes") or [])
def _pane_extent(client: cmux, pane_id: str, axis: str) -> float:
panes = _layout_panes(client)
for pane in panes:
pid = str(pane.get("paneId") or pane.get("pane_id") or "")
if pid != pane_id:
continue
frame = pane.get("frame") or {}
return float(frame.get(axis) or 0.0)
raise cmuxError(f"Pane {pane_id} missing from debug layout panes: {panes}")
def _workspace_panes(client: cmux, workspace_id: str) -> list[tuple[str, bool, int]]:
payload = client._call("pane.list", {"workspace_id": workspace_id}) or {}
out: list[tuple[str, bool, int]] = []
for row in payload.get("panes") or []:
out.append((
str(row.get("id") or ""),
bool(row.get("focused")),
int(row.get("surface_count") or 0),
))
return out
def _focused_pane_id(client: cmux, workspace_id: str) -> str:
for pane_id, focused, _surface_count in _workspace_panes(client, workspace_id):
if focused:
return pane_id
raise cmuxError("No focused pane found")
def _surface_scrollback_text(client: cmux, workspace_id: str, surface_id: str) -> str:
payload = client._call(
"surface.read_text",
{"workspace_id": workspace_id, "surface_id": surface_id, "scrollback": True},
) or {}
return str(payload.get("text") or "")
def _surface_scrollback_lines(client: cmux, workspace_id: str, surface_id: str) -> list[str]:
text = _surface_scrollback_text(client, workspace_id, surface_id)
return [_clean_line(raw) for raw in text.splitlines()]
def _scrollback_has_exact_line(client: cmux, workspace_id: str, surface_id: str, token: str) -> bool:
return token in _surface_scrollback_lines(client, workspace_id, surface_id)
def _wait_for_surface_command_roundtrip(client: cmux, workspace_id: str, surface_id: str) -> None:
for _attempt in range(1, 5):
token = f"CMUX_READY_{secrets.token_hex(4)}"
client.send_surface(surface_id, f"echo {token}\n")
try:
_wait_for(
lambda: _scrollback_has_exact_line(client, workspace_id, surface_id, token),
timeout_s=2.5,
)
return
except cmuxError:
time.sleep(0.1)
raise cmuxError("Timed out waiting for surface command roundtrip")
def _pick_resize_direction_for_pane(client: cmux, pane_ids: list[str], target_pane: str) -> tuple[str, str]:
panes = [p for p in _layout_panes(client) if str(p.get("paneId") or p.get("pane_id") or "") in pane_ids]
if len(panes) < 2:
raise cmuxError(f"Need >=2 panes for resize test, got {panes}")
def x_of(p: dict) -> float:
return float((p.get("frame") or {}).get("x") or 0.0)
def y_of(p: dict) -> float:
return float((p.get("frame") or {}).get("y") or 0.0)
x_span = max(x_of(p) for p in panes) - min(x_of(p) for p in panes)
y_span = max(y_of(p) for p in panes) - min(y_of(p) for p in panes)
if x_span >= y_span:
left_pane = min(panes, key=x_of)
left_id = str(left_pane.get("paneId") or left_pane.get("pane_id") or "")
return ("right" if target_pane == left_id else "left"), "width"
top_pane = min(panes, key=y_of)
top_id = str(top_pane.get("paneId") or top_pane.get("pane_id") or "")
return ("down" if target_pane == top_id else "up"), "height"
def _run_once(socket_path: str) -> int:

View file

@ -74,15 +74,6 @@ def _extract_control_path(ssh_command: str) -> str:
return match.group(1) if match else ""
def _has_ssh_option_key(options: list[str], key: str) -> bool:
lowered_key = key.lower()
for option in options:
token = re.split(r"[=\s]+", str(option).strip(), maxsplit=1)[0].strip().lower()
if token == lowered_key:
return True
return False
def _read_any_terminal_text(client: cmux, workspace_id: str, timeout: float = 8.0) -> str | None:
deadline = time.time() + timeout
last_exc: Exception | None = None
@ -187,12 +178,36 @@ def main() -> int:
_must("-o ControlPersist=600" in ssh_command, f"ssh command should keep master alive for reuse: {ssh_command!r}")
_must("ControlPath=/tmp/cmux-ssh-" in ssh_command, f"ssh command should use shared control path template: {ssh_command!r}")
_must(
(
f"RemoteCommand=export PATH=\"$HOME/.cmux/bin:$PATH\"; "
f"export CMUX_SOCKET_PATH={remote_socket_addr}; "
"exec \"${SHELL:-/bin/zsh}\" -l"
) in ssh_command,
f"cmux ssh should use -o RemoteCommand for PATH/bootstrap env pinning (not positional command): {ssh_command!r}",
"RemoteCommand=/bin/sh -lc " in ssh_command,
f"cmux ssh should route RemoteCommand through /bin/sh for non-POSIX login shells: {ssh_command!r}",
)
_must(
f"export PATH=\"$HOME/.cmux/bin:$PATH\"" in ssh_command,
f"cmux ssh should still prepend the remote cmux wrapper path: {ssh_command!r}",
)
_must(
f"export CMUX_SOCKET_PATH=127.0.0.1:{int(remote_relay_port)}" in ssh_command,
f"cmux ssh should still pin the relay socket path in RemoteCommand: {ssh_command!r}",
)
_must(
"case \"${CMUX_LOGIN_SHELL##*/}\" in" in ssh_command,
f"cmux ssh should still branch on the user's login shell when possible: {ssh_command!r}",
)
_must(
"cat > \"$cmux_shell_dir/.zshrc\"" in ssh_command,
f"cmux ssh should install a post-rc zsh wrapper so the remote cmux wrapper stays first on PATH: {ssh_command!r}",
)
_must(
"cmux_wait_attempt=0" in ssh_command,
f"cmux ssh should wait briefly for the authenticated relay before showing the remote shell: {ssh_command!r}",
)
_must(
"exec \"$CMUX_LOGIN_SHELL\" --rcfile \"$cmux_shell_dir/.bashrc\" -i" in ssh_command,
f"cmux ssh should still support bash login shells with a post-rc wrapper file: {ssh_command!r}",
)
_must(
"exec \"$CMUX_LOGIN_SHELL\" -i" in ssh_command,
f"cmux ssh should still hand off to the user's interactive login shell when possible: {ssh_command!r}",
)
listed_row = None
@ -221,18 +236,17 @@ def main() -> int:
str(proxy.get("state") or "") in {"connecting", "ready", "error", "unavailable"},
f"remote payload should include proxy state metadata: {remote}",
)
remote_ssh_options = [str(item) for item in (remote.get("ssh_options") or [])]
_must(
_has_ssh_option_key(remote_ssh_options, "ControlMaster"),
f"workspace.remote.configure should include ControlMaster default: {remote}",
"ssh_options" not in remote,
f"workspace remote payload should not expose raw ssh_options: {remote}",
)
_must(
_has_ssh_option_key(remote_ssh_options, "ControlPersist"),
f"workspace.remote.configure should include ControlPersist default: {remote}",
"identity_file" not in remote,
f"workspace remote payload should not expose identity_file: {remote}",
)
_must(
_has_ssh_option_key(remote_ssh_options, "ControlPath"),
f"workspace.remote.configure should include ControlPath default: {remote}",
bool(remote.get("has_ssh_options")) is True,
f"workspace remote payload should indicate ssh options are configured: {remote}",
)
# Regression: cmux ssh should launch through initial_command, not visibly type a giant command into the shell.
terminal_text = _read_any_terminal_text(client, workspace_id)
@ -352,10 +366,13 @@ def main() -> int:
f"ssh command should not force default StrictHostKeyChecking when override is supplied: {ssh_command_strict_override!r}",
)
strict_override_remote = payload_strict_override.get("remote") or {}
strict_override_options = [str(item) for item in (strict_override_remote.get("ssh_options") or [])]
_must(
any(item.lower() == "stricthostkeychecking=no" for item in strict_override_options),
f"workspace.remote.configure should preserve explicit StrictHostKeyChecking override: {strict_override_remote}",
"ssh_options" not in strict_override_remote,
f"workspace remote payload should not expose raw ssh_options: {strict_override_remote}",
)
_must(
bool(strict_override_remote.get("has_ssh_options")) is True,
f"workspace remote payload should indicate ssh options are configured: {strict_override_remote}",
)
payload_case_override = _run_cli_json(
@ -420,38 +437,13 @@ def main() -> int:
f"ssh command should include exactly one ControlPath when lowercase override is supplied: {ssh_command_case_override!r}",
)
case_override_remote = payload_case_override.get("remote") or {}
case_override_options = [str(item) for item in (case_override_remote.get("ssh_options") or [])]
_must(
any(item.lower() == "stricthostkeychecking=no" for item in case_override_options),
f"workspace.remote.configure should preserve lowercase StrictHostKeyChecking override: {case_override_remote}",
"ssh_options" not in case_override_remote,
f"workspace remote payload should not expose raw ssh_options: {case_override_remote}",
)
_must(
not any(item.lower() == "stricthostkeychecking=accept-new" for item in case_override_options),
f"workspace.remote.configure should not inject default StrictHostKeyChecking when lowercase override is supplied: {case_override_remote}",
)
_must(
any(item.lower() == "controlmaster=no" for item in case_override_options),
f"workspace.remote.configure should preserve lowercase ControlMaster override: {case_override_remote}",
)
_must(
not any(item.lower() == "controlmaster=auto" for item in case_override_options),
f"workspace.remote.configure should not inject default ControlMaster when lowercase override is supplied: {case_override_remote}",
)
_must(
any(item.lower() == "controlpersist=0" for item in case_override_options),
f"workspace.remote.configure should preserve lowercase ControlPersist override: {case_override_remote}",
)
_must(
not any(item.lower() == "controlpersist=600" for item in case_override_options),
f"workspace.remote.configure should not inject default ControlPersist when lowercase override is supplied: {case_override_remote}",
)
_must(
any(item.lower() == "controlpath=/tmp/cmux-ssh-%c-custom" for item in case_override_options),
f"workspace.remote.configure should preserve lowercase ControlPath override: {case_override_remote}",
)
_must(
sum(1 for item in case_override_options if item.lower().startswith("controlpath=")) == 1,
f"workspace.remote.configure should include exactly one ControlPath when lowercase override is supplied: {case_override_remote}",
bool(case_override_remote.get("has_ssh_options")) is True,
f"workspace remote payload should indicate ssh options are configured: {case_override_remote}",
)
payload3 = _run_cli_json(
@ -475,7 +467,7 @@ def main() -> int:
except Exception:
pass
invalid_proxy_port_workspace = client._call("workspace.create", {"initial_command": "echo invalid-local-proxy-port"}) or {}
invalid_proxy_port_workspace = client._call("workspace.create", {}) or {}
workspace_id_invalid_proxy_port = str(invalid_proxy_port_workspace.get("workspace_id") or "")
if workspace_id_invalid_proxy_port:
workspaces_to_close.append(workspace_id_invalid_proxy_port)

View file

@ -207,7 +207,7 @@ def main() -> int:
remote_relay_port = payload.get("remote_relay_port")
_must(remote_relay_port is not None, f"cmux ssh output missing remote_relay_port: {payload}")
remote_relay_port = int(remote_relay_port)
_must(49152 <= remote_relay_port <= 65535, f"remote_relay_port should be in ephemeral range: {remote_relay_port}")
_must(1 <= remote_relay_port <= 65535, f"remote_relay_port should be a valid TCP port: {remote_relay_port}")
remote_socket_addr = f"127.0.0.1:{remote_relay_port}"
startup_cmd = str(payload.get("ssh_startup_command") or "")
_must(
@ -288,7 +288,7 @@ def main() -> int:
remote_relay_port_2 = payload_2.get("remote_relay_port")
_must(remote_relay_port_2 is not None, f"second cmux ssh output missing remote_relay_port: {payload_2}")
remote_relay_port_2 = int(remote_relay_port_2)
_must(49152 <= remote_relay_port_2 <= 65535, f"second remote_relay_port out of range: {remote_relay_port_2}")
_must(1 <= remote_relay_port_2 <= 65535, f"second remote_relay_port should be a valid TCP port: {remote_relay_port_2}")
_must(
remote_relay_port_2 != remote_relay_port,
f"relay ports should differ per workspace: {remote_relay_port_2} vs {remote_relay_port}",

View file

@ -70,6 +70,8 @@ def _as_int(value: object, field: str) -> int:
if isinstance(value, int):
return value
if isinstance(value, float):
if not value.is_integer():
raise cmuxError(f"{field} should be an integer value, got float {value!r}")
return int(value)
raise cmuxError(f"{field} has unexpected type {type(value).__name__}: {value!r}")

View file

@ -185,10 +185,10 @@ def main() -> int:
host = f"root@{DOCKER_SSH_HOST}"
_wait_for_ssh(host, host_ssh_port, key_path)
conflict_port = _find_free_loopback_port()
conflict_listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
conflict_listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
conflict_listener.bind(("127.0.0.1", conflict_port))
conflict_listener.bind(("127.0.0.1", 0))
conflict_port = int(conflict_listener.getsockname()[1])
conflict_listener.listen(1)
with cmux(SOCKET_PATH) as client:

View file

@ -131,6 +131,10 @@ def main() -> int:
second = _run_cli_json(cli, ["ssh", SSH_HOST])
second_workspace_id = _workspace_id_from_payload(client, second)
_must(bool(second_workspace_id), f"second cmux ssh output missing workspace_id: {second}")
_must(
second_workspace_id != first_workspace_id,
f"second cmux ssh should create a distinct workspace: {first_workspace_id} vs {second_workspace_id}",
)
workspace_ids.append(second_workspace_id)
_wait_remote_ready(client, second_workspace_id)