From d7d80ff5af9513df10660ac92f6f9cf8af8c19b5 Mon Sep 17 00:00:00 2001 From: Lawrence Chen <54008264+lawrencecchen@users.noreply.github.com> Date: Thu, 12 Mar 2026 06:01:45 -0700 Subject: [PATCH] Allow concurrent remote daemon RPC calls --- Sources/Workspace.swift | 142 +++++++++++++++++++--------------------- 1 file changed, 67 insertions(+), 75 deletions(-) diff --git a/Sources/Workspace.swift b/Sources/Workspace.swift index 07157401..8ab72d09 100644 --- a/Sources/Workspace.swift +++ b/Sources/Workspace.swift @@ -668,13 +668,12 @@ final class WorkspaceRemoteDaemonPendingCallRegistry { private let queue = DispatchQueue(label: "com.cmux.remote-ssh.daemon-rpc.pending.\(UUID().uuidString)") private var nextRequestID = 1 - private var pendingID: Int? - private var pendingCall: PendingCall? + private var pendingCalls: [Int: PendingCall] = [:] func reset() { queue.sync { nextRequestID = 1 - clearPendingLocked() + pendingCalls.removeAll(keepingCapacity: false) } } @@ -682,8 +681,7 @@ final class WorkspaceRemoteDaemonPendingCallRegistry { queue.sync { let call = PendingCall(id: nextRequestID) nextRequestID += 1 - pendingID = call.id - pendingCall = call + pendingCalls[call.id] = call return call } } @@ -691,7 +689,7 @@ final class WorkspaceRemoteDaemonPendingCallRegistry { @discardableResult func resolve(id: Int, payload: [String: Any]) -> Bool { queue.sync { - guard pendingID == id, let pendingCall else { return false } + guard let pendingCall = pendingCalls[id] else { return false } pendingCall.response = payload pendingCall.semaphore.signal() return true @@ -700,34 +698,33 @@ final class WorkspaceRemoteDaemonPendingCallRegistry { func failAll(_ message: String) { queue.sync { - pendingCall?.failureMessage = message - pendingCall?.semaphore.signal() + let calls = Array(pendingCalls.values) + for call in calls { + guard call.response == nil, call.failureMessage == nil else { continue } + call.failureMessage = message + call.semaphore.signal() + } } } func remove(_ call: PendingCall) { queue.sync { - guard pendingID == call.id else { return } - clearPendingLocked() + pendingCalls.removeValue(forKey: call.id) } } func wait(for call: PendingCall, timeout: TimeInterval) -> WaitOutcome { if call.semaphore.wait(timeout: .now() + timeout) == .timedOut { queue.sync { - guard pendingID == call.id else { return } - clearPendingLocked() + pendingCalls.removeValue(forKey: call.id) } return .timedOut } return queue.sync { - guard pendingID == call.id, let pendingCall else { + guard let pendingCall = pendingCalls.removeValue(forKey: call.id) else { return .missing } - defer { - clearPendingLocked() - } if let failure = pendingCall.failureMessage { return .failure(failure) } @@ -737,11 +734,6 @@ final class WorkspaceRemoteDaemonPendingCallRegistry { return .response(response) } } - - private func clearPendingLocked() { - pendingID = nil - pendingCall = nil - } } private final class WorkspaceRemoteDaemonRPCClient { @@ -750,7 +742,7 @@ private final class WorkspaceRemoteDaemonRPCClient { private let configuration: WorkspaceRemoteConfiguration private let remotePath: String private let onUnexpectedTermination: (String) -> Void - private let callQueue = DispatchQueue(label: "com.cmux.remote-ssh.daemon-rpc.call.\(UUID().uuidString)") + private let writeQueue = DispatchQueue(label: "com.cmux.remote-ssh.daemon-rpc.write.\(UUID().uuidString)") private let stateQueue = DispatchQueue(label: "com.cmux.remote-ssh.daemon-rpc.state.\(UUID().uuidString)") private let pendingCalls = WorkspaceRemoteDaemonPendingCallRegistry() @@ -897,62 +889,62 @@ private final class WorkspaceRemoteDaemonRPCClient { } private func call(method: String, params: [String: Any], timeout: TimeInterval) throws -> [String: Any] { - try callQueue.sync { - let pendingCall = pendingCalls.register() - let requestID = pendingCall.id + let pendingCall = pendingCalls.register() + let requestID = pendingCall.id - let payload: Data - do { - payload = try Self.encodeJSON([ - "id": requestID, - "method": method, - "params": params, - ]) - } catch { - pendingCalls.remove(pendingCall) - throw NSError(domain: "cmux.remote.daemon.rpc", code: 10, userInfo: [ - NSLocalizedDescriptionKey: "failed to encode daemon RPC request \(method): \(error.localizedDescription)", - ]) - } - - do { - try writePayload(payload) - } catch { - pendingCalls.remove(pendingCall) - throw error - } - - let response: [String: Any] - switch pendingCalls.wait(for: pendingCall, timeout: timeout) { - case .timedOut: - stop(suppressTerminationCallback: false) - throw NSError(domain: "cmux.remote.daemon.rpc", code: 11, userInfo: [ - NSLocalizedDescriptionKey: "daemon RPC timeout waiting for \(method) response", - ]) - case .failure(let failure): - throw NSError(domain: "cmux.remote.daemon.rpc", code: 12, userInfo: [ - NSLocalizedDescriptionKey: failure, - ]) - case .missing: - throw NSError(domain: "cmux.remote.daemon.rpc", code: 13, userInfo: [ - NSLocalizedDescriptionKey: "daemon RPC \(method) returned empty response", - ]) - case .response(let pendingResponse): - response = pendingResponse - } - - let ok = (response["ok"] as? Bool) ?? false - if ok { - return (response["result"] as? [String: Any]) ?? [:] - } - - let errorObject = (response["error"] as? [String: Any]) ?? [:] - let code = (errorObject["code"] as? String)?.trimmingCharacters(in: .whitespacesAndNewlines) ?? "rpc_error" - let message = (errorObject["message"] as? String)?.trimmingCharacters(in: .whitespacesAndNewlines) ?? "daemon RPC call failed" - throw NSError(domain: "cmux.remote.daemon.rpc", code: 14, userInfo: [ - NSLocalizedDescriptionKey: "\(method) failed (\(code)): \(message)", + let payload: Data + do { + payload = try Self.encodeJSON([ + "id": requestID, + "method": method, + "params": params, + ]) + } catch { + pendingCalls.remove(pendingCall) + throw NSError(domain: "cmux.remote.daemon.rpc", code: 10, userInfo: [ + NSLocalizedDescriptionKey: "failed to encode daemon RPC request \(method): \(error.localizedDescription)", ]) } + + do { + try writeQueue.sync { + try writePayload(payload) + } + } catch { + pendingCalls.remove(pendingCall) + throw error + } + + let response: [String: Any] + switch pendingCalls.wait(for: pendingCall, timeout: timeout) { + case .timedOut: + stop(suppressTerminationCallback: false) + throw NSError(domain: "cmux.remote.daemon.rpc", code: 11, userInfo: [ + NSLocalizedDescriptionKey: "daemon RPC timeout waiting for \(method) response", + ]) + case .failure(let failure): + throw NSError(domain: "cmux.remote.daemon.rpc", code: 12, userInfo: [ + NSLocalizedDescriptionKey: failure, + ]) + case .missing: + throw NSError(domain: "cmux.remote.daemon.rpc", code: 13, userInfo: [ + NSLocalizedDescriptionKey: "daemon RPC \(method) returned empty response", + ]) + case .response(let pendingResponse): + response = pendingResponse + } + + let ok = (response["ok"] as? Bool) ?? false + if ok { + return (response["result"] as? [String: Any]) ?? [:] + } + + let errorObject = (response["error"] as? [String: Any]) ?? [:] + let code = (errorObject["code"] as? String)?.trimmingCharacters(in: .whitespacesAndNewlines) ?? "rpc_error" + let message = (errorObject["message"] as? String)?.trimmingCharacters(in: .whitespacesAndNewlines) ?? "daemon RPC call failed" + throw NSError(domain: "cmux.remote.daemon.rpc", code: 14, userInfo: [ + NSLocalizedDescriptionKey: "\(method) failed (\(code)): \(message)", + ]) } private func writePayload(_ payload: Data) throws {