diff --git a/tests_v2/test_ssh_remote_cli_metadata.py b/tests_v2/test_ssh_remote_cli_metadata.py index 7042979b..59eee991 100644 --- a/tests_v2/test_ssh_remote_cli_metadata.py +++ b/tests_v2/test_ssh_remote_cli_metadata.py @@ -113,6 +113,7 @@ def main() -> int: workspace_id_strict_override = "" workspace_id_case_override = "" workspace_id_invalid_proxy_port = "" + workspaces_to_close: list[str] = [] with cmux(SOCKET_PATH) as client: try: payload = _run_cli_json( @@ -120,6 +121,8 @@ def main() -> int: ["ssh", "127.0.0.1", "--port", "1", "--name", "ssh-meta-test"], ) workspace_id = str(payload.get("workspace_id") or "") + if workspace_id: + workspaces_to_close.append(workspace_id) workspace_ref = str(payload.get("workspace_ref") or "") if not workspace_id and workspace_ref.startswith("workspace:"): listed = client._call("workspace.list", {}) or {} @@ -272,6 +275,8 @@ def main() -> int: ["ssh", "127.0.0.1", "--port", "1"], ) workspace_id_without_name = str(payload2.get("workspace_id") or "") + if workspace_id_without_name: + workspaces_to_close.append(workspace_id_without_name) ssh_command_without_name = str(payload2.get("ssh_command") or "") workspace_ref_without_name = str(payload2.get("workspace_ref") or "") if not workspace_id_without_name and workspace_ref_without_name.startswith("workspace:"): @@ -320,6 +325,8 @@ def main() -> int: ], ) workspace_id_strict_override = str(payload_strict_override.get("workspace_id") or "") + if workspace_id_strict_override: + workspaces_to_close.append(workspace_id_strict_override) workspace_ref_strict_override = str(payload_strict_override.get("workspace_ref") or "") if not workspace_id_strict_override and workspace_ref_strict_override.startswith("workspace:"): listed_override = client._call("workspace.list", {}) or {} @@ -367,6 +374,8 @@ def main() -> int: ], ) workspace_id_case_override = str(payload_case_override.get("workspace_id") or "") + if workspace_id_case_override: + workspaces_to_close.append(workspace_id_case_override) workspace_ref_case_override = str(payload_case_override.get("workspace_ref") or "") if not workspace_id_case_override and workspace_ref_case_override.startswith("workspace:"): listed_case_override = client._call("workspace.list", {}) or {} @@ -459,6 +468,8 @@ def main() -> int: f"cmux ssh should merge existing shell features when present: {payload3!r}", ) workspace_id3 = str(payload3.get("workspace_id") or "") + if workspace_id3: + workspaces_to_close.append(workspace_id3) if workspace_id3: try: client.close_workspace(workspace_id3) @@ -467,6 +478,8 @@ def main() -> int: invalid_proxy_port_workspace = client._call("workspace.create", {"initial_command": "echo invalid-local-proxy-port"}) or {} workspace_id_invalid_proxy_port = str(invalid_proxy_port_workspace.get("workspace_id") or "") + if workspace_id_invalid_proxy_port: + workspaces_to_close.append(workspace_id_invalid_proxy_port) _must(bool(workspace_id_invalid_proxy_port), f"workspace.create missing workspace_id: {invalid_proxy_port_workspace}") configured_with_string_ports = client._call( @@ -599,31 +612,14 @@ def main() -> int: client.close_workspace(workspace_id_invalid_proxy_port) except Exception: pass - workspace_id_invalid_proxy_port = "" + else: + workspace_id_invalid_proxy_port = "" finally: - if workspace_id: + for workspace_id_to_close in dict.fromkeys(workspaces_to_close): + if not workspace_id_to_close: + continue try: - client.close_workspace(workspace_id) - except Exception: - pass - if workspace_id_without_name: - try: - client.close_workspace(workspace_id_without_name) - except Exception: - pass - if workspace_id_strict_override: - try: - client.close_workspace(workspace_id_strict_override) - except Exception: - pass - if workspace_id_case_override: - try: - client.close_workspace(workspace_id_case_override) - except Exception: - pass - if workspace_id_invalid_proxy_port: - try: - client.close_workspace(workspace_id_invalid_proxy_port) + client.close_workspace(workspace_id_to_close) except Exception: pass diff --git a/tests_v2/test_ssh_remote_docker_forwarding.py b/tests_v2/test_ssh_remote_docker_forwarding.py index 2af14d95..cc7ec4b0 100644 --- a/tests_v2/test_ssh_remote_docker_forwarding.py +++ b/tests_v2/test_ssh_remote_docker_forwarding.py @@ -613,10 +613,9 @@ def main() -> int: os.access(local_cached_binary, os.X_OK), f"local daemon cache artifact must be executable: {local_cached_binary}", ) - local_version = _run([str(local_cached_binary), "version"], check=True).stdout.strip() _must( - daemon_version in local_version, - f"local cached daemon binary version mismatch: expected {daemon_version!r}, got {local_version!r}", + daemon_version in local_cached_binary.parts, + f"local cached daemon binary path should encode daemon version {daemon_version!r}: {local_cached_binary}", ) local_sha256 = _local_file_sha256(local_cached_binary) remote_sha256 = _remote_binary_sha256(host, host_ssh_port, key_path, remote_path) diff --git a/tests_v2/test_ssh_remote_interactive_cmux_command_regression.py b/tests_v2/test_ssh_remote_interactive_cmux_command_regression.py index 01497627..040207d7 100644 --- a/tests_v2/test_ssh_remote_interactive_cmux_command_regression.py +++ b/tests_v2/test_ssh_remote_interactive_cmux_command_regression.py @@ -113,24 +113,48 @@ def _wait_text(client: cmux, surface_id: str, token: str, timeout: float = 12.0) raise cmuxError(f"Timed out waiting for {token!r} in surface {surface_id}: {last[-1200:]!r}") +def _wait_shell_ready(client: cmux, surface_id: str, timeout: float = 20.0) -> None: + token = f"__CMUX_SHELL_READY_{secrets.token_hex(6)}__" + client.send_surface(surface_id, f"printf '{token}'; echo") + client.send_key_surface(surface_id, "enter") + _wait_text(client, surface_id, token, timeout=timeout) + + def _run_remote_shell_command(client: cmux, surface_id: str, command: str, timeout: float = 12.0) -> tuple[int, str, str]: token = f"__CMUX_REMOTE_CMD_{secrets.token_hex(6)}__" + start_marker = f"{token}:START" + status_marker = f"{token}:STATUS" + end_marker = f"{token}:END" client.send_surface( surface_id, ( - f"__cmux_out=$({command} 2>&1); " + f"printf '{start_marker}'; echo; " + f"{command}; " "__cmux_status=$?; " - f"printf '{token}:%s:' \"$__cmux_status\"; " - "printf '%s' \"$__cmux_out\"; " - "printf ':__CMUX_REMOTE_CMD_END__\\n'\n" + f"printf '{status_marker}:%s' \"$__cmux_status\"; echo; " + f"printf '{end_marker}'; echo" ), ) - text = _wait_text(client, surface_id, token, timeout=timeout) - pattern = re.compile(re.escape(token) + r":(\d+):(.*?):__CMUX_REMOTE_CMD_END__") + client.send_key_surface(surface_id, "enter") + deadline = time.time() + timeout + text = "" + while time.time() < deadline: + text = client.read_terminal_text(surface_id) + if ( + text.count(start_marker) >= 2 + and text.count(status_marker) >= 2 + and text.count(end_marker) >= 2 + ): + break + time.sleep(0.15) + pattern = re.compile( + re.escape(start_marker) + r"\n(.*?)" + re.escape(status_marker) + r":(\d+)\n" + re.escape(end_marker), + re.S, + ) matches = pattern.findall(text) if not matches: raise cmuxError(f"Missing command result token for {command!r}: {text[-1200:]!r}") - status_raw, output = matches[-1] + output, status_raw = matches[-1] return int(status_raw), output, text @@ -150,6 +174,7 @@ def main() -> int: _wait_remote_ready(client, workspace_id) surface_id = _wait_surface_id(client, workspace_id) + _wait_shell_ready(client, surface_id) which_status, which_output, which_text = _run_remote_shell_command(client, surface_id, "command -v cmux") _must(which_status == 0, f"`command -v cmux` failed: output={which_output!r} tail={which_text[-1200:]!r}") @@ -165,6 +190,10 @@ def main() -> int: "Socket not found at 127.0.0.1:" not in ping_text, f"interactive ssh shell still routed cmux to a unix-socket-only binary: {ping_text[-1200:]!r}", ) + _must( + "waiting for relay on 127.0.0.1:" not in ping_text and "failed to connect to 127.0.0.1:" not in ping_text, + f"`cmux ping` hit a dead ssh relay instead of the local app socket: {ping_text[-1200:]!r}", + ) notify_status, notify_output, notify_text = _run_remote_shell_command( client, @@ -179,6 +208,27 @@ def main() -> int: "Socket not found at 127.0.0.1:" not in notify_text, f"`cmux notify` still failed via wrong cmux binary: {notify_text[-1200:]!r}", ) + _must( + "waiting for relay on 127.0.0.1:" not in notify_text and "failed to connect to 127.0.0.1:" not in notify_text, + f"`cmux notify` still failed because the ssh relay listener was not running: {notify_text[-1200:]!r}", + ) + + shell_status, shell_output, shell_text = _run_remote_shell_command( + client, + surface_id, + r'''printf 'TERM=%s\n' "${TERM:-}"; printf 'TERM_PROGRAM=%s\n' "${TERM_PROGRAM:-}"; printf 'TERM_PROGRAM_VERSION=%s\n' "${TERM_PROGRAM_VERSION:-}"; printf 'GHOSTTY_SHELL_FEATURES=%s\n' "${GHOSTTY_SHELL_FEATURES:-}"; bindkey "^A"; bindkey "^K"; bindkey "^[^?"; bindkey "^[b"; bindkey "^[f"''', + ) + _must(shell_status == 0, f"ssh shell env/bindkey probe failed: output={shell_output!r} tail={shell_text[-1200:]!r}") + _must("TERM=xterm-ghostty" in shell_output, f"ssh shell lost TERM=xterm-ghostty: {shell_output!r}") + _must("TERM_PROGRAM=ghostty" in shell_output, f"ssh shell lost TERM_PROGRAM=ghostty: {shell_output!r}") + _must("GHOSTTY_SHELL_FEATURES=" in shell_output, f"ssh shell lost GHOSTTY_SHELL_FEATURES: {shell_output!r}") + _must("ssh-env" in shell_output, f"ssh shell missing ssh-env feature: {shell_output!r}") + _must("ssh-terminfo" in shell_output, f"ssh shell missing ssh-terminfo feature: {shell_output!r}") + _must('"^A" beginning-of-line' in shell_output, f"Ctrl-A binding regressed in ssh shell: {shell_output!r}") + _must('"^K" kill-line' in shell_output, f"Ctrl-K binding regressed in ssh shell: {shell_output!r}") + _must('"^[^?" backward-kill-word' in shell_output, f"Opt-Backspace binding regressed in ssh shell: {shell_output!r}") + _must('"^[b" backward-word' in shell_output, f"Opt-Left binding regressed in ssh shell: {shell_output!r}") + _must('"^[f" forward-word' in shell_output, f"Opt-Right binding regressed in ssh shell: {shell_output!r}") finally: if workspace_ids: try: diff --git a/tests_v2/test_ssh_remote_last_surface_clears_remote_state.py b/tests_v2/test_ssh_remote_last_surface_clears_remote_state.py new file mode 100644 index 00000000..91af772d --- /dev/null +++ b/tests_v2/test_ssh_remote_last_surface_clears_remote_state.py @@ -0,0 +1,259 @@ +#!/usr/bin/env python3 +"""Regression: closing the last SSH surface should clear remote workspace state.""" + +from __future__ import annotations + +import glob +import json +import os +import re +import subprocess +import sys +import time +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).parent)) +from cmux import cmux, cmuxError + + +SOCKET_PATH = os.environ.get("CMUX_SOCKET", "/tmp/cmux-debug.sock") +SSH_HOST = os.environ.get("CMUX_SSH_TEST_HOST", "").strip() +SSH_PORT = os.environ.get("CMUX_SSH_TEST_PORT", "").strip() +SSH_IDENTITY = os.environ.get("CMUX_SSH_TEST_IDENTITY", "").strip() +SSH_OPTIONS_RAW = os.environ.get("CMUX_SSH_TEST_OPTIONS", "").strip() + + +def _must(cond: bool, msg: str) -> None: + if not cond: + raise cmuxError(msg) + + +def _run(cmd: list[str], *, env: dict[str, str] | None = None, check: bool = True) -> subprocess.CompletedProcess[str]: + proc = subprocess.run(cmd, capture_output=True, text=True, env=env, check=False) + if check and proc.returncode != 0: + merged = f"{proc.stdout}\n{proc.stderr}".strip() + raise cmuxError(f"Command failed ({' '.join(cmd)}): {merged}") + return proc + + +def _find_cli_binary() -> str: + env_cli = os.environ.get("CMUXTERM_CLI") + if env_cli and os.path.isfile(env_cli) and os.access(env_cli, os.X_OK): + return env_cli + + fixed = os.path.expanduser("~/Library/Developer/Xcode/DerivedData/cmux-tests-v2/Build/Products/Debug/cmux") + if os.path.isfile(fixed) and os.access(fixed, os.X_OK): + return fixed + + candidates = glob.glob(os.path.expanduser("~/Library/Developer/Xcode/DerivedData/**/Build/Products/Debug/cmux"), recursive=True) + candidates += glob.glob("/tmp/cmux-*/Build/Products/Debug/cmux") + candidates = [p for p in candidates if os.path.isfile(p) and os.access(p, os.X_OK)] + if not candidates: + raise cmuxError("Could not locate cmux CLI binary; set CMUXTERM_CLI") + candidates.sort(key=lambda p: os.path.getmtime(p), reverse=True) + return candidates[0] + + +def _run_cli_json(cli: str, args: list[str]) -> dict: + env = dict(os.environ) + env.pop("CMUX_WORKSPACE_ID", None) + env.pop("CMUX_SURFACE_ID", None) + env.pop("CMUX_TAB_ID", None) + + proc = _run([cli, "--socket", SOCKET_PATH, "--json", *args], env=env) + try: + return json.loads(proc.stdout or "{}") + except Exception as exc: # noqa: BLE001 + raise cmuxError(f"Invalid JSON output for {' '.join(args)}: {proc.stdout!r} ({exc})") + + +def _wait_for(pred, timeout_s: float = 8.0, step_s: float = 0.1) -> None: + deadline = time.time() + timeout_s + while time.time() < deadline: + if pred(): + return + time.sleep(step_s) + raise cmuxError("Timed out waiting for condition") + + +def _wait_remote_ready(client: cmux, workspace_id: str, timeout_s: float = 45.0) -> None: + deadline = time.time() + timeout_s + last_status = {} + while time.time() < deadline: + last_status = client._call("workspace.remote.status", {"workspace_id": workspace_id}) or {} + remote = last_status.get("remote") or {} + daemon = remote.get("daemon") or {} + if str(remote.get("state") or "") == "connected" and str(daemon.get("state") or "") == "ready": + return + time.sleep(0.25) + raise cmuxError(f"Remote did not become ready for {workspace_id}: {last_status}") + + +def _resolve_workspace_id(client: cmux, payload: dict, *, before_workspace_ids: set[str]) -> str: + workspace_id = str(payload.get("workspace_id") or "") + if workspace_id: + return workspace_id + + workspace_ref = str(payload.get("workspace_ref") or "") + if workspace_ref.startswith("workspace:"): + listed = client._call("workspace.list", {}) or {} + for row in listed.get("workspaces") or []: + if str(row.get("ref") or "") == workspace_ref: + resolved = str(row.get("id") or "") + if resolved: + return resolved + + current = {wid for _index, wid, _title, _focused in client.list_workspaces()} + new_ids = sorted(current - before_workspace_ids) + if len(new_ids) == 1: + return new_ids[0] + + raise cmuxError(f"Unable to resolve workspace_id from payload: {payload}") + + +def _workspace_row(client: cmux, workspace_id: str) -> dict: + rows = (client._call("workspace.list", {}) or {}).get("workspaces") or [] + for row in rows: + if str(row.get("id") or "") == workspace_id: + return row + raise cmuxError(f"workspace.list missing {workspace_id}: {rows}") + + +def _remote_session_count(client: cmux, workspace_id: str) -> int: + row = _workspace_row(client, workspace_id) + remote = row.get("remote") or {} + return int(remote.get("active_terminal_sessions") or 0) + + +def _run_surface_probe(client: cmux, surface_id: str, command: str, token_prefix: str, timeout_s: float = 12.0) -> str: + token = f"__CMUX_{token_prefix}_{int(time.time() * 1000)}__" + client.send_surface( + surface_id, + ( + f"printf '{token}:START'; echo; " + f"{command}; " + f"printf '{token}:END'; echo" + ), + ) + client.send_key_surface(surface_id, "enter") + deadline = time.time() + timeout_s + last = "" + pattern = re.compile(re.escape(token) + r":START\n(.*?)" + re.escape(token) + r":END", re.S) + while time.time() < deadline: + last = client.read_terminal_text(surface_id) + matches = pattern.findall(last) + if matches: + return matches[-1] + time.sleep(0.15) + raise cmuxError(f"Timed out waiting for probe {token!r}: {last[-1200:]!r}") + + +def _open_ssh_workspace(client: cmux, cli: str, *, name: str) -> str: + before_workspace_ids = {wid for _index, wid, _title, _focused in client.list_workspaces()} + + ssh_args = ["ssh", SSH_HOST, "--name", name] + if SSH_PORT: + ssh_args.extend(["--port", SSH_PORT]) + if SSH_IDENTITY: + ssh_args.extend(["--identity", SSH_IDENTITY]) + if SSH_OPTIONS_RAW: + for option in SSH_OPTIONS_RAW.split(","): + trimmed = option.strip() + if trimmed: + ssh_args.extend(["--ssh-option", trimmed]) + + payload = _run_cli_json(cli, ssh_args) + workspace_id = _resolve_workspace_id(client, payload, before_workspace_ids=before_workspace_ids) + _wait_remote_ready(client, workspace_id) + client.select_workspace(workspace_id) + _wait_for(lambda: client.current_workspace() == workspace_id, timeout_s=8.0) + return workspace_id + + +def main() -> int: + if not SSH_HOST: + print("SKIP: set CMUX_SSH_TEST_HOST to run ssh last-surface remote state regression") + return 0 + + cli = _find_cli_binary() + workspace_id = "" + + try: + with cmux(SOCKET_PATH) as client: + workspace_id = _open_ssh_workspace( + client, + cli, + name=f"ssh-last-surface-{int(time.time())}", + ) + + row = _workspace_row(client, workspace_id) + remote = row.get("remote") or {} + _must(bool(remote.get("enabled")) is True, f"workspace should start as remote-enabled: {row}") + _must(int(remote.get("active_terminal_sessions") or 0) == 1, f"workspace should start with one active ssh terminal session: {row}") + + surfaces = client.list_surfaces(workspace_id) + _must(len(surfaces) == 1, f"expected one initial ssh surface, got {surfaces}") + + split_surface_id = client.new_split("right") + _wait_for(lambda: len(client.list_surfaces(workspace_id)) == 2, timeout_s=10.0, step_s=0.1) + _wait_for(lambda: _remote_session_count(client, workspace_id) == 2, timeout_s=10.0, step_s=0.1) + + client.send_surface(split_surface_id, "exit") + client.send_key_surface(split_surface_id, "enter") + _wait_for(lambda: _remote_session_count(client, workspace_id) == 1, timeout_s=15.0, step_s=0.15) + + row_after_first_exit = _workspace_row(client, workspace_id) + remote_after_first_exit = row_after_first_exit.get("remote") or {} + _must(bool(remote_after_first_exit.get("enabled")) is True, f"workspace should stay remote while one ssh terminal remains: {row_after_first_exit}") + + remaining_surface_id = next( + surface_id + for _index, surface_id, _focused in client.list_surfaces(workspace_id) + if surface_id != split_surface_id + ) + client.send_surface(remaining_surface_id, "exit") + client.send_key_surface(remaining_surface_id, "enter") + + def _remote_cleared() -> bool: + row_now = _workspace_row(client, workspace_id) + remote_now = row_now.get("remote") or {} + if bool(remote_now.get("enabled")): + return False + surfaces_now = client.list_surfaces(workspace_id) + return len(surfaces_now) == 2 + + _wait_for(_remote_cleared, timeout_s=15.0, step_s=0.15) + + final_row = _workspace_row(client, workspace_id) + final_remote = final_row.get("remote") or {} + _must(bool(final_remote.get("enabled")) is False, f"workspace remote metadata should clear after last ssh surface closes: {final_row}") + _must(str(final_remote.get("state") or "") == "disconnected", f"workspace should end disconnected after remote metadata clears: {final_row}") + _must(int(final_remote.get("active_terminal_sessions") or 0) == 0, f"workspace should report zero active ssh terminal sessions after last ssh surface closes: {final_row}") + + local_surface_ids = [surface_id for _index, surface_id, _focused in client.list_surfaces(workspace_id)] + _must(len(local_surface_ids) == 2, f"expected both panes to remain as local terminals after ssh exits, got {local_surface_ids}") + for idx, surface_id in enumerate(local_surface_ids): + socket_output = _run_surface_probe( + client, + surface_id, + r'''printf '%s' "${CMUX_SOCKET_PATH:-}"''', + f"SSH_LAST_SURFACE_SOCKET_{idx}", + ).strip() + _must( + not socket_output.startswith("127.0.0.1:"), + f"surface {surface_id} should be local after clearing remote state, got CMUX_SOCKET_PATH={socket_output!r}", + ) + finally: + if workspace_id: + try: + with cmux(SOCKET_PATH) as cleanup_client: + cleanup_client._call("workspace.close", {"workspace_id": workspace_id}) + except Exception: + pass + + print("PASS: exiting all ssh panes clears remote workspace state while fallback local panes remain local") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tests_v2/test_ssh_remote_shortcuts_stay_remote.py b/tests_v2/test_ssh_remote_shortcuts_stay_remote.py new file mode 100644 index 00000000..fa5d9199 --- /dev/null +++ b/tests_v2/test_ssh_remote_shortcuts_stay_remote.py @@ -0,0 +1,281 @@ +#!/usr/bin/env python3 +"""Regression: new tabs and splits from an ssh terminal must stay on the remote shell.""" + +from __future__ import annotations + +import glob +import json +import os +import re +import secrets +import subprocess +import sys +import time +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).parent)) +from cmux import cmux, cmuxError + + +SOCKET_PATH = os.environ.get("CMUX_SOCKET", "/tmp/cmux-debug.sock") +SSH_HOST = os.environ.get("CMUX_SSH_TEST_HOST", "").strip() +SSH_PORT = os.environ.get("CMUX_SSH_TEST_PORT", "").strip() +SSH_IDENTITY = os.environ.get("CMUX_SSH_TEST_IDENTITY", "").strip() +SSH_OPTIONS_RAW = os.environ.get("CMUX_SSH_TEST_OPTIONS", "").strip() + + +def _must(cond: bool, msg: str) -> None: + if not cond: + raise cmuxError(msg) + + +def _run(cmd: list[str], *, env: dict[str, str] | None = None, check: bool = True) -> subprocess.CompletedProcess[str]: + proc = subprocess.run(cmd, capture_output=True, text=True, env=env, check=False) + if check and proc.returncode != 0: + merged = f"{proc.stdout}\n{proc.stderr}".strip() + raise cmuxError(f"Command failed ({' '.join(cmd)}): {merged}") + return proc + + +def _find_cli_binary() -> str: + env_cli = os.environ.get("CMUXTERM_CLI") + if env_cli and os.path.isfile(env_cli) and os.access(env_cli, os.X_OK): + return env_cli + + fixed = os.path.expanduser("~/Library/Developer/Xcode/DerivedData/cmux-tests-v2/Build/Products/Debug/cmux") + if os.path.isfile(fixed) and os.access(fixed, os.X_OK): + return fixed + + candidates = glob.glob(os.path.expanduser("~/Library/Developer/Xcode/DerivedData/**/Build/Products/Debug/cmux"), recursive=True) + candidates += glob.glob("/tmp/cmux-*/Build/Products/Debug/cmux") + candidates = [p for p in candidates if os.path.isfile(p) and os.access(p, os.X_OK)] + if not candidates: + raise cmuxError("Could not locate cmux CLI binary; set CMUXTERM_CLI") + candidates.sort(key=lambda p: os.path.getmtime(p), reverse=True) + return candidates[0] + + +def _run_cli_json(cli: str, args: list[str]) -> dict: + env = dict(os.environ) + env.pop("CMUX_WORKSPACE_ID", None) + env.pop("CMUX_SURFACE_ID", None) + env.pop("CMUX_TAB_ID", None) + + proc = _run([cli, "--socket", SOCKET_PATH, "--json", *args], env=env) + try: + return json.loads(proc.stdout or "{}") + except Exception as exc: # noqa: BLE001 + raise cmuxError(f"Invalid JSON output for {' '.join(args)}: {proc.stdout!r} ({exc})") + + +def _wait_for(pred, timeout_s: float = 8.0, step_s: float = 0.1) -> None: + deadline = time.time() + timeout_s + while time.time() < deadline: + if pred(): + return + time.sleep(step_s) + raise cmuxError("Timed out waiting for condition") + + +def _wait_remote_ready(client: cmux, workspace_id: str, timeout_s: float = 45.0) -> None: + deadline = time.time() + timeout_s + last_status = {} + while time.time() < deadline: + last_status = client._call("workspace.remote.status", {"workspace_id": workspace_id}) or {} + remote = last_status.get("remote") or {} + daemon = remote.get("daemon") or {} + if str(remote.get("state") or "") == "connected" and str(daemon.get("state") or "") == "ready": + return + time.sleep(0.25) + raise cmuxError(f"Remote did not become ready for {workspace_id}: {last_status}") + + +def _resolve_workspace_id(client: cmux, payload: dict, *, before_workspace_ids: set[str]) -> str: + workspace_id = str(payload.get("workspace_id") or "") + if workspace_id: + return workspace_id + + workspace_ref = str(payload.get("workspace_ref") or "") + if workspace_ref.startswith("workspace:"): + listed = client._call("workspace.list", {}) or {} + for row in listed.get("workspaces") or []: + if str(row.get("ref") or "") == workspace_ref: + resolved = str(row.get("id") or "") + if resolved: + return resolved + + current = {wid for _index, wid, _title, _focused in client.list_workspaces()} + new_ids = sorted(current - before_workspace_ids) + if len(new_ids) == 1: + return new_ids[0] + + raise cmuxError(f"Unable to resolve workspace_id from payload: {payload}") + + +def _focused_surface_id(client: cmux) -> str: + ident = client.identify() + focused = ident.get("focused") or {} + surface_id = str(focused.get("surface_id") or "") + if not surface_id: + raise cmuxError(f"Missing focused surface in identify payload: {ident}") + return surface_id + + +def _run_remote_shell_probe(client: cmux, surface_id: str, probe_label: str) -> str: + token = f"__CMUX_REMOTE_SOCKET_{probe_label}_{secrets.token_hex(4)}__" + client.send_surface( + surface_id, + ( + f"__cmux_socket_path=\"${{CMUX_SOCKET_PATH:-}}\"; " + f"printf '{token}:%s:__CMUX_REMOTE_SOCKET_END__\\n' \"$__cmux_socket_path\"\n" + ), + ) + deadline = time.time() + 15.0 + last = "" + pattern = re.compile(re.escape(token) + r":(.*?):__CMUX_REMOTE_SOCKET_END__") + while time.time() < deadline: + last = client.read_terminal_text(surface_id) + matches = pattern.findall(last) + if matches: + for candidate in reversed(matches): + cleaned = candidate.strip() + if cleaned and cleaned != "%s": + return cleaned + time.sleep(0.15) + raise cmuxError(f"Timed out waiting for socket token {token!r}: {last[-1200:]!r}") + + +def _assert_remote_socket_path(client: cmux, surface_id: str, shortcut_name: str) -> None: + socket_path = _run_remote_shell_probe(client, surface_id, shortcut_name) + _must( + socket_path.startswith("127.0.0.1:"), + f"{shortcut_name} should keep the new terminal on the ssh relay, got CMUX_SOCKET_PATH={socket_path!r}", + ) + + +def _open_ssh_workspace(client: cmux, cli: str, *, name: str) -> str: + before_workspace_ids = {wid for _index, wid, _title, _focused in client.list_workspaces()} + + ssh_args = ["ssh", SSH_HOST, "--name", name] + if SSH_PORT: + ssh_args.extend(["--port", SSH_PORT]) + if SSH_IDENTITY: + ssh_args.extend(["--identity", SSH_IDENTITY]) + if SSH_OPTIONS_RAW: + for option in SSH_OPTIONS_RAW.split(","): + trimmed = option.strip() + if trimmed: + ssh_args.extend(["--ssh-option", trimmed]) + + payload = _run_cli_json(cli, ssh_args) + workspace_id = _resolve_workspace_id(client, payload, before_workspace_ids=before_workspace_ids) + _wait_remote_ready(client, workspace_id) + client.select_workspace(workspace_id) + _wait_for(lambda: client.current_workspace() == workspace_id, timeout_s=8.0) + return workspace_id + + +def _assert_shortcut_creates_remote_terminal( + client: cmux, + workspace_id: str, + shortcut: str, + shortcut_name: str, + *, + expect_new_pane: bool, +) -> None: + before_surfaces = {sid for _index, sid, _focused in client.list_surfaces(workspace_id)} + before_pane_count = len(client.list_panes()) + + client.activate_app() + client.simulate_app_active() + client.simulate_shortcut(shortcut) + + _wait_for( + lambda: len({sid for _index, sid, _focused in client.list_surfaces(workspace_id)} - before_surfaces) == 1, + timeout_s=12.0, + ) + + if expect_new_pane: + _wait_for(lambda: len(client.list_panes()) >= before_pane_count + 1, timeout_s=12.0) + + after_surfaces = {sid for _index, sid, _focused in client.list_surfaces(workspace_id)} + new_surface_ids = sorted(after_surfaces - before_surfaces) + _must(len(new_surface_ids) == 1, f"{shortcut_name} should create exactly one new surface: {new_surface_ids}") + + focused_surface_id = _focused_surface_id(client) + _must( + focused_surface_id == new_surface_ids[0], + f"{shortcut_name} should focus the new terminal surface: focused={focused_surface_id!r} new={new_surface_ids[0]!r}", + ) + _assert_remote_socket_path(client, focused_surface_id, shortcut_name) + + +def main() -> int: + if not SSH_HOST: + print("SKIP: set CMUX_SSH_TEST_HOST to run ssh shortcut inheritance regression") + return 0 + + cli = _find_cli_binary() + workspace_ids: list[str] = [] + + try: + with cmux(SOCKET_PATH) as client: + workspace_id = _open_ssh_workspace( + client, + cli, + name=f"ssh-shortcut-cmdt-{secrets.token_hex(4)}", + ) + workspace_ids.append(workspace_id) + _assert_shortcut_creates_remote_terminal( + client, + workspace_id, + "cmd+t", + "cmd+t", + expect_new_pane=False, + ) + + workspace_id = _open_ssh_workspace( + client, + cli, + name=f"ssh-shortcut-cmdd-{secrets.token_hex(4)}", + ) + workspace_ids.append(workspace_id) + _assert_shortcut_creates_remote_terminal( + client, + workspace_id, + "cmd+d", + "cmd+d", + expect_new_pane=True, + ) + + workspace_id = _open_ssh_workspace( + client, + cli, + name=f"ssh-shortcut-cmdshiftd-{secrets.token_hex(4)}", + ) + workspace_ids.append(workspace_id) + _assert_shortcut_creates_remote_terminal( + client, + workspace_id, + "cmd+shift+d", + "cmd+shift+d", + expect_new_pane=True, + ) + finally: + if workspace_ids: + try: + with cmux(SOCKET_PATH) as client: + for workspace_id in workspace_ids: + try: + client._call("workspace.close", {"workspace_id": workspace_id}) + except Exception: + pass + except Exception: + pass + + print("PASS: cmd+t/cmd+d/cmd+shift+d keep ssh terminals on the remote relay") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main())