From c5577dd495d35d21140e7473d6359189cd762787 Mon Sep 17 00:00:00 2001 From: Austin Wang Date: Thu, 5 Mar 2026 19:52:59 -0800 Subject: [PATCH] Fix flaky CLI socket listener recovery (#952) (#954) * Harden socket listener health checks and path handling * Make socket health probe non-blocking * Address PR feedback for issue 952 recovery guard * Run issue 952 regression guard without pytest dependency * Harden socket probe timing and health telemetry * Handle missing git in issue 952 regression guard * Address remaining PR 954 review fixes --- .github/workflows/ci-macos-compat.yml | 3 + .github/workflows/ci.yml | 6 + .github/workflows/test-depot.yml | 3 + Sources/AppDelegate.swift | 41 ++++- Sources/TerminalController.swift | 141 +++++++++++++-- cmuxTests/CmuxWebViewKeyEquivalentTests.swift | 20 ++- ...test_issue_952_socket_listener_recovery.py | 162 ++++++++++++++++++ 7 files changed, 355 insertions(+), 21 deletions(-) create mode 100644 tests/test_issue_952_socket_listener_recovery.py diff --git a/.github/workflows/ci-macos-compat.yml b/.github/workflows/ci-macos-compat.yml index 4bcab6b0..463e5a56 100644 --- a/.github/workflows/ci-macos-compat.yml +++ b/.github/workflows/ci-macos-compat.yml @@ -94,6 +94,9 @@ jobs: sleep $((attempt * 5)) done + - name: Run issue #952 regression guard + run: python3 tests/test_issue_952_socket_listener_recovery.py + - name: Run unit tests run: | set -euo pipefail diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 6bac7a85..c106a18a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -122,6 +122,9 @@ jobs: sleep $((attempt * 5)) done + - name: Run issue #952 regression guard + run: python3 tests/test_issue_952_socket_listener_recovery.py + - name: Run unit tests run: | set -euo pipefail @@ -244,6 +247,9 @@ jobs: sleep $((attempt * 5)) done + - name: Run issue #952 regression guard + run: python3 tests/test_issue_952_socket_listener_recovery.py + - name: Create virtual display run: | set -euo pipefail diff --git a/.github/workflows/test-depot.yml b/.github/workflows/test-depot.yml index ca636bf6..867479ad 100644 --- a/.github/workflows/test-depot.yml +++ b/.github/workflows/test-depot.yml @@ -122,6 +122,9 @@ jobs: sleep $((attempt * 5)) done + - name: Run issue #952 regression guard + run: python3 tests/test_issue_952_socket_listener_recovery.py + - name: Run unit tests if: ${{ !inputs.skip_unit_tests }} run: | diff --git a/Sources/AppDelegate.swift b/Sources/AppDelegate.swift index dfbff6c2..b015a5ea 100644 --- a/Sources/AppDelegate.swift +++ b/Sources/AppDelegate.swift @@ -1577,7 +1577,8 @@ final class AppDelegate: NSObject, NSApplicationDelegate, UNUserNotificationCent private var isApplyingStartupSessionRestore = false private var sessionAutosaveTimer: DispatchSourceTimer? private var socketListenerHealthTimer: DispatchSourceTimer? - private static let socketListenerHealthCheckInterval: DispatchTimeInterval = .seconds(5) + private var socketListenerHealthCheckInFlight = false + private static let socketListenerHealthCheckInterval: DispatchTimeInterval = .seconds(2) private var lastSocketListenerUnhealthyCaptureAt: Date = .distantPast private static let socketListenerUnhealthyCaptureCooldown: TimeInterval = 60 private let sessionPersistenceQueue = DispatchQueue( @@ -2508,25 +2509,57 @@ final class AppDelegate: NSObject, NSApplicationDelegate, UNUserNotificationCent private func stopSocketListenerHealthMonitor() { socketListenerHealthTimer?.cancel() socketListenerHealthTimer = nil + socketListenerHealthCheckInFlight = false } private func restartSocketListenerIfNeededForHealthCheck(source: String) { - guard let config = socketListenerConfigurationIfEnabled() else { return } - let health = TerminalController.shared.socketListenerHealth(expectedSocketPath: config.path) + guard !socketListenerHealthCheckInFlight, + let config = socketListenerConfigurationIfEnabled() else { return } + let expectedSocketPath = config.path + let terminalController = TerminalController.shared + socketListenerHealthCheckInFlight = true + Thread.detachNewThread { [weak self, expectedSocketPath, source, terminalController] in + let health = terminalController.socketListenerHealth(expectedSocketPath: expectedSocketPath) + Task { @MainActor [weak self, health] in + guard let self else { return } + self.socketListenerHealthCheckInFlight = false + self.handleSocketListenerHealthCheckResult( + health, + source: source, + expectedSocketPath: expectedSocketPath + ) + } + } + } + + private func handleSocketListenerHealthCheckResult( + _ health: TerminalController.SocketListenerHealth, + source: String, + expectedSocketPath: String + ) { + guard let config = socketListenerConfigurationIfEnabled(), + config.path == expectedSocketPath else { return } guard !health.isHealthy else { lastSocketListenerUnhealthyCaptureAt = .distantPast return } let failureSignals = health.failureSignals - let data: [String: Any] = [ + var data: [String: Any] = [ "source": source, "path": config.path, "isRunning": health.isRunning ? 1 : 0, "acceptLoopAlive": health.acceptLoopAlive ? 1 : 0, "socketPathMatches": health.socketPathMatches ? 1 : 0, "socketPathExists": health.socketPathExists ? 1 : 0, + "socketProbePerformed": health.socketProbePerformed ? 1 : 0, "failureSignals": failureSignals ] + if let socketConnectable = health.socketConnectable { + data["socketConnectable"] = socketConnectable ? 1 : 0 + } + if let socketConnectErrno = health.socketConnectErrno { + data["socketConnectErrno"] = Int(socketConnectErrno) + } sentryBreadcrumb("socket.listener.unhealthy", category: "socket", data: data) let now = Date() if now.timeIntervalSince(lastSocketListenerUnhealthyCaptureAt) >= Self.socketListenerUnhealthyCaptureCooldown { diff --git a/Sources/TerminalController.swift b/Sources/TerminalController.swift index 5d905129..8f8eadf1 100644 --- a/Sources/TerminalController.swift +++ b/Sources/TerminalController.swift @@ -13,6 +13,9 @@ class TerminalController { let acceptLoopAlive: Bool let socketPathMatches: Bool let socketPathExists: Bool + let socketProbePerformed: Bool + let socketConnectable: Bool? + let socketConnectErrno: Int32? var failureSignals: [String] { var signals: [String] = [] @@ -20,6 +23,9 @@ class TerminalController { if !acceptLoopAlive { signals.append("accept_loop_dead") } if !socketPathMatches { signals.append("socket_path_mismatch") } if !socketPathExists { signals.append("socket_missing") } + if socketProbePerformed && isRunning && acceptLoopAlive && socketPathMatches && socketPathExists && socketConnectable == false { + signals.append("socket_unreachable") + } return signals } @@ -51,6 +57,14 @@ class TerminalController { private nonisolated static let acceptFailureMaxBackoffMs = 5_000 private nonisolated static let acceptFailureMinimumRearmDelayMs = 100 private nonisolated static let acceptFailureRearmThreshold = 50 + private nonisolated static let socketProbePollTimeoutMs: Int32 = 100 + private nonisolated static let socketProbePollAttempts = 3 + private nonisolated static let socketProbePollRetryBackoffUs: useconds_t = 50_000 + private nonisolated static let unixSocketPathMaxLength: Int = { + var addr = sockaddr_un() + // Reserve one byte for the null terminator. + return MemoryLayout.size(ofValue: addr.sun_path) - 1 + }() private struct ListenerStateSnapshot { let socketPath: String @@ -508,6 +522,99 @@ class TerminalController { return !isRunning && activeGeneration == 0 } + private nonisolated static func unixSocketAddress(path: String) -> sockaddr_un? { + var addr = sockaddr_un() + addr.sun_family = sa_family_t(AF_UNIX) + + let maxLength = unixSocketPathMaxLength + 1 + var didFit = false + path.withCString { source in + let sourceLength = strlen(source) + guard sourceLength < maxLength else { return } + + _ = withUnsafeMutableBytes(of: &addr.sun_path) { buffer in + buffer.initializeMemory(as: UInt8.self, repeating: 0) + } + withUnsafeMutablePointer(to: &addr.sun_path) { pathPtr in + let destination = UnsafeMutableRawPointer(pathPtr).assumingMemoryBound(to: CChar.self) + strncpy(destination, source, maxLength - 1) + } + didFit = true + } + return didFit ? addr : nil + } + + private nonisolated static func bindUnixSocket(_ socket: Int32, path: String) -> Int32? { + guard var addr = unixSocketAddress(path: path) else { return nil } + return withUnsafePointer(to: &addr) { ptr in + ptr.withMemoryRebound(to: sockaddr.self, capacity: 1) { sockaddrPtr in + bind(socket, sockaddrPtr, socklen_t(MemoryLayout.size)) + } + } + } + + private nonisolated static func probeSocketConnectability(path: String) -> (isConnectable: Bool?, errnoCode: Int32?) { + let probeSocket = socket(AF_UNIX, SOCK_STREAM, 0) + guard probeSocket >= 0 else { + return (false, errno) + } + defer { close(probeSocket) } + + let existingFlags = fcntl(probeSocket, F_GETFL, 0) + if existingFlags >= 0 { + _ = fcntl(probeSocket, F_SETFL, existingFlags | O_NONBLOCK) + } + + guard var addr = unixSocketAddress(path: path) else { + return (false, ENAMETOOLONG) + } + let connectResult = withUnsafePointer(to: &addr) { ptr in + ptr.withMemoryRebound(to: sockaddr.self, capacity: 1) { sockaddrPtr in + connect(probeSocket, sockaddrPtr, socklen_t(MemoryLayout.size)) + } + } + if connectResult == 0 { + return (true, nil) + } + let connectErrno = errno + if connectErrno == EINPROGRESS { + var pollDescriptor = pollfd(fd: probeSocket, events: Int16(POLLOUT), revents: 0) + for attempt in 0.. 0 { + var socketError: Int32 = 0 + var socketErrorLength = socklen_t(MemoryLayout.size) + let status = getsockopt( + probeSocket, + SOL_SOCKET, + SO_ERROR, + &socketError, + &socketErrorLength + ) + if status == 0 && socketError == 0 { + return (true, nil) + } + if status == 0 { + return (false, socketError) + } + return (false, errno) + } + + let pollErrno = errno + if pollResult == 0 || pollErrno == EINTR { + if attempt + 1 < Self.socketProbePollAttempts { + usleep(Self.socketProbePollRetryBackoffUs) + continue + } + return (false, pollResult == 0 ? ETIMEDOUT : pollErrno) + } + return (false, pollErrno) + } + } + return (false, connectErrno) + } + func start(tabManager: TabManager, socketPath: String, accessMode: SocketControlMode) { self.tabManager = tabManager self.accessMode = accessMode @@ -556,19 +663,18 @@ class TerminalController { } // Bind to path - var addr = sockaddr_un() - addr.sun_family = sa_family_t(AF_UNIX) - socketPath.withCString { ptr in - withUnsafeMutablePointer(to: &addr.sun_path) { pathPtr in - let pathBuf = UnsafeMutableRawPointer(pathPtr).assumingMemoryBound(to: CChar.self) - strcpy(pathBuf, ptr) - } - } - - let bindResult = withUnsafePointer(to: &addr) { ptr in - ptr.withMemoryRebound(to: sockaddr.self, capacity: 1) { sockaddrPtr in - bind(newServerSocket, sockaddrPtr, socklen_t(MemoryLayout.size)) - } + guard let bindResult = Self.bindUnixSocket(newServerSocket, path: socketPath) else { + close(newServerSocket) + reportSocketListenerFailure( + message: "socket.listener.start.failed", + stage: "bind_path_too_long", + errnoCode: ENAMETOOLONG, + extra: [ + "pathLength": socketPath.utf8.count, + "maxPathLength": Self.unixSocketPathMaxLength + ] + ) + return } guard bindResult >= 0 else { @@ -653,12 +759,19 @@ class TerminalController { var st = stat() let exists = lstat(expectedSocketPath, &st) == 0 && (st.st_mode & S_IFMT) == S_IFSOCK + let shouldProbeConnection = snapshot.isRunning && snapshot.acceptLoopAlive && pathMatches && exists + let connectability = shouldProbeConnection + ? Self.probeSocketConnectability(path: expectedSocketPath) + : (isConnectable: nil, errnoCode: nil) return SocketListenerHealth( isRunning: snapshot.isRunning, acceptLoopAlive: snapshot.acceptLoopAlive, socketPathMatches: pathMatches, - socketPathExists: exists + socketPathExists: exists, + socketProbePerformed: shouldProbeConnection, + socketConnectable: connectability.isConnectable, + socketConnectErrno: connectability.errnoCode ) } diff --git a/cmuxTests/CmuxWebViewKeyEquivalentTests.swift b/cmuxTests/CmuxWebViewKeyEquivalentTests.swift index 114c7b2b..9e34690f 100644 --- a/cmuxTests/CmuxWebViewKeyEquivalentTests.swift +++ b/cmuxTests/CmuxWebViewKeyEquivalentTests.swift @@ -10732,6 +10732,7 @@ final class TerminalControllerSocketListenerHealthTests: XCTestCase { return fd } + @MainActor func testSocketListenerHealthRecognizesSocketPath() throws { let path = makeTempSocketPath() let fd = try bindUnixSocket(at: path) @@ -10745,6 +10746,7 @@ final class TerminalControllerSocketListenerHealthTests: XCTestCase { XCTAssertFalse(health.isHealthy) } + @MainActor func testSocketListenerHealthRejectsRegularFile() throws { let path = makeTempSocketPath() let url = URL(fileURLWithPath: path) @@ -10761,10 +10763,16 @@ final class TerminalControllerSocketListenerHealthTests: XCTestCase { isRunning: true, acceptLoopAlive: true, socketPathMatches: true, - socketPathExists: true + socketPathExists: true, + socketProbePerformed: true, + socketConnectable: true, + socketConnectErrno: nil ) XCTAssertTrue(health.isHealthy) - XCTAssertEqual(health.failureSignals, []) + XCTAssertTrue(health.failureSignals.isEmpty) + XCTAssertTrue(health.socketProbePerformed) + XCTAssertEqual(health.socketConnectable, true) + XCTAssertNil(health.socketConnectErrno) } func testSocketListenerHealthFailureSignalsIncludeAllDetectedProblems() { @@ -10772,9 +10780,15 @@ final class TerminalControllerSocketListenerHealthTests: XCTestCase { isRunning: false, acceptLoopAlive: false, socketPathMatches: false, - socketPathExists: false + socketPathExists: false, + socketProbePerformed: false, + socketConnectable: nil, + socketConnectErrno: nil ) XCTAssertFalse(health.isHealthy) + XCTAssertFalse(health.socketProbePerformed) + XCTAssertNil(health.socketConnectable) + XCTAssertNil(health.socketConnectErrno) XCTAssertEqual( health.failureSignals, ["not_running", "accept_loop_dead", "socket_path_mismatch", "socket_missing"] diff --git a/tests/test_issue_952_socket_listener_recovery.py b/tests/test_issue_952_socket_listener_recovery.py new file mode 100644 index 00000000..46a83644 --- /dev/null +++ b/tests/test_issue_952_socket_listener_recovery.py @@ -0,0 +1,162 @@ +#!/usr/bin/env python3 +"""Regression guard for issue #952 (flaky CLI socket connections).""" + +from __future__ import annotations + +import re +import shutil +import subprocess +from pathlib import Path + + +def get_repo_root() -> Path: + """Return the repository root for source inspections.""" + fallback_root = Path(__file__).resolve().parents[1] + git_path = shutil.which("git") + if git_path is None: + return fallback_root + + try: + result = subprocess.run( + [git_path, "rev-parse", "--show-toplevel"], + capture_output=True, + text=True, + check=False, + ) + except OSError: + return fallback_root + if result.returncode == 0: + return Path(result.stdout.strip()) + return fallback_root + + +def require(content: str, needle: str, message: str, failures: list[str], *, regex: bool = False) -> None: + """Record a failure when a required source pattern is missing.""" + matched = re.search(needle, content, re.MULTILINE) is not None if regex else needle in content + if not matched: + failures.append(message) + + +def collect_failures() -> list[str]: + """Collect missing source-level guards for the socket listener recovery fix.""" + repo_root = get_repo_root() + terminal_controller_path = repo_root / "Sources" / "TerminalController.swift" + app_delegate_path = repo_root / "Sources" / "AppDelegate.swift" + failures: list[str] = [] + + missing_paths = [ + str(path) for path in [terminal_controller_path, app_delegate_path] if not path.exists() + ] + if missing_paths: + for path in missing_paths: + failures.append(f"Missing expected file: {path}") + return failures + + terminal_controller = terminal_controller_path.read_text(encoding="utf-8") + app_delegate = app_delegate_path.read_text(encoding="utf-8") + + require( + terminal_controller, + "let socketProbePerformed: Bool", + "Socket health snapshot no longer tracks whether connectability was probed", + failures, + ) + require( + terminal_controller, + "let socketConnectable: Bool?", + "Socket health snapshot no longer distinguishes unprobed vs connectable sockets", + failures, + ) + require( + terminal_controller, + "let socketConnectErrno: Int32?", + "Socket health snapshot no longer preserves probe errno", + failures, + ) + require( + terminal_controller, + "signals.append(\"socket_unreachable\")", + "Socket health failures no longer flag unreachable listeners", + failures, + ) + require( + terminal_controller, + r"private\s+nonisolated\s+static\s+func\s+probeSocketConnectability\s*\(\s*path:\s*String\s*\)", + "Missing active socket connectability probe helper", + failures, + regex=True, + ) + require( + terminal_controller, + r"connect\s*\(\s*probeSocket\s*,\s*sockaddrPtr\s*,\s*socklen_t\s*\(\s*MemoryLayout\.size\s*\)\s*\)", + "Socket health probe no longer performs a real connect() check", + failures, + regex=True, + ) + require( + terminal_controller, + "stage: \"bind_path_too_long\"", + "Socket listener start no longer reports overlong Unix socket paths", + failures, + ) + require( + terminal_controller, + "Self.unixSocketPathMaxLength", + "Socket listener path-length telemetry was removed", + failures, + ) + + require( + app_delegate, + "private static let socketListenerHealthCheckInterval: DispatchTimeInterval = .seconds(2)", + "Socket health timer interval drifted from the fast recovery setting", + failures, + ) + require( + app_delegate, + "\"socketProbePerformed\": health.socketProbePerformed ? 1 : 0", + "Health telemetry no longer records whether a connectability probe ran", + failures, + ) + require( + app_delegate, + "if let socketConnectable = health.socketConnectable {", + "Health telemetry no longer gates connectability on an actual probe result", + failures, + ) + require( + app_delegate, + "data[\"socketConnectable\"] = socketConnectable ? 1 : 0", + "Health telemetry no longer includes connectability when a probe ran", + failures, + ) + require( + app_delegate, + "if let socketConnectErrno = health.socketConnectErrno {", + "Health telemetry no longer records connect probe errno when available", + failures, + ) + return failures + + +def test_issue_952_socket_listener_recovery() -> None: + """Keep the source-level recovery guards for issue #952 in CI.""" + failures = collect_failures() + assert not failures, "issue #952 regression(s) detected:\n- " + "\n- ".join(failures) + + +def main() -> int: + """Run the regression guard without requiring pytest to be installed.""" + failures = collect_failures() + if failures: + print("FAIL: issue #952 regression(s) detected") + for failure in failures: + print(f"- {failure}") + return 1 + + print("PASS: issue #952 socket listener recovery guards are present") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main())