feat: bootstrap remote daemon over ssh for remote workspaces

This commit is contained in:
Lawrence Chen 2026-02-20 23:44:06 -08:00
parent aaf2ef4c3a
commit 89fa361500
6 changed files with 767 additions and 7 deletions

View file

@ -19,6 +19,24 @@ private final class WorkspaceRemoteSessionController {
let stderrPipe: Pipe
}
private struct CommandResult {
let status: Int32
let stdout: String
let stderr: String
}
private struct RemotePlatform {
let goOS: String
let goArch: String
}
private struct DaemonHello {
let name: String
let version: String
let capabilities: [String]
let remotePath: String
}
private let queue = DispatchQueue(label: "com.cmux.remote-ssh.\(UUID().uuidString)", qos: .utility)
private weak var workspace: Workspace?
private let configuration: WorkspaceRemoteConfiguration
@ -33,6 +51,9 @@ private final class WorkspaceRemoteSessionController {
private var desiredRemotePorts: Set<Int> = []
private var forwardEntries: [Int: ForwardEntry] = [:]
private var portConflicts: Set<Int> = []
private var daemonReady = false
private var daemonBootstrapVersion: String?
private var daemonRemotePath: String?
init(workspace: Workspace, configuration: WorkspaceRemoteConfiguration) {
self.workspace = workspace
@ -43,8 +64,7 @@ private final class WorkspaceRemoteSessionController {
queue.async { [weak self] in
guard let self else { return }
guard !self.isStopping else { return }
self.publishState(.connecting, detail: "Connecting to \(self.configuration.displayTarget)")
self.startProbeLocked()
self.beginConnectionAttemptLocked()
}
}
@ -79,10 +99,44 @@ private final class WorkspaceRemoteSessionController {
forwardEntries.removeAll()
desiredRemotePorts.removeAll()
portConflicts.removeAll()
daemonReady = false
daemonBootstrapVersion = nil
daemonRemotePath = nil
}
private func beginConnectionAttemptLocked() {
guard !isStopping else { return }
publishState(.connecting, detail: "Connecting to \(configuration.displayTarget)")
publishDaemonStatus(.bootstrapping, detail: "Bootstrapping remote daemon on \(configuration.displayTarget)")
do {
let hello = try bootstrapDaemonLocked()
daemonReady = true
daemonBootstrapVersion = hello.version
daemonRemotePath = hello.remotePath
publishDaemonStatus(
.ready,
detail: "Remote daemon ready",
version: hello.version,
name: hello.name,
capabilities: hello.capabilities,
remotePath: hello.remotePath
)
startProbeLocked()
} catch {
daemonReady = false
daemonBootstrapVersion = nil
daemonRemotePath = nil
let detail = "Remote daemon bootstrap failed: \(error.localizedDescription)"
publishDaemonStatus(.error, detail: detail)
publishState(.error, detail: detail)
scheduleProbeRestartLocked(delay: 4.0)
}
}
private func startProbeLocked() {
guard !isStopping else { return }
guard daemonReady else { return }
probeStdoutBuffer = ""
probeStderrBuffer = ""
@ -164,8 +218,7 @@ private final class WorkspaceRemoteSessionController {
guard let self else { return }
guard !self.isStopping else { return }
guard self.probeProcess == nil else { return }
self.publishState(.connecting, detail: "Reconnecting to \(self.configuration.displayTarget)")
self.startProbeLocked()
self.beginConnectionAttemptLocked()
}
}
@ -306,6 +359,28 @@ private final class WorkspaceRemoteSessionController {
}
}
private func publishDaemonStatus(
_ state: WorkspaceRemoteDaemonState,
detail: String?,
version: String? = nil,
name: String? = nil,
capabilities: [String] = [],
remotePath: String? = nil
) {
let status = WorkspaceRemoteDaemonStatus(
state: state,
detail: detail,
version: version,
name: name,
capabilities: capabilities,
remotePath: remotePath
)
DispatchQueue.main.async { [weak workspace] in
guard let workspace else { return }
workspace.remoteDaemonStatus = status
}
}
private func publishPortsSnapshotLocked() {
let detected = desiredRemotePorts.sorted()
let forwarded = forwardEntries.keys.sorted()
@ -355,6 +430,289 @@ private final class WorkspaceRemoteSessionController {
return args
}
private func sshExec(arguments: [String], stdin: Data? = nil, timeout: TimeInterval = 15) throws -> CommandResult {
try runProcess(
executable: "/usr/bin/ssh",
arguments: arguments,
stdin: stdin,
timeout: timeout
)
}
private func scpExec(arguments: [String], timeout: TimeInterval = 30) throws -> CommandResult {
try runProcess(
executable: "/usr/bin/scp",
arguments: arguments,
stdin: nil,
timeout: timeout
)
}
private func runProcess(
executable: String,
arguments: [String],
environment: [String: String]? = nil,
currentDirectory: URL? = nil,
stdin: Data?,
timeout: TimeInterval
) throws -> CommandResult {
let process = Process()
process.executableURL = URL(fileURLWithPath: executable)
process.arguments = arguments
if let environment {
process.environment = environment
}
if let currentDirectory {
process.currentDirectoryURL = currentDirectory
}
let stdoutPipe = Pipe()
let stderrPipe = Pipe()
process.standardOutput = stdoutPipe
process.standardError = stderrPipe
if stdin != nil {
process.standardInput = Pipe()
} else {
process.standardInput = FileHandle.nullDevice
}
do {
try process.run()
} catch {
throw NSError(domain: "cmux.remote.process", code: 1, userInfo: [
NSLocalizedDescriptionKey: "Failed to launch \(URL(fileURLWithPath: executable).lastPathComponent): \(error.localizedDescription)",
])
}
if let stdin, let pipe = process.standardInput as? Pipe {
pipe.fileHandleForWriting.write(stdin)
try? pipe.fileHandleForWriting.close()
}
let deadline = Date().addingTimeInterval(timeout)
while process.isRunning && Date() < deadline {
Thread.sleep(forTimeInterval: 0.05)
}
if process.isRunning {
process.terminate()
throw NSError(domain: "cmux.remote.process", code: 2, userInfo: [
NSLocalizedDescriptionKey: "\(URL(fileURLWithPath: executable).lastPathComponent) timed out after \(Int(timeout))s",
])
}
let stdoutData = stdoutPipe.fileHandleForReading.readDataToEndOfFile()
let stderrData = stderrPipe.fileHandleForReading.readDataToEndOfFile()
let stdout = String(data: stdoutData, encoding: .utf8) ?? ""
let stderr = String(data: stderrData, encoding: .utf8) ?? ""
return CommandResult(status: process.terminationStatus, stdout: stdout, stderr: stderr)
}
private func bootstrapDaemonLocked() throws -> DaemonHello {
let platform = try resolveRemotePlatformLocked()
let version = Self.remoteDaemonVersion()
let remotePath = Self.remoteDaemonPath(version: version, goOS: platform.goOS, goArch: platform.goArch)
if try !remoteDaemonExistsLocked(remotePath: remotePath) {
let localBinary = try buildLocalDaemonBinary(goOS: platform.goOS, goArch: platform.goArch, version: version)
try uploadRemoteDaemonBinaryLocked(localBinary: localBinary, remotePath: remotePath)
}
return try helloRemoteDaemonLocked(remotePath: remotePath)
}
private func resolveRemotePlatformLocked() throws -> RemotePlatform {
let script = "uname -s; uname -m"
let command = "sh -lc \(Self.shellSingleQuoted(script))"
let result = try sshExec(arguments: sshCommonArguments(batchMode: true) + [configuration.destination, command], timeout: 10)
guard result.status == 0 else {
let detail = Self.lastNonEmptyLine(in: result.stderr) ?? "ssh exited \(result.status)"
throw NSError(domain: "cmux.remote.daemon", code: 10, userInfo: [
NSLocalizedDescriptionKey: "failed to query remote platform: \(detail)",
])
}
let lines = result.stdout
.split(separator: "\n")
.map { $0.trimmingCharacters(in: .whitespacesAndNewlines) }
.filter { !$0.isEmpty }
guard lines.count >= 2 else {
throw NSError(domain: "cmux.remote.daemon", code: 11, userInfo: [
NSLocalizedDescriptionKey: "remote platform probe returned invalid output",
])
}
guard let goOS = Self.mapUnameOS(lines[0]),
let goArch = Self.mapUnameArch(lines[1]) else {
throw NSError(domain: "cmux.remote.daemon", code: 12, userInfo: [
NSLocalizedDescriptionKey: "unsupported remote platform \(lines[0])/\(lines[1])",
])
}
return RemotePlatform(goOS: goOS, goArch: goArch)
}
private func remoteDaemonExistsLocked(remotePath: String) throws -> Bool {
let script = "if [ -x \(Self.shellSingleQuoted(remotePath)) ]; then echo yes; else echo no; fi"
let command = "sh -lc \(Self.shellSingleQuoted(script))"
let result = try sshExec(arguments: sshCommonArguments(batchMode: true) + [configuration.destination, command], timeout: 8)
guard result.status == 0 else { return false }
return result.stdout.trimmingCharacters(in: .whitespacesAndNewlines) == "yes"
}
private func buildLocalDaemonBinary(goOS: String, goArch: String, version: String) throws -> URL {
guard let repoRoot = Self.findRepoRoot() else {
throw NSError(domain: "cmux.remote.daemon", code: 20, userInfo: [
NSLocalizedDescriptionKey: "cannot locate cmux repo root for daemon build",
])
}
let daemonRoot = repoRoot.appendingPathComponent("daemon/remote", isDirectory: true)
let goModPath = daemonRoot.appendingPathComponent("go.mod").path
guard FileManager.default.fileExists(atPath: goModPath) else {
throw NSError(domain: "cmux.remote.daemon", code: 21, userInfo: [
NSLocalizedDescriptionKey: "missing daemon module at \(goModPath)",
])
}
guard let goBinary = Self.which("go") else {
throw NSError(domain: "cmux.remote.daemon", code: 22, userInfo: [
NSLocalizedDescriptionKey: "go is required to build cmuxd-remote",
])
}
let cacheRoot = URL(fileURLWithPath: NSTemporaryDirectory(), isDirectory: true)
.appendingPathComponent("cmux-remote-daemon-build", isDirectory: true)
.appendingPathComponent(version, isDirectory: true)
.appendingPathComponent("\(goOS)-\(goArch)", isDirectory: true)
try FileManager.default.createDirectory(at: cacheRoot, withIntermediateDirectories: true)
let output = cacheRoot.appendingPathComponent("cmuxd-remote", isDirectory: false)
var env = ProcessInfo.processInfo.environment
env["GOOS"] = goOS
env["GOARCH"] = goArch
env["CGO_ENABLED"] = "0"
let ldflags = "-s -w -X main.version=\(version)"
let result = try runProcess(
executable: goBinary,
arguments: ["build", "-trimpath", "-ldflags", ldflags, "-o", output.path, "./cmd/cmuxd-remote"],
environment: env,
currentDirectory: daemonRoot,
stdin: nil,
timeout: 90
)
guard result.status == 0 else {
let detail = Self.lastNonEmptyLine(in: result.stderr) ?? "go build failed with status \(result.status)"
throw NSError(domain: "cmux.remote.daemon", code: 23, userInfo: [
NSLocalizedDescriptionKey: "failed to build cmuxd-remote: \(detail)",
])
}
guard FileManager.default.isExecutableFile(atPath: output.path) else {
throw NSError(domain: "cmux.remote.daemon", code: 24, userInfo: [
NSLocalizedDescriptionKey: "cmuxd-remote build output is not executable",
])
}
return output
}
private func uploadRemoteDaemonBinaryLocked(localBinary: URL, remotePath: String) throws {
let remoteDirectory = (remotePath as NSString).deletingLastPathComponent
let remoteTempPath = "\(remotePath).tmp-\(UUID().uuidString.prefix(8))"
let mkdirScript = "mkdir -p \(Self.shellSingleQuoted(remoteDirectory))"
let mkdirCommand = "sh -lc \(Self.shellSingleQuoted(mkdirScript))"
let mkdirResult = try sshExec(arguments: sshCommonArguments(batchMode: true) + [configuration.destination, mkdirCommand], timeout: 12)
guard mkdirResult.status == 0 else {
let detail = Self.lastNonEmptyLine(in: mkdirResult.stderr) ?? "ssh exited \(mkdirResult.status)"
throw NSError(domain: "cmux.remote.daemon", code: 30, userInfo: [
NSLocalizedDescriptionKey: "failed to create remote daemon directory: \(detail)",
])
}
var scpArgs: [String] = ["-q", "-o", "StrictHostKeyChecking=accept-new"]
if let port = configuration.port {
scpArgs += ["-P", String(port)]
}
if let identityFile = configuration.identityFile,
!identityFile.trimmingCharacters(in: .whitespacesAndNewlines).isEmpty {
scpArgs += ["-i", identityFile]
}
for option in configuration.sshOptions {
let trimmed = option.trimmingCharacters(in: .whitespacesAndNewlines)
guard !trimmed.isEmpty else { continue }
scpArgs += ["-o", trimmed]
}
scpArgs += [localBinary.path, "\(configuration.destination):\(remoteTempPath)"]
let scpResult = try scpExec(arguments: scpArgs, timeout: 45)
guard scpResult.status == 0 else {
let detail = Self.lastNonEmptyLine(in: scpResult.stderr) ?? "scp exited \(scpResult.status)"
throw NSError(domain: "cmux.remote.daemon", code: 31, userInfo: [
NSLocalizedDescriptionKey: "failed to upload cmuxd-remote: \(detail)",
])
}
let finalizeScript = """
chmod 755 \(Self.shellSingleQuoted(remoteTempPath)) && \
mv \(Self.shellSingleQuoted(remoteTempPath)) \(Self.shellSingleQuoted(remotePath))
"""
let finalizeCommand = "sh -lc \(Self.shellSingleQuoted(finalizeScript))"
let finalizeResult = try sshExec(arguments: sshCommonArguments(batchMode: true) + [configuration.destination, finalizeCommand], timeout: 12)
guard finalizeResult.status == 0 else {
let detail = Self.lastNonEmptyLine(in: finalizeResult.stderr) ?? "ssh exited \(finalizeResult.status)"
throw NSError(domain: "cmux.remote.daemon", code: 32, userInfo: [
NSLocalizedDescriptionKey: "failed to install remote daemon binary: \(detail)",
])
}
}
private func helloRemoteDaemonLocked(remotePath: String) throws -> DaemonHello {
let request = #"{"id":1,"method":"hello","params":{}}"#
let script = "printf '%s\\n' \(Self.shellSingleQuoted(request)) | \(Self.shellSingleQuoted(remotePath)) serve --stdio"
let command = "sh -lc \(Self.shellSingleQuoted(script))"
let result = try sshExec(arguments: sshCommonArguments(batchMode: true) + [configuration.destination, command], timeout: 12)
guard result.status == 0 else {
let detail = Self.lastNonEmptyLine(in: result.stderr) ?? "ssh exited \(result.status)"
throw NSError(domain: "cmux.remote.daemon", code: 40, userInfo: [
NSLocalizedDescriptionKey: "failed to start remote daemon: \(detail)",
])
}
let responseLine = result.stdout
.split(separator: "\n")
.map(String.init)
.first(where: { !$0.trimmingCharacters(in: .whitespacesAndNewlines).isEmpty }) ?? ""
guard !responseLine.isEmpty,
let data = responseLine.data(using: .utf8),
let payload = try? JSONSerialization.jsonObject(with: data, options: []) as? [String: Any] else {
throw NSError(domain: "cmux.remote.daemon", code: 41, userInfo: [
NSLocalizedDescriptionKey: "remote daemon hello returned invalid JSON",
])
}
if let ok = payload["ok"] as? Bool, !ok {
let errorMessage: String = {
if let errorObject = payload["error"] as? [String: Any],
let message = errorObject["message"] as? String,
!message.trimmingCharacters(in: .whitespacesAndNewlines).isEmpty {
return message
}
return "hello call failed"
}()
throw NSError(domain: "cmux.remote.daemon", code: 42, userInfo: [
NSLocalizedDescriptionKey: "remote daemon hello failed: \(errorMessage)",
])
}
let resultObject = payload["result"] as? [String: Any] ?? [:]
let name = (resultObject["name"] as? String)?.trimmingCharacters(in: .whitespacesAndNewlines)
let version = (resultObject["version"] as? String)?.trimmingCharacters(in: .whitespacesAndNewlines)
let capabilities = (resultObject["capabilities"] as? [String]) ?? []
return DaemonHello(
name: (name?.isEmpty == false ? name! : "cmuxd-remote"),
version: (version?.isEmpty == false ? version! : "dev"),
capabilities: capabilities,
remotePath: remotePath
)
}
private static func parseRemotePorts(line: String) -> [Int] {
let tokens = line.split(whereSeparator: \.isWhitespace)
let values = tokens.compactMap { Int($0) }
@ -391,6 +749,96 @@ private final class WorkspaceRemoteSessionController {
"'" + value.replacingOccurrences(of: "'", with: "'\"'\"'") + "'"
}
private static func mapUnameOS(_ raw: String) -> String? {
switch raw.lowercased() {
case "linux":
return "linux"
case "darwin":
return "darwin"
case "freebsd":
return "freebsd"
default:
return nil
}
}
private static func mapUnameArch(_ raw: String) -> String? {
switch raw.lowercased() {
case "x86_64", "amd64":
return "amd64"
case "aarch64", "arm64":
return "arm64"
case "armv7l":
return "arm"
default:
return nil
}
}
private static func remoteDaemonVersion() -> String {
let bundleVersion = (Bundle.main.infoDictionary?["CFBundleShortVersionString"] as? String)?
.trimmingCharacters(in: .whitespacesAndNewlines)
if let bundleVersion, !bundleVersion.isEmpty {
return bundleVersion
}
return "dev"
}
private static func remoteDaemonPath(version: String, goOS: String, goArch: String) -> String {
".cmux/bin/cmuxd-remote/\(version)/\(goOS)-\(goArch)/cmuxd-remote"
}
private static func which(_ executable: String) -> String? {
let path = ProcessInfo.processInfo.environment["PATH"] ?? ""
for component in path.split(separator: ":") {
let candidate = String(component) + "/" + executable
if FileManager.default.isExecutableFile(atPath: candidate) {
return candidate
}
}
return nil
}
private static func findRepoRoot() -> URL? {
var candidates: [URL] = []
let compileTimeRoot = URL(fileURLWithPath: #filePath)
.deletingLastPathComponent() // Sources
.deletingLastPathComponent() // repo root
candidates.append(compileTimeRoot)
let environment = ProcessInfo.processInfo.environment
if let envRoot = environment["CMUX_REMOTE_DAEMON_SOURCE_ROOT"],
!envRoot.trimmingCharacters(in: .whitespacesAndNewlines).isEmpty {
candidates.append(URL(fileURLWithPath: envRoot, isDirectory: true))
}
if let envRoot = environment["CMUXTERM_REPO_ROOT"],
!envRoot.trimmingCharacters(in: .whitespacesAndNewlines).isEmpty {
candidates.append(URL(fileURLWithPath: envRoot, isDirectory: true))
}
candidates.append(URL(fileURLWithPath: FileManager.default.currentDirectoryPath, isDirectory: true))
if let executable = Bundle.main.executableURL?.deletingLastPathComponent() {
candidates.append(executable)
candidates.append(executable.deletingLastPathComponent())
candidates.append(executable.deletingLastPathComponent().deletingLastPathComponent())
}
let fm = FileManager.default
for base in candidates {
var cursor = base.standardizedFileURL
for _ in 0..<10 {
let marker = cursor.appendingPathComponent("daemon/remote/go.mod").path
if fm.fileExists(atPath: marker) {
return cursor
}
let parent = cursor.deletingLastPathComponent()
if parent.path == cursor.path {
break
}
cursor = parent
}
}
return nil
}
private static func lastNonEmptyLine(in text: String) -> String? {
for line in text.split(separator: "\n").reversed() {
let trimmed = line.trimmingCharacters(in: .whitespacesAndNewlines)
@ -458,6 +906,33 @@ enum WorkspaceRemoteConnectionState: String {
case error
}
enum WorkspaceRemoteDaemonState: String {
case unavailable
case bootstrapping
case ready
case error
}
struct WorkspaceRemoteDaemonStatus: Equatable {
var state: WorkspaceRemoteDaemonState = .unavailable
var detail: String?
var version: String?
var name: String?
var capabilities: [String] = []
var remotePath: String?
func payload() -> [String: Any] {
[
"state": state.rawValue,
"detail": detail ?? NSNull(),
"version": version ?? NSNull(),
"name": name ?? NSNull(),
"capabilities": capabilities,
"remote_path": remotePath ?? NSNull(),
]
}
}
struct WorkspaceRemoteConfiguration: Equatable {
let destination: String
let port: Int?
@ -531,6 +1006,7 @@ final class Workspace: Identifiable, ObservableObject {
@Published var remoteConfiguration: WorkspaceRemoteConfiguration?
@Published var remoteConnectionState: WorkspaceRemoteConnectionState = .disconnected
@Published var remoteConnectionDetail: String?
@Published var remoteDaemonStatus: WorkspaceRemoteDaemonStatus = WorkspaceRemoteDaemonStatus()
@Published var remoteDetectedPorts: [Int] = []
@Published var remoteForwardedPorts: [Int] = []
@Published var remotePortConflicts: [Int] = []
@ -1022,6 +1498,7 @@ final class Workspace: Identifiable, ObservableObject {
"enabled": remoteConfiguration != nil,
"state": remoteConnectionState.rawValue,
"connected": remoteConnectionState == .connected,
"daemon": remoteDaemonStatus.payload(),
"detected_ports": remoteDetectedPorts,
"forwarded_ports": remoteForwardedPorts,
"conflicted_ports": remotePortConflicts,
@ -1047,6 +1524,7 @@ final class Workspace: Identifiable, ObservableObject {
remoteForwardedPorts = []
remotePortConflicts = []
remoteConnectionDetail = nil
remoteDaemonStatus = WorkspaceRemoteDaemonStatus()
recomputeListeningPorts()
remoteSessionController?.stop()
@ -1076,6 +1554,7 @@ final class Workspace: Identifiable, ObservableObject {
remotePortConflicts = []
remoteConnectionState = .disconnected
remoteConnectionDetail = nil
remoteDaemonStatus = WorkspaceRemoteDaemonStatus()
if clearConfiguration {
remoteConfiguration = nil
}

View file

@ -1,6 +1,6 @@
# cmuxd-remote (Go)
Minimal remote daemon scaffold for `cmux ssh`.
Go remote daemon for `cmux ssh` bootstrap and capability negotiation.
Current commands:
1. `cmuxd-remote version`
@ -10,5 +10,7 @@ Current RPC methods (newline-delimited JSON):
1. `hello`
2. `ping`
This scaffold is intentionally small so `cmux` can start integrating daemon bootstrap,
capability negotiation, and protocol evolution without coupling to the Swift app runtime.
Current integration in cmux:
1. `workspace.remote.configure` now bootstraps this binary over SSH when missing.
2. Client sends `hello` before enabling remote port probing/forwarding.
3. Daemon status/capabilities are exposed in `workspace.remote.status -> remote.daemon`.

View file

@ -41,6 +41,14 @@ func TestRunStdioHelloAndPing(t *testing.T) {
if ok, _ := first["ok"].(bool); !ok {
t.Fatalf("first response should be ok=true: %v", first)
}
firstResult, _ := first["result"].(map[string]any)
if firstResult == nil {
t.Fatalf("first response missing result object: %v", first)
}
capabilities, _ := firstResult["capabilities"].([]any)
if len(capabilities) < 2 {
t.Fatalf("hello should return capabilities: %v", firstResult)
}
var second map[string]any
if err := json.Unmarshal([]byte(lines[1]), &second); err != nil {

View file

@ -129,6 +129,11 @@ def main() -> int:
status = client._call("workspace.remote.status", {"workspace_id": workspace_id}) or {}
status_remote = status.get("remote") or {}
_must(bool(status_remote.get("enabled")) is True, f"workspace.remote.status should report enabled remote: {status}")
daemon = status_remote.get("daemon") or {}
_must(
str(daemon.get("state") or "") in {"unavailable", "bootstrapping", "ready", "error"},
f"workspace.remote.status should include daemon state metadata: {status_remote}",
)
# Regression: --name is optional.
payload2 = _run_cli_json(

View file

@ -158,6 +158,11 @@ def main() -> int:
else:
raise cmuxError(f"Remote port forwarding did not converge: {last_status}")
daemon = ((last_status.get("remote") or {}).get("daemon") or {})
_must(str(daemon.get("state") or "") == "ready", f"daemon should be ready in connected state: {last_status}")
capabilities = daemon.get("capabilities") or []
_must("session.basic" in capabilities, f"daemon hello capabilities missing session.basic: {daemon}")
body = ""
deadline_http = time.time() + 15.0
while time.time() < deadline_http:

View file

@ -0,0 +1,261 @@
#!/usr/bin/env python3
"""Docker integration: remote SSH reconnect after host restart."""
from __future__ import annotations
import glob
import json
import os
import secrets
import shutil
import socket
import subprocess
import sys
import tempfile
import time
import urllib.request
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from cmux import cmux, cmuxError
SOCKET_PATH = os.environ.get("CMUX_SOCKET", "/tmp/cmux-debug.sock")
REMOTE_HTTP_PORT = int(os.environ.get("CMUX_SSH_TEST_REMOTE_HTTP_PORT", "43174"))
def _must(cond: bool, msg: str) -> None:
if not cond:
raise cmuxError(msg)
def _find_cli_binary() -> str:
env_cli = os.environ.get("CMUXTERM_CLI")
if env_cli and os.path.isfile(env_cli) and os.access(env_cli, os.X_OK):
return env_cli
fixed = os.path.expanduser("~/Library/Developer/Xcode/DerivedData/cmux-tests-v2/Build/Products/Debug/cmux")
if os.path.isfile(fixed) and os.access(fixed, os.X_OK):
return fixed
candidates = glob.glob(os.path.expanduser("~/Library/Developer/Xcode/DerivedData/**/Build/Products/Debug/cmux"), recursive=True)
candidates += glob.glob("/tmp/cmux-*/Build/Products/Debug/cmux")
candidates = [p for p in candidates if os.path.isfile(p) and os.access(p, os.X_OK)]
if not candidates:
raise cmuxError("Could not locate cmux CLI binary; set CMUXTERM_CLI")
candidates.sort(key=lambda p: os.path.getmtime(p), reverse=True)
return candidates[0]
def _run(cmd: list[str], *, env: dict[str, str] | None = None, check: bool = True) -> subprocess.CompletedProcess[str]:
proc = subprocess.run(cmd, capture_output=True, text=True, env=env, check=False)
if check and proc.returncode != 0:
merged = f"{proc.stdout}\n{proc.stderr}".strip()
raise cmuxError(f"Command failed ({' '.join(cmd)}): {merged}")
return proc
def _run_cli_json(cli: str, args: list[str]) -> dict:
env = dict(os.environ)
env.pop("CMUX_WORKSPACE_ID", None)
env.pop("CMUX_SURFACE_ID", None)
env.pop("CMUX_TAB_ID", None)
proc = _run([cli, "--socket", SOCKET_PATH, "--json", *args], env=env)
try:
return json.loads(proc.stdout or "{}")
except Exception as exc: # noqa: BLE001
raise cmuxError(f"Invalid JSON output for {' '.join(args)}: {proc.stdout!r} ({exc})")
def _docker_available() -> bool:
if shutil.which("docker") is None:
return False
probe = _run(["docker", "info"], check=False)
return probe.returncode == 0
def _http_get(url: str, timeout: float = 2.0) -> str:
with urllib.request.urlopen(url, timeout=timeout) as resp: # nosec B310 - test loopback endpoint only
return resp.read().decode("utf-8", errors="replace")
def _find_free_loopback_port() -> int:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.bind(("127.0.0.1", 0))
return int(sock.getsockname()[1])
def _start_container(image_tag: str, container_name: str, pubkey: str, host_ssh_port: int) -> None:
for _ in range(20):
proc = _run(
[
"docker",
"run",
"-d",
"--rm",
"--name",
container_name,
"-e",
f"AUTHORIZED_KEY={pubkey}",
"-e",
f"REMOTE_HTTP_PORT={REMOTE_HTTP_PORT}",
"-p",
f"127.0.0.1:{host_ssh_port}:22",
image_tag,
],
check=False,
)
if proc.returncode == 0:
return
time.sleep(0.5)
merged = f"{proc.stdout}\n{proc.stderr}".strip()
raise cmuxError(f"Failed to start ssh test container on fixed port {host_ssh_port}: {merged}")
def _wait_remote_connected(client: cmux, workspace_id: str, timeout: float) -> dict:
deadline = time.time() + timeout
last_status = {}
while time.time() < deadline:
last_status = client._call("workspace.remote.status", {"workspace_id": workspace_id}) or {}
remote = last_status.get("remote") or {}
forwarded = set(int(x) for x in (remote.get("forwarded_ports") or []) if str(x).isdigit())
if str(remote.get("state") or "") == "connected" and REMOTE_HTTP_PORT in forwarded:
return last_status
time.sleep(0.5)
raise cmuxError(f"Remote did not reach connected+forwarded state: {last_status}")
def _wait_remote_degraded(client: cmux, workspace_id: str, timeout: float) -> dict:
deadline = time.time() + timeout
last_status = {}
while time.time() < deadline:
last_status = client._call("workspace.remote.status", {"workspace_id": workspace_id}) or {}
remote = last_status.get("remote") or {}
state = str(remote.get("state") or "")
if state in {"error", "connecting", "disconnected"}:
return last_status
time.sleep(0.5)
raise cmuxError(f"Remote did not enter reconnecting/degraded state: {last_status}")
def main() -> int:
if not _docker_available():
print("SKIP: docker is not available")
return 0
cli = _find_cli_binary()
repo_root = Path(__file__).resolve().parents[1]
fixture_dir = repo_root / "tests" / "fixtures" / "ssh-remote"
_must(fixture_dir.is_dir(), f"Missing docker fixture directory: {fixture_dir}")
temp_dir = Path(tempfile.mkdtemp(prefix="cmux-ssh-reconnect-"))
image_tag = f"cmux-ssh-test:{secrets.token_hex(4)}"
container_name = f"cmux-ssh-reconnect-{secrets.token_hex(4)}"
host_ssh_port = _find_free_loopback_port()
workspace_id = ""
container_running = False
try:
key_path = temp_dir / "id_ed25519"
_run(["ssh-keygen", "-t", "ed25519", "-N", "", "-f", str(key_path)])
pubkey = (key_path.with_suffix(".pub")).read_text(encoding="utf-8").strip()
_must(bool(pubkey), "Generated SSH public key was empty")
_run(["docker", "build", "-t", image_tag, str(fixture_dir)])
_start_container(image_tag, container_name, pubkey, host_ssh_port)
container_running = True
with cmux(SOCKET_PATH) as client:
payload = _run_cli_json(
cli,
[
"ssh",
"root@127.0.0.1",
"--name",
"docker-ssh-reconnect",
"--port",
str(host_ssh_port),
"--identity",
str(key_path),
"--ssh-option",
"UserKnownHostsFile=/dev/null",
"--ssh-option",
"StrictHostKeyChecking=no",
],
)
workspace_id = str(payload.get("workspace_id") or "")
workspace_ref = str(payload.get("workspace_ref") or "")
if not workspace_id and workspace_ref.startswith("workspace:"):
listed = client._call("workspace.list", {}) or {}
for row in listed.get("workspaces") or []:
if str(row.get("ref") or "") == workspace_ref:
workspace_id = str(row.get("id") or "")
break
_must(bool(workspace_id), f"cmux ssh output missing workspace_id: {payload}")
first_status = _wait_remote_connected(client, workspace_id, timeout=45.0)
first_daemon = ((first_status.get("remote") or {}).get("daemon") or {})
_must(str(first_daemon.get("state") or "") == "ready", f"daemon should be ready after first connect: {first_status}")
first_body = ""
first_deadline_http = time.time() + 15.0
while time.time() < first_deadline_http:
try:
first_body = _http_get(f"http://127.0.0.1:{REMOTE_HTTP_PORT}/")
except Exception:
time.sleep(0.5)
continue
if "cmux-ssh-forward-ok" in first_body:
break
time.sleep(0.3)
_must("cmux-ssh-forward-ok" in first_body, f"Forwarded HTTP endpoint failed before reconnect: {first_body[:120]!r}")
_run(["docker", "rm", "-f", container_name], check=False)
container_running = False
_wait_remote_degraded(client, workspace_id, timeout=20.0)
_start_container(image_tag, container_name, pubkey, host_ssh_port)
container_running = True
second_status = _wait_remote_connected(client, workspace_id, timeout=60.0)
second_daemon = ((second_status.get("remote") or {}).get("daemon") or {})
_must(str(second_daemon.get("state") or "") == "ready", f"daemon should be ready after reconnect: {second_status}")
second_body = ""
deadline_http = time.time() + 15.0
while time.time() < deadline_http:
try:
second_body = _http_get(f"http://127.0.0.1:{REMOTE_HTTP_PORT}/")
except Exception:
time.sleep(0.5)
continue
if "cmux-ssh-forward-ok" in second_body:
break
time.sleep(0.3)
_must("cmux-ssh-forward-ok" in second_body, f"Forwarded HTTP endpoint failed after reconnect: {second_body[:120]!r}")
try:
client.close_workspace(workspace_id)
except Exception:
pass
workspace_id = ""
print("PASS: docker SSH remote reconnects and re-establishes forwarded ports")
return 0
finally:
if workspace_id:
try:
with cmux(SOCKET_PATH) as cleanup_client:
cleanup_client.close_workspace(workspace_id)
except Exception:
pass
if container_running:
_run(["docker", "rm", "-f", container_name], check=False)
_run(["docker", "rmi", "-f", image_tag], check=False)
shutil.rmtree(temp_dir, ignore_errors=True)
if __name__ == "__main__":
raise SystemExit(main())