diff --git a/Sources/Workspace.swift b/Sources/Workspace.swift index a6688089..07157401 100644 --- a/Sources/Workspace.swift +++ b/Sources/Workspace.swift @@ -647,6 +647,103 @@ extension Workspace { } } +final class WorkspaceRemoteDaemonPendingCallRegistry { + final class PendingCall { + let id: Int + fileprivate let semaphore = DispatchSemaphore(value: 0) + fileprivate var response: [String: Any]? + fileprivate var failureMessage: String? + + fileprivate init(id: Int) { + self.id = id + } + } + + enum WaitOutcome { + case response([String: Any]) + case failure(String) + case missing + case timedOut + } + + private let queue = DispatchQueue(label: "com.cmux.remote-ssh.daemon-rpc.pending.\(UUID().uuidString)") + private var nextRequestID = 1 + private var pendingID: Int? + private var pendingCall: PendingCall? + + func reset() { + queue.sync { + nextRequestID = 1 + clearPendingLocked() + } + } + + func register() -> PendingCall { + queue.sync { + let call = PendingCall(id: nextRequestID) + nextRequestID += 1 + pendingID = call.id + pendingCall = call + return call + } + } + + @discardableResult + func resolve(id: Int, payload: [String: Any]) -> Bool { + queue.sync { + guard pendingID == id, let pendingCall else { return false } + pendingCall.response = payload + pendingCall.semaphore.signal() + return true + } + } + + func failAll(_ message: String) { + queue.sync { + pendingCall?.failureMessage = message + pendingCall?.semaphore.signal() + } + } + + func remove(_ call: PendingCall) { + queue.sync { + guard pendingID == call.id else { return } + clearPendingLocked() + } + } + + func wait(for call: PendingCall, timeout: TimeInterval) -> WaitOutcome { + if call.semaphore.wait(timeout: .now() + timeout) == .timedOut { + queue.sync { + guard pendingID == call.id else { return } + clearPendingLocked() + } + return .timedOut + } + + return queue.sync { + guard pendingID == call.id, let pendingCall else { + return .missing + } + defer { + clearPendingLocked() + } + if let failure = pendingCall.failureMessage { + return .failure(failure) + } + guard let response = pendingCall.response else { + return .missing + } + return .response(response) + } + } + + private func clearPendingLocked() { + pendingID = nil + pendingCall = nil + } +} + private final class WorkspaceRemoteDaemonRPCClient { private static let maxStdoutBufferBytes = 256 * 1024 @@ -655,6 +752,7 @@ private final class WorkspaceRemoteDaemonRPCClient { private let onUnexpectedTermination: (String) -> Void private let callQueue = DispatchQueue(label: "com.cmux.remote-ssh.daemon-rpc.call.\(UUID().uuidString)") private let stateQueue = DispatchQueue(label: "com.cmux.remote-ssh.daemon-rpc.state.\(UUID().uuidString)") + private let pendingCalls = WorkspaceRemoteDaemonPendingCallRegistry() private var process: Process? private var stdinHandle: FileHandle? @@ -663,12 +761,6 @@ private final class WorkspaceRemoteDaemonRPCClient { private var isClosed = true private var shouldReportTermination = true - private var nextRequestID = 1 - private var pendingID: Int? - private var pendingSemaphore: DispatchSemaphore? - private var pendingResponse: [String: Any]? - private var pendingFailureMessage: String? - private var stdoutBuffer = Data() private var stderrBuffer = "" @@ -729,11 +821,8 @@ private final class WorkspaceRemoteDaemonRPCClient { self.shouldReportTermination = true self.stdoutBuffer = Data() self.stderrBuffer = "" - self.pendingID = nil - self.pendingSemaphore = nil - self.pendingResponse = nil - self.pendingFailureMessage = nil } + pendingCalls.reset() do { let hello = try call(method: "hello", params: [:], timeout: 8.0) @@ -809,16 +898,8 @@ private final class WorkspaceRemoteDaemonRPCClient { private func call(method: String, params: [String: Any], timeout: TimeInterval) throws -> [String: Any] { try callQueue.sync { - let semaphore = DispatchSemaphore(value: 0) - let requestID: Int = stateQueue.sync { - let id = nextRequestID - nextRequestID += 1 - pendingID = id - pendingSemaphore = semaphore - pendingResponse = nil - pendingFailureMessage = nil - return id - } + let pendingCall = pendingCalls.register() + let requestID = pendingCall.id let payload: Data do { @@ -828,9 +909,7 @@ private final class WorkspaceRemoteDaemonRPCClient { "params": params, ]) } catch { - stateQueue.sync { - clearPendingLocked() - } + pendingCalls.remove(pendingCall) throw NSError(domain: "cmux.remote.daemon.rpc", code: 10, userInfo: [ NSLocalizedDescriptionKey: "failed to encode daemon RPC request \(method): \(error.localizedDescription)", ]) @@ -839,34 +918,27 @@ private final class WorkspaceRemoteDaemonRPCClient { do { try writePayload(payload) } catch { - stateQueue.sync { - clearPendingLocked() - } + pendingCalls.remove(pendingCall) throw error } - if semaphore.wait(timeout: .now() + timeout) == .timedOut { + let response: [String: Any] + switch pendingCalls.wait(for: pendingCall, timeout: timeout) { + case .timedOut: stop(suppressTerminationCallback: false) throw NSError(domain: "cmux.remote.daemon.rpc", code: 11, userInfo: [ NSLocalizedDescriptionKey: "daemon RPC timeout waiting for \(method) response", ]) - } - - let response: [String: Any] = try stateQueue.sync { - defer { - clearPendingLocked() - } - if let failure = pendingFailureMessage { - throw NSError(domain: "cmux.remote.daemon.rpc", code: 12, userInfo: [ - NSLocalizedDescriptionKey: failure, - ]) - } - guard let pendingResponse else { - throw NSError(domain: "cmux.remote.daemon.rpc", code: 13, userInfo: [ - NSLocalizedDescriptionKey: "daemon RPC \(method) returned empty response", - ]) - } - return pendingResponse + case .failure(let failure): + throw NSError(domain: "cmux.remote.daemon.rpc", code: 12, userInfo: [ + NSLocalizedDescriptionKey: failure, + ]) + case .missing: + throw NSError(domain: "cmux.remote.daemon.rpc", code: 13, userInfo: [ + NSLocalizedDescriptionKey: "daemon RPC \(method) returned empty response", + ]) + case .response(let pendingResponse): + response = pendingResponse } let ok = (response["ok"] as? Bool) ?? false @@ -939,10 +1011,7 @@ private final class WorkspaceRemoteDaemonRPCClient { return -1 }() guard responseID >= 0 else { continue } - guard pendingID == responseID else { continue } - - pendingResponse = payload - pendingSemaphore?.signal() + _ = pendingCalls.resolve(id: responseID, payload: payload) } } @@ -1012,15 +1081,7 @@ private final class WorkspaceRemoteDaemonRPCClient { } private func signalPendingFailureLocked(_ message: String) { - pendingFailureMessage = message - pendingSemaphore?.signal() - } - - private func clearPendingLocked() { - pendingID = nil - pendingSemaphore = nil - pendingResponse = nil - pendingFailureMessage = nil + pendingCalls.failAll(message) } private static func encodeJSON(_ object: [String: Any]) throws -> Data { diff --git a/cmuxTests/GhosttyConfigTests.swift b/cmuxTests/GhosttyConfigTests.swift index 2e38deea..d7a4b136 100644 --- a/cmuxTests/GhosttyConfigTests.swift +++ b/cmuxTests/GhosttyConfigTests.swift @@ -701,6 +701,62 @@ final class WorkspaceRemoteDaemonManifestTests: XCTestCase { } } +final class WorkspaceRemoteDaemonPendingCallRegistryTests: XCTestCase { + func testSupportsMultiplePendingCallsResolvedOutOfOrder() { + let registry = WorkspaceRemoteDaemonPendingCallRegistry() + let first = registry.register() + let second = registry.register() + + XCTAssertTrue(registry.resolve(id: second.id, payload: [ + "ok": true, + "result": ["stream_id": "second"], + ])) + + switch registry.wait(for: second, timeout: 0.1) { + case .response(let response): + XCTAssertEqual(response["ok"] as? Bool, true) + XCTAssertEqual((response["result"] as? [String: String])?["stream_id"], "second") + default: + XCTFail("second pending call should complete independently") + } + + XCTAssertTrue(registry.resolve(id: first.id, payload: [ + "ok": true, + "result": ["stream_id": "first"], + ])) + + switch registry.wait(for: first, timeout: 0.1) { + case .response(let response): + XCTAssertEqual(response["ok"] as? Bool, true) + XCTAssertEqual((response["result"] as? [String: String])?["stream_id"], "first") + default: + XCTFail("first pending call should remain pending until its own response arrives") + } + } + + func testFailAllSignalsEveryPendingCall() { + let registry = WorkspaceRemoteDaemonPendingCallRegistry() + let first = registry.register() + let second = registry.register() + + registry.failAll("daemon transport stopped") + + switch registry.wait(for: first, timeout: 0.1) { + case .failure(let message): + XCTAssertEqual(message, "daemon transport stopped") + default: + XCTFail("first pending call should receive shared failure") + } + + switch registry.wait(for: second, timeout: 0.1) { + case .failure(let message): + XCTAssertEqual(message, "daemon transport stopped") + default: + XCTFail("second pending call should receive shared failure") + } + } +} + final class WindowBackgroundSelectionGateTests: XCTestCase { func testShouldApplyWindowBackgroundUsesOwningWindowSelectionWhenAvailable() { let tabId = UUID()