Allow concurrent remote daemon RPC calls

This commit is contained in:
Lawrence Chen 2026-03-12 06:01:45 -07:00
parent 4a12dca8a7
commit d7d80ff5af

View file

@ -668,13 +668,12 @@ final class WorkspaceRemoteDaemonPendingCallRegistry {
private let queue = DispatchQueue(label: "com.cmux.remote-ssh.daemon-rpc.pending.\(UUID().uuidString)")
private var nextRequestID = 1
private var pendingID: Int?
private var pendingCall: PendingCall?
private var pendingCalls: [Int: PendingCall] = [:]
func reset() {
queue.sync {
nextRequestID = 1
clearPendingLocked()
pendingCalls.removeAll(keepingCapacity: false)
}
}
@ -682,8 +681,7 @@ final class WorkspaceRemoteDaemonPendingCallRegistry {
queue.sync {
let call = PendingCall(id: nextRequestID)
nextRequestID += 1
pendingID = call.id
pendingCall = call
pendingCalls[call.id] = call
return call
}
}
@ -691,7 +689,7 @@ final class WorkspaceRemoteDaemonPendingCallRegistry {
@discardableResult
func resolve(id: Int, payload: [String: Any]) -> Bool {
queue.sync {
guard pendingID == id, let pendingCall else { return false }
guard let pendingCall = pendingCalls[id] else { return false }
pendingCall.response = payload
pendingCall.semaphore.signal()
return true
@ -700,34 +698,33 @@ final class WorkspaceRemoteDaemonPendingCallRegistry {
func failAll(_ message: String) {
queue.sync {
pendingCall?.failureMessage = message
pendingCall?.semaphore.signal()
let calls = Array(pendingCalls.values)
for call in calls {
guard call.response == nil, call.failureMessage == nil else { continue }
call.failureMessage = message
call.semaphore.signal()
}
}
}
func remove(_ call: PendingCall) {
queue.sync {
guard pendingID == call.id else { return }
clearPendingLocked()
pendingCalls.removeValue(forKey: call.id)
}
}
func wait(for call: PendingCall, timeout: TimeInterval) -> WaitOutcome {
if call.semaphore.wait(timeout: .now() + timeout) == .timedOut {
queue.sync {
guard pendingID == call.id else { return }
clearPendingLocked()
pendingCalls.removeValue(forKey: call.id)
}
return .timedOut
}
return queue.sync {
guard pendingID == call.id, let pendingCall else {
guard let pendingCall = pendingCalls.removeValue(forKey: call.id) else {
return .missing
}
defer {
clearPendingLocked()
}
if let failure = pendingCall.failureMessage {
return .failure(failure)
}
@ -737,11 +734,6 @@ final class WorkspaceRemoteDaemonPendingCallRegistry {
return .response(response)
}
}
private func clearPendingLocked() {
pendingID = nil
pendingCall = nil
}
}
private final class WorkspaceRemoteDaemonRPCClient {
@ -750,7 +742,7 @@ private final class WorkspaceRemoteDaemonRPCClient {
private let configuration: WorkspaceRemoteConfiguration
private let remotePath: String
private let onUnexpectedTermination: (String) -> Void
private let callQueue = DispatchQueue(label: "com.cmux.remote-ssh.daemon-rpc.call.\(UUID().uuidString)")
private let writeQueue = DispatchQueue(label: "com.cmux.remote-ssh.daemon-rpc.write.\(UUID().uuidString)")
private let stateQueue = DispatchQueue(label: "com.cmux.remote-ssh.daemon-rpc.state.\(UUID().uuidString)")
private let pendingCalls = WorkspaceRemoteDaemonPendingCallRegistry()
@ -897,62 +889,62 @@ private final class WorkspaceRemoteDaemonRPCClient {
}
private func call(method: String, params: [String: Any], timeout: TimeInterval) throws -> [String: Any] {
try callQueue.sync {
let pendingCall = pendingCalls.register()
let requestID = pendingCall.id
let pendingCall = pendingCalls.register()
let requestID = pendingCall.id
let payload: Data
do {
payload = try Self.encodeJSON([
"id": requestID,
"method": method,
"params": params,
])
} catch {
pendingCalls.remove(pendingCall)
throw NSError(domain: "cmux.remote.daemon.rpc", code: 10, userInfo: [
NSLocalizedDescriptionKey: "failed to encode daemon RPC request \(method): \(error.localizedDescription)",
])
}
do {
try writePayload(payload)
} catch {
pendingCalls.remove(pendingCall)
throw error
}
let response: [String: Any]
switch pendingCalls.wait(for: pendingCall, timeout: timeout) {
case .timedOut:
stop(suppressTerminationCallback: false)
throw NSError(domain: "cmux.remote.daemon.rpc", code: 11, userInfo: [
NSLocalizedDescriptionKey: "daemon RPC timeout waiting for \(method) response",
])
case .failure(let failure):
throw NSError(domain: "cmux.remote.daemon.rpc", code: 12, userInfo: [
NSLocalizedDescriptionKey: failure,
])
case .missing:
throw NSError(domain: "cmux.remote.daemon.rpc", code: 13, userInfo: [
NSLocalizedDescriptionKey: "daemon RPC \(method) returned empty response",
])
case .response(let pendingResponse):
response = pendingResponse
}
let ok = (response["ok"] as? Bool) ?? false
if ok {
return (response["result"] as? [String: Any]) ?? [:]
}
let errorObject = (response["error"] as? [String: Any]) ?? [:]
let code = (errorObject["code"] as? String)?.trimmingCharacters(in: .whitespacesAndNewlines) ?? "rpc_error"
let message = (errorObject["message"] as? String)?.trimmingCharacters(in: .whitespacesAndNewlines) ?? "daemon RPC call failed"
throw NSError(domain: "cmux.remote.daemon.rpc", code: 14, userInfo: [
NSLocalizedDescriptionKey: "\(method) failed (\(code)): \(message)",
let payload: Data
do {
payload = try Self.encodeJSON([
"id": requestID,
"method": method,
"params": params,
])
} catch {
pendingCalls.remove(pendingCall)
throw NSError(domain: "cmux.remote.daemon.rpc", code: 10, userInfo: [
NSLocalizedDescriptionKey: "failed to encode daemon RPC request \(method): \(error.localizedDescription)",
])
}
do {
try writeQueue.sync {
try writePayload(payload)
}
} catch {
pendingCalls.remove(pendingCall)
throw error
}
let response: [String: Any]
switch pendingCalls.wait(for: pendingCall, timeout: timeout) {
case .timedOut:
stop(suppressTerminationCallback: false)
throw NSError(domain: "cmux.remote.daemon.rpc", code: 11, userInfo: [
NSLocalizedDescriptionKey: "daemon RPC timeout waiting for \(method) response",
])
case .failure(let failure):
throw NSError(domain: "cmux.remote.daemon.rpc", code: 12, userInfo: [
NSLocalizedDescriptionKey: failure,
])
case .missing:
throw NSError(domain: "cmux.remote.daemon.rpc", code: 13, userInfo: [
NSLocalizedDescriptionKey: "daemon RPC \(method) returned empty response",
])
case .response(let pendingResponse):
response = pendingResponse
}
let ok = (response["ok"] as? Bool) ?? false
if ok {
return (response["result"] as? [String: Any]) ?? [:]
}
let errorObject = (response["error"] as? [String: Any]) ?? [:]
let code = (errorObject["code"] as? String)?.trimmingCharacters(in: .whitespacesAndNewlines) ?? "rpc_error"
let message = (errorObject["message"] as? String)?.trimmingCharacters(in: .whitespacesAndNewlines) ?? "daemon RPC call failed"
throw NSError(domain: "cmux.remote.daemon.rpc", code: 14, userInfo: [
NSLocalizedDescriptionKey: "\(method) failed (\(code)): \(message)",
])
}
private func writePayload(_ payload: Data) throws {