Fix flaky CLI socket listener recovery (#952) (#954)

* Harden socket listener health checks and path handling

* Make socket health probe non-blocking

* Address PR feedback for issue 952 recovery guard

* Run issue 952 regression guard without pytest dependency

* Harden socket probe timing and health telemetry

* Handle missing git in issue 952 regression guard

* Address remaining PR 954 review fixes
This commit is contained in:
Austin Wang 2026-03-05 19:52:59 -08:00 committed by GitHub
parent d99fa96c09
commit c5577dd495
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 355 additions and 21 deletions

View file

@ -94,6 +94,9 @@ jobs:
sleep $((attempt * 5))
done
- name: Run issue #952 regression guard
run: python3 tests/test_issue_952_socket_listener_recovery.py
- name: Run unit tests
run: |
set -euo pipefail

View file

@ -122,6 +122,9 @@ jobs:
sleep $((attempt * 5))
done
- name: Run issue #952 regression guard
run: python3 tests/test_issue_952_socket_listener_recovery.py
- name: Run unit tests
run: |
set -euo pipefail
@ -244,6 +247,9 @@ jobs:
sleep $((attempt * 5))
done
- name: Run issue #952 regression guard
run: python3 tests/test_issue_952_socket_listener_recovery.py
- name: Create virtual display
run: |
set -euo pipefail

View file

@ -122,6 +122,9 @@ jobs:
sleep $((attempt * 5))
done
- name: Run issue #952 regression guard
run: python3 tests/test_issue_952_socket_listener_recovery.py
- name: Run unit tests
if: ${{ !inputs.skip_unit_tests }}
run: |

View file

@ -1577,7 +1577,8 @@ final class AppDelegate: NSObject, NSApplicationDelegate, UNUserNotificationCent
private var isApplyingStartupSessionRestore = false
private var sessionAutosaveTimer: DispatchSourceTimer?
private var socketListenerHealthTimer: DispatchSourceTimer?
private static let socketListenerHealthCheckInterval: DispatchTimeInterval = .seconds(5)
private var socketListenerHealthCheckInFlight = false
private static let socketListenerHealthCheckInterval: DispatchTimeInterval = .seconds(2)
private var lastSocketListenerUnhealthyCaptureAt: Date = .distantPast
private static let socketListenerUnhealthyCaptureCooldown: TimeInterval = 60
private let sessionPersistenceQueue = DispatchQueue(
@ -2508,25 +2509,57 @@ final class AppDelegate: NSObject, NSApplicationDelegate, UNUserNotificationCent
private func stopSocketListenerHealthMonitor() {
socketListenerHealthTimer?.cancel()
socketListenerHealthTimer = nil
socketListenerHealthCheckInFlight = false
}
private func restartSocketListenerIfNeededForHealthCheck(source: String) {
guard let config = socketListenerConfigurationIfEnabled() else { return }
let health = TerminalController.shared.socketListenerHealth(expectedSocketPath: config.path)
guard !socketListenerHealthCheckInFlight,
let config = socketListenerConfigurationIfEnabled() else { return }
let expectedSocketPath = config.path
let terminalController = TerminalController.shared
socketListenerHealthCheckInFlight = true
Thread.detachNewThread { [weak self, expectedSocketPath, source, terminalController] in
let health = terminalController.socketListenerHealth(expectedSocketPath: expectedSocketPath)
Task { @MainActor [weak self, health] in
guard let self else { return }
self.socketListenerHealthCheckInFlight = false
self.handleSocketListenerHealthCheckResult(
health,
source: source,
expectedSocketPath: expectedSocketPath
)
}
}
}
private func handleSocketListenerHealthCheckResult(
_ health: TerminalController.SocketListenerHealth,
source: String,
expectedSocketPath: String
) {
guard let config = socketListenerConfigurationIfEnabled(),
config.path == expectedSocketPath else { return }
guard !health.isHealthy else {
lastSocketListenerUnhealthyCaptureAt = .distantPast
return
}
let failureSignals = health.failureSignals
let data: [String: Any] = [
var data: [String: Any] = [
"source": source,
"path": config.path,
"isRunning": health.isRunning ? 1 : 0,
"acceptLoopAlive": health.acceptLoopAlive ? 1 : 0,
"socketPathMatches": health.socketPathMatches ? 1 : 0,
"socketPathExists": health.socketPathExists ? 1 : 0,
"socketProbePerformed": health.socketProbePerformed ? 1 : 0,
"failureSignals": failureSignals
]
if let socketConnectable = health.socketConnectable {
data["socketConnectable"] = socketConnectable ? 1 : 0
}
if let socketConnectErrno = health.socketConnectErrno {
data["socketConnectErrno"] = Int(socketConnectErrno)
}
sentryBreadcrumb("socket.listener.unhealthy", category: "socket", data: data)
let now = Date()
if now.timeIntervalSince(lastSocketListenerUnhealthyCaptureAt) >= Self.socketListenerUnhealthyCaptureCooldown {

View file

@ -13,6 +13,9 @@ class TerminalController {
let acceptLoopAlive: Bool
let socketPathMatches: Bool
let socketPathExists: Bool
let socketProbePerformed: Bool
let socketConnectable: Bool?
let socketConnectErrno: Int32?
var failureSignals: [String] {
var signals: [String] = []
@ -20,6 +23,9 @@ class TerminalController {
if !acceptLoopAlive { signals.append("accept_loop_dead") }
if !socketPathMatches { signals.append("socket_path_mismatch") }
if !socketPathExists { signals.append("socket_missing") }
if socketProbePerformed && isRunning && acceptLoopAlive && socketPathMatches && socketPathExists && socketConnectable == false {
signals.append("socket_unreachable")
}
return signals
}
@ -51,6 +57,14 @@ class TerminalController {
private nonisolated static let acceptFailureMaxBackoffMs = 5_000
private nonisolated static let acceptFailureMinimumRearmDelayMs = 100
private nonisolated static let acceptFailureRearmThreshold = 50
private nonisolated static let socketProbePollTimeoutMs: Int32 = 100
private nonisolated static let socketProbePollAttempts = 3
private nonisolated static let socketProbePollRetryBackoffUs: useconds_t = 50_000
private nonisolated static let unixSocketPathMaxLength: Int = {
var addr = sockaddr_un()
// Reserve one byte for the null terminator.
return MemoryLayout.size(ofValue: addr.sun_path) - 1
}()
private struct ListenerStateSnapshot {
let socketPath: String
@ -508,6 +522,99 @@ class TerminalController {
return !isRunning && activeGeneration == 0
}
private nonisolated static func unixSocketAddress(path: String) -> sockaddr_un? {
var addr = sockaddr_un()
addr.sun_family = sa_family_t(AF_UNIX)
let maxLength = unixSocketPathMaxLength + 1
var didFit = false
path.withCString { source in
let sourceLength = strlen(source)
guard sourceLength < maxLength else { return }
_ = withUnsafeMutableBytes(of: &addr.sun_path) { buffer in
buffer.initializeMemory(as: UInt8.self, repeating: 0)
}
withUnsafeMutablePointer(to: &addr.sun_path) { pathPtr in
let destination = UnsafeMutableRawPointer(pathPtr).assumingMemoryBound(to: CChar.self)
strncpy(destination, source, maxLength - 1)
}
didFit = true
}
return didFit ? addr : nil
}
private nonisolated static func bindUnixSocket(_ socket: Int32, path: String) -> Int32? {
guard var addr = unixSocketAddress(path: path) else { return nil }
return withUnsafePointer(to: &addr) { ptr in
ptr.withMemoryRebound(to: sockaddr.self, capacity: 1) { sockaddrPtr in
bind(socket, sockaddrPtr, socklen_t(MemoryLayout<sockaddr_un>.size))
}
}
}
private nonisolated static func probeSocketConnectability(path: String) -> (isConnectable: Bool?, errnoCode: Int32?) {
let probeSocket = socket(AF_UNIX, SOCK_STREAM, 0)
guard probeSocket >= 0 else {
return (false, errno)
}
defer { close(probeSocket) }
let existingFlags = fcntl(probeSocket, F_GETFL, 0)
if existingFlags >= 0 {
_ = fcntl(probeSocket, F_SETFL, existingFlags | O_NONBLOCK)
}
guard var addr = unixSocketAddress(path: path) else {
return (false, ENAMETOOLONG)
}
let connectResult = withUnsafePointer(to: &addr) { ptr in
ptr.withMemoryRebound(to: sockaddr.self, capacity: 1) { sockaddrPtr in
connect(probeSocket, sockaddrPtr, socklen_t(MemoryLayout<sockaddr_un>.size))
}
}
if connectResult == 0 {
return (true, nil)
}
let connectErrno = errno
if connectErrno == EINPROGRESS {
var pollDescriptor = pollfd(fd: probeSocket, events: Int16(POLLOUT), revents: 0)
for attempt in 0..<Self.socketProbePollAttempts {
pollDescriptor.revents = 0
let pollResult = poll(&pollDescriptor, 1, Self.socketProbePollTimeoutMs)
if pollResult > 0 {
var socketError: Int32 = 0
var socketErrorLength = socklen_t(MemoryLayout<Int32>.size)
let status = getsockopt(
probeSocket,
SOL_SOCKET,
SO_ERROR,
&socketError,
&socketErrorLength
)
if status == 0 && socketError == 0 {
return (true, nil)
}
if status == 0 {
return (false, socketError)
}
return (false, errno)
}
let pollErrno = errno
if pollResult == 0 || pollErrno == EINTR {
if attempt + 1 < Self.socketProbePollAttempts {
usleep(Self.socketProbePollRetryBackoffUs)
continue
}
return (false, pollResult == 0 ? ETIMEDOUT : pollErrno)
}
return (false, pollErrno)
}
}
return (false, connectErrno)
}
func start(tabManager: TabManager, socketPath: String, accessMode: SocketControlMode) {
self.tabManager = tabManager
self.accessMode = accessMode
@ -556,19 +663,18 @@ class TerminalController {
}
// Bind to path
var addr = sockaddr_un()
addr.sun_family = sa_family_t(AF_UNIX)
socketPath.withCString { ptr in
withUnsafeMutablePointer(to: &addr.sun_path) { pathPtr in
let pathBuf = UnsafeMutableRawPointer(pathPtr).assumingMemoryBound(to: CChar.self)
strcpy(pathBuf, ptr)
}
}
let bindResult = withUnsafePointer(to: &addr) { ptr in
ptr.withMemoryRebound(to: sockaddr.self, capacity: 1) { sockaddrPtr in
bind(newServerSocket, sockaddrPtr, socklen_t(MemoryLayout<sockaddr_un>.size))
}
guard let bindResult = Self.bindUnixSocket(newServerSocket, path: socketPath) else {
close(newServerSocket)
reportSocketListenerFailure(
message: "socket.listener.start.failed",
stage: "bind_path_too_long",
errnoCode: ENAMETOOLONG,
extra: [
"pathLength": socketPath.utf8.count,
"maxPathLength": Self.unixSocketPathMaxLength
]
)
return
}
guard bindResult >= 0 else {
@ -653,12 +759,19 @@ class TerminalController {
var st = stat()
let exists = lstat(expectedSocketPath, &st) == 0 && (st.st_mode & S_IFMT) == S_IFSOCK
let shouldProbeConnection = snapshot.isRunning && snapshot.acceptLoopAlive && pathMatches && exists
let connectability = shouldProbeConnection
? Self.probeSocketConnectability(path: expectedSocketPath)
: (isConnectable: nil, errnoCode: nil)
return SocketListenerHealth(
isRunning: snapshot.isRunning,
acceptLoopAlive: snapshot.acceptLoopAlive,
socketPathMatches: pathMatches,
socketPathExists: exists
socketPathExists: exists,
socketProbePerformed: shouldProbeConnection,
socketConnectable: connectability.isConnectable,
socketConnectErrno: connectability.errnoCode
)
}

View file

@ -10732,6 +10732,7 @@ final class TerminalControllerSocketListenerHealthTests: XCTestCase {
return fd
}
@MainActor
func testSocketListenerHealthRecognizesSocketPath() throws {
let path = makeTempSocketPath()
let fd = try bindUnixSocket(at: path)
@ -10745,6 +10746,7 @@ final class TerminalControllerSocketListenerHealthTests: XCTestCase {
XCTAssertFalse(health.isHealthy)
}
@MainActor
func testSocketListenerHealthRejectsRegularFile() throws {
let path = makeTempSocketPath()
let url = URL(fileURLWithPath: path)
@ -10761,10 +10763,16 @@ final class TerminalControllerSocketListenerHealthTests: XCTestCase {
isRunning: true,
acceptLoopAlive: true,
socketPathMatches: true,
socketPathExists: true
socketPathExists: true,
socketProbePerformed: true,
socketConnectable: true,
socketConnectErrno: nil
)
XCTAssertTrue(health.isHealthy)
XCTAssertEqual(health.failureSignals, [])
XCTAssertTrue(health.failureSignals.isEmpty)
XCTAssertTrue(health.socketProbePerformed)
XCTAssertEqual(health.socketConnectable, true)
XCTAssertNil(health.socketConnectErrno)
}
func testSocketListenerHealthFailureSignalsIncludeAllDetectedProblems() {
@ -10772,9 +10780,15 @@ final class TerminalControllerSocketListenerHealthTests: XCTestCase {
isRunning: false,
acceptLoopAlive: false,
socketPathMatches: false,
socketPathExists: false
socketPathExists: false,
socketProbePerformed: false,
socketConnectable: nil,
socketConnectErrno: nil
)
XCTAssertFalse(health.isHealthy)
XCTAssertFalse(health.socketProbePerformed)
XCTAssertNil(health.socketConnectable)
XCTAssertNil(health.socketConnectErrno)
XCTAssertEqual(
health.failureSignals,
["not_running", "accept_loop_dead", "socket_path_mismatch", "socket_missing"]

View file

@ -0,0 +1,162 @@
#!/usr/bin/env python3
"""Regression guard for issue #952 (flaky CLI socket connections)."""
from __future__ import annotations
import re
import shutil
import subprocess
from pathlib import Path
def get_repo_root() -> Path:
"""Return the repository root for source inspections."""
fallback_root = Path(__file__).resolve().parents[1]
git_path = shutil.which("git")
if git_path is None:
return fallback_root
try:
result = subprocess.run(
[git_path, "rev-parse", "--show-toplevel"],
capture_output=True,
text=True,
check=False,
)
except OSError:
return fallback_root
if result.returncode == 0:
return Path(result.stdout.strip())
return fallback_root
def require(content: str, needle: str, message: str, failures: list[str], *, regex: bool = False) -> None:
"""Record a failure when a required source pattern is missing."""
matched = re.search(needle, content, re.MULTILINE) is not None if regex else needle in content
if not matched:
failures.append(message)
def collect_failures() -> list[str]:
"""Collect missing source-level guards for the socket listener recovery fix."""
repo_root = get_repo_root()
terminal_controller_path = repo_root / "Sources" / "TerminalController.swift"
app_delegate_path = repo_root / "Sources" / "AppDelegate.swift"
failures: list[str] = []
missing_paths = [
str(path) for path in [terminal_controller_path, app_delegate_path] if not path.exists()
]
if missing_paths:
for path in missing_paths:
failures.append(f"Missing expected file: {path}")
return failures
terminal_controller = terminal_controller_path.read_text(encoding="utf-8")
app_delegate = app_delegate_path.read_text(encoding="utf-8")
require(
terminal_controller,
"let socketProbePerformed: Bool",
"Socket health snapshot no longer tracks whether connectability was probed",
failures,
)
require(
terminal_controller,
"let socketConnectable: Bool?",
"Socket health snapshot no longer distinguishes unprobed vs connectable sockets",
failures,
)
require(
terminal_controller,
"let socketConnectErrno: Int32?",
"Socket health snapshot no longer preserves probe errno",
failures,
)
require(
terminal_controller,
"signals.append(\"socket_unreachable\")",
"Socket health failures no longer flag unreachable listeners",
failures,
)
require(
terminal_controller,
r"private\s+nonisolated\s+static\s+func\s+probeSocketConnectability\s*\(\s*path:\s*String\s*\)",
"Missing active socket connectability probe helper",
failures,
regex=True,
)
require(
terminal_controller,
r"connect\s*\(\s*probeSocket\s*,\s*sockaddrPtr\s*,\s*socklen_t\s*\(\s*MemoryLayout<sockaddr_un>\.size\s*\)\s*\)",
"Socket health probe no longer performs a real connect() check",
failures,
regex=True,
)
require(
terminal_controller,
"stage: \"bind_path_too_long\"",
"Socket listener start no longer reports overlong Unix socket paths",
failures,
)
require(
terminal_controller,
"Self.unixSocketPathMaxLength",
"Socket listener path-length telemetry was removed",
failures,
)
require(
app_delegate,
"private static let socketListenerHealthCheckInterval: DispatchTimeInterval = .seconds(2)",
"Socket health timer interval drifted from the fast recovery setting",
failures,
)
require(
app_delegate,
"\"socketProbePerformed\": health.socketProbePerformed ? 1 : 0",
"Health telemetry no longer records whether a connectability probe ran",
failures,
)
require(
app_delegate,
"if let socketConnectable = health.socketConnectable {",
"Health telemetry no longer gates connectability on an actual probe result",
failures,
)
require(
app_delegate,
"data[\"socketConnectable\"] = socketConnectable ? 1 : 0",
"Health telemetry no longer includes connectability when a probe ran",
failures,
)
require(
app_delegate,
"if let socketConnectErrno = health.socketConnectErrno {",
"Health telemetry no longer records connect probe errno when available",
failures,
)
return failures
def test_issue_952_socket_listener_recovery() -> None:
"""Keep the source-level recovery guards for issue #952 in CI."""
failures = collect_failures()
assert not failures, "issue #952 regression(s) detected:\n- " + "\n- ".join(failures)
def main() -> int:
"""Run the regression guard without requiring pytest to be installed."""
failures = collect_failures()
if failures:
print("FAIL: issue #952 regression(s) detected")
for failure in failures:
print(f"- {failure}")
return 1
print("PASS: issue #952 socket listener recovery guards are present")
return 0
if __name__ == "__main__":
raise SystemExit(main())