Move port scanning from shell to app-side with batching (#100)

* Move port scanning from shell to app-side with batching

Replace per-shell `ps -axo + lsof` scanning with a centralized
PortScanner singleton in the app. Each shell now sends lightweight
`report_tty` (once per session) and `ports_kick` (on preexec/precmd)
socket messages. The app coalesces kicks across all panels and runs a
single `ps -t <ttys> + lsof -p <pids>` covering every active panel.

Also fixes a macOS 26 Tahoe regression where `getsockopt(LOCAL_PEERPID)`
returns ENOTCONN on accepted sockets when the peer disconnects before
the handler thread starts. This was silently breaking ALL socket
commands sent via ncat --send-only. The fix captures the peer PID in
the accept loop immediately after accept(), and falls back to
LOCAL_PEERCRED (uid check) when the PID lookup fails.

* Fix PR review feedback: burst timing and auth comment clarity

- P2: burstDelays were accumulating (0.5+1.5+3+... = ~27.5s) instead of
  firing at absolute offsets from burst start. Now uses burstStart anchor
  so scans fire at 0.5s, 1.5s, 3s, 5s, 7.5s, 10s as intended.

- P1: Clarify LOCAL_PEERCRED fallback rationale — same security boundary
  as socket file permissions (0600), does not widen attack surface.
  Long-lived connections still get full descendant check via LOCAL_PEERPID.
This commit is contained in:
Lawrence Chen 2026-02-19 01:04:47 -08:00 committed by GitHub
parent 3193e602d4
commit 9642bb59fc
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 516 additions and 158 deletions

262
Sources/PortScanner.swift Normal file
View file

@ -0,0 +1,262 @@
import Foundation
/// Batched port scanner that replaces per-shell `ps + lsof` scanning.
///
/// Each shell sends a lightweight `report_tty` + `ports_kick` over the socket.
/// PortScanner coalesces kicks across all panels, then runs a single
/// `ps -t <ttys>` + `lsof -p <pids>` covering every panel that needs scanning.
///
/// Kick → coalesce → burst flow:
/// 1. `kick()` adds panel to `pendingKicks` set
/// 2. If no burst is active, starts a 200ms coalesce timer
/// 3. Coalesce fires → snapshots pending set → starts burst of 6 scans
/// 4. New kicks during burst merge into the active burst
/// 5. After last scan, if new kicks arrived, start a new coalesce cycle
final class PortScanner: @unchecked Sendable {
    static let shared = PortScanner()

    /// Callback delivers `(workspaceId, panelId, ports)` on main thread.
    ///
    /// NOTE(review): this property is read on `queue` (in `deliverResults`) but is
    /// not itself guarded by `queue`; assign it once during startup, before kicks
    /// can arrive — confirm against the call site.
    var onPortsUpdated: ((_ workspaceId: UUID, _ panelId: UUID, _ ports: [Int]) -> Void)?

    // MARK: - State (all guarded by `queue`)

    /// Serial queue that owns all mutable state below and runs the scans.
    private let queue = DispatchQueue(label: "com.cmux.port-scanner", qos: .utility)

    /// TTY name per (workspace, panel).
    private var ttyNames: [PanelKey: String] = [:]

    /// Panels that requested a scan since the last scan cleared the set.
    private var pendingKicks: Set<PanelKey> = []

    /// Whether a burst sequence is currently running.
    private var burstActive = false

    /// Coalesce timer (200ms after first kick). Non-nil only while a coalesce
    /// window is open.
    private var coalesceTimer: DispatchSourceTimer?

    /// Burst scan offsets in seconds from the start of the burst.
    /// Each scan fires at this absolute offset; the recursive scheduler anchors
    /// every deadline to the burst's start time, so the offsets do not accumulate.
    private static let burstOffsets: [Double] = [0.5, 1.5, 3, 5, 7.5, 10]

    // MARK: - Public API

    /// Identifies one panel inside one workspace.
    struct PanelKey: Hashable {
        let workspaceId: UUID
        let panelId: UUID
    }

    /// Records (or replaces) the TTY name associated with a panel.
    /// The update is applied asynchronously on `queue`.
    func registerTTY(workspaceId: UUID, panelId: UUID, ttyName: String) {
        queue.async { [self] in
            let key = PanelKey(workspaceId: workspaceId, panelId: panelId)
            ttyNames[key] = ttyName
        }
    }

    /// Forgets a panel: drops its TTY registration and any pending kick.
    /// The update is applied asynchronously on `queue`.
    func unregisterPanel(workspaceId: UUID, panelId: UUID) {
        queue.async { [self] in
            let key = PanelKey(workspaceId: workspaceId, panelId: panelId)
            ttyNames.removeValue(forKey: key)
            pendingKicks.remove(key)
        }
    }

    /// Requests a scan for a panel. Kicks for panels without a registered TTY
    /// are ignored. If no burst is running, this opens (or joins) the coalesce
    /// window; otherwise the kick is absorbed by the active burst.
    func kick(workspaceId: UUID, panelId: UUID) {
        queue.async { [self] in
            let key = PanelKey(workspaceId: workspaceId, panelId: panelId)
            guard ttyNames[key] != nil else { return }
            pendingKicks.insert(key)
            if !burstActive {
                startCoalesce()
            }
            // If burst is active, the next scan iteration will pick up the new kick.
        }
    }

    // MARK: - Coalesce + Burst

    /// Opens the 200ms coalesce window. Must be called on `queue`.
    ///
    /// The window is anchored to the first kick: if a timer is already armed it
    /// is left untouched. Re-arming on every kick would turn the window into a
    /// debounce, letting a steady stream of kicks postpone the scan indefinitely.
    private func startCoalesce() {
        guard coalesceTimer == nil else { return }
        let timer = DispatchSource.makeTimerSource(queue: queue)
        timer.schedule(deadline: .now() + 0.2)
        timer.setEventHandler { [weak self] in
            self?.coalesceTimerFired()
        }
        coalesceTimer = timer
        timer.resume()
    }

    /// Closes the coalesce window and, if any kicks are pending, starts a burst.
    /// Runs on `queue` (the timer's target queue).
    private func coalesceTimerFired() {
        coalesceTimer?.cancel()
        coalesceTimer = nil
        guard !pendingKicks.isEmpty else { return }
        burstActive = true
        runBurst(index: 0)
    }

    /// Schedules the scan at `burstOffsets[index]` and then recurses to the next one.
    /// Must be called on `queue`.
    ///
    /// - Parameters:
    ///   - index: Position in `burstOffsets` of the scan to schedule next.
    ///   - burstStart: Time the burst began; `nil` on the first call, which then
    ///     uses the current time as the anchor for all later deadlines.
    private func runBurst(index: Int, burstStart: DispatchTime? = nil) {
        guard index < Self.burstOffsets.count else {
            burstActive = false
            // If new kicks arrived during the burst, start a new coalesce cycle.
            if !pendingKicks.isEmpty {
                startCoalesce()
            }
            return
        }
        let start = burstStart ?? .now()
        let deadline = start + Self.burstOffsets[index]
        queue.asyncAfter(deadline: deadline) { [weak self] in
            guard let self else { return }
            self.runScan()
            self.runBurst(index: index + 1, burstStart: start)
        }
    }

    // MARK: - Scan

    /// Runs one `ps` + `lsof` pass and delivers a port list for every registered
    /// panel. Must be called on `queue`; blocks it while the subprocesses run.
    private func runScan() {
        // We scan all registered panels, not just pending ones, since ports can
        // appear/disappear on any panel.
        let snapshot = ttyNames
        // Clear pending kicks — they're accounted for in this scan (or there is
        // nothing registered to scan at all).
        pendingKicks.removeAll()
        guard !snapshot.isEmpty else { return }

        // Build TTY set (deduplicated).
        let ttyList = Set(snapshot.values).joined(separator: ",")

        // 1. ps -t tty1,tty2,... -o pid=,tty=
        let pidToTTY = runPS(ttyList: ttyList)
        guard !pidToTTY.isEmpty else {
            // No processes on any TTY — clear ports for all panels.
            deliverResults(snapshot.keys.map { ($0, [Int]()) })
            return
        }

        // 2. lsof -nP -a -p <all_pids> -iTCP -sTCP:LISTEN -F pn
        let allPids = pidToTTY.keys.sorted().map(String.init).joined(separator: ",")
        let pidToPorts = runLsof(pidsCsv: allPids)

        // 3. Join: PID→TTY + PID→ports → TTY→ports
        var portsByTTY: [String: Set<Int>] = [:]
        for (pid, ports) in pidToPorts {
            guard let tty = pidToTTY[pid] else { continue }
            portsByTTY[tty, default: []].formUnion(ports)
        }

        // 4. Map to per-panel port lists (sorted ascending; empty when none).
        let results: [(PanelKey, [Int])] = snapshot.map { key, tty in
            (key, portsByTTY[tty]?.sorted() ?? [])
        }
        deliverResults(results)
    }

    /// Hands each `(panel, ports)` pair to `onPortsUpdated` on the main queue.
    /// Does nothing when no callback is installed.
    private func deliverResults(_ results: [(PanelKey, [Int])]) {
        guard let callback = onPortsUpdated else { return }
        DispatchQueue.main.async {
            for (key, ports) in results {
                callback(key.workspaceId, key.panelId, ports)
            }
        }
    }

    // MARK: - Process helpers

    /// Launches an executable, waits for it to exit, and returns its stdout as
    /// UTF-8 text. stderr is discarded.
    ///
    /// - Returns: `nil` when the process could not be launched or its output is
    ///   not valid UTF-8.
    private func runProcess(executablePath: String, arguments: [String]) -> String? {
        let process = Process()
        let pipe = Pipe()
        process.executableURL = URL(fileURLWithPath: executablePath)
        process.arguments = arguments
        process.standardOutput = pipe
        process.standardError = FileHandle.nullDevice
        do {
            try process.run()
        } catch {
            return nil
        }
        // Drain stdout before waiting so a full pipe buffer cannot stall the child.
        let data = pipe.fileHandleForReading.readDataToEndOfFile()
        process.waitUntilExit()
        return String(data: data, encoding: .utf8)
    }

    /// Runs `ps -t <ttyList> -o pid=,tty=` — a targeted scan, much cheaper than `-ax`.
    ///
    /// - Parameter ttyList: Comma-separated TTY names.
    /// - Returns: PID → TTY (second column of the `ps` output); empty on failure.
    private func runPS(ttyList: String) -> [Int: String] {
        guard let output = runProcess(
            executablePath: "/bin/ps",
            arguments: ["-t", ttyList, "-o", "pid=,tty="]
        ) else { return [:] }

        var mapping: [Int: String] = [:]
        for line in output.split(separator: "\n") {
            let fields = line.split(whereSeparator: \.isWhitespace)
            guard fields.count >= 2, let pid = Int(fields[0]) else { continue }
            mapping[pid] = String(fields[1])
        }
        return mapping
    }

    /// Runs `lsof -nP -a -p <pids> -iTCP -sTCP:LISTEN -Fpn`.
    ///
    /// - Parameter pidsCsv: Comma-separated PIDs to inspect.
    /// - Returns: PID → set of listening TCP ports; empty on failure.
    private func runLsof(pidsCsv: String) -> [Int: Set<Int>] {
        guard let output = runProcess(
            executablePath: "/usr/sbin/lsof",
            arguments: ["-nP", "-a", "-p", pidsCsv, "-iTCP", "-sTCP:LISTEN", "-Fpn"]
        ) else { return [:] }

        // Parse lsof -F output: lines starting with 'p' = PID, 'n' = name (host:port).
        // Every 'n' line belongs to the most recent 'p' line.
        var result: [Int: Set<Int>] = [:]
        var currentPid: Int?
        for line in output.split(separator: "\n") {
            guard let tag = line.first else { continue }
            switch tag {
            case "p":
                currentPid = Int(line.dropFirst())
            case "n":
                guard let pid = currentPid,
                      let port = Self.listeningPort(fromLsofName: line.dropFirst()) else { continue }
                result[pid, default: []].insert(port)
            default:
                break
            }
        }
        return result
    }

    /// Extracts the local port from an lsof name field such as `*:8080` or
    /// `127.0.0.1:3000->127.0.0.1:51234`.
    ///
    /// - Returns: The port in `1...65535`, or `nil` when none can be parsed.
    private static func listeningPort(fromLsofName rawName: Substring) -> Int? {
        var name = rawName
        // Strip remote endpoint if present.
        if let arrow = name.range(of: "->") {
            name = name[..<arrow.lowerBound]
        }
        // Port is after the last colon; ignore any non-numeric suffix.
        guard let colon = name.lastIndex(of: ":") else { return nil }
        let digits = name[name.index(after: colon)...].prefix(while: \.isNumber)
        guard let port = Int(digits), port > 0, port <= 65535 else { return nil }
        return port
    }
}

View file

@ -77,14 +77,26 @@ class TerminalController {
// MARK: - Process Ancestry Check
/// Get the peer PID of a connected Unix domain socket using LOCAL_PEERPID.
private func getPeerPid(_ socket: Int32) -> pid_t? {
private nonisolated func getPeerPid(_ socket: Int32) -> pid_t? {
var pid: pid_t = 0
var pidSize = socklen_t(MemoryLayout<pid_t>.size)
let result = getsockopt(socket, SOL_LOCAL, LOCAL_PEERPID, &pid, &pidSize)
guard result == 0, pid > 0 else { return nil }
if result != 0 || pid <= 0 {
return nil
}
return pid
}
/// Reports whether the socket's peer runs under the same user id as this process.
///
/// The peer's credentials are fetched with `getsockopt(LOCAL_PEERCRED)`.
/// This works even after the peer has disconnected (unlike LOCAL_PEERPID).
///
/// - Parameter socket: File descriptor of the accepted Unix domain socket.
/// - Returns: `true` only when the credential lookup succeeds and the peer's
///   uid equals `getuid()`; `false` otherwise.
private func peerHasSameUID(_ socket: Int32) -> Bool {
    var peerCredentials = xucred()
    var credentialsLength = socklen_t(MemoryLayout<xucred>.size)
    let status = getsockopt(socket, SOL_LOCAL, LOCAL_PEERCRED, &peerCredentials, &credentialsLength)
    if status != 0 {
        return false
    }
    return peerCredentials.cr_uid == getuid()
}
/// Check if `pid` is a descendant of this process by walking the process tree.
func isDescendant(_ pid: pid_t) -> Bool {
var current = pid
@ -175,6 +187,18 @@ class TerminalController {
isRunning = true
print("TerminalController: Listening on \(socketPath)")
// Wire batched port scanner results back to workspace state.
PortScanner.shared.onPortsUpdated = { [weak self] workspaceId, panelId, ports in
MainActor.assumeIsolated {
guard let self, let tabManager = self.tabManager else { return }
guard let workspace = tabManager.tabs.first(where: { $0.id == workspaceId }) else { return }
let validSurfaceIds = Set(workspace.panels.keys)
guard validSurfaceIds.contains(panelId) else { return }
workspace.surfaceListeningPorts[panelId] = ports.isEmpty ? nil : ports
workspace.recomputeListeningPorts()
}
}
// Accept connections in background thread
Thread.detachNewThread { [weak self] in
self?.acceptLoop()
@ -223,28 +247,47 @@ class TerminalController {
consecutiveFailures = 0
// Capture peer PID immediately before the client can disconnect.
// ncat --send-only closes the connection right after writing, so by
// the time a new thread starts the peer may already be gone.
let peerPid = getPeerPid(clientSocket)
// Handle client in new thread
Thread.detachNewThread { [weak self] in
self?.handleClient(clientSocket)
self?.handleClient(clientSocket, peerPid: peerPid)
}
}
}
private func handleClient(_ socket: Int32) {
private func handleClient(_ socket: Int32, peerPid: pid_t? = nil) {
defer { close(socket) }
// In cmuxOnly mode, verify the connecting process is a descendant of cmux.
// In allowAll mode (env-var only), skip the ancestry check.
if accessMode == .cmuxOnly {
guard let peerPid = getPeerPid(socket) else {
let msg = "ERROR: Unable to verify client process\n"
msg.withCString { ptr in _ = write(socket, ptr, strlen(ptr)) }
return
// Use pre-captured peer PID if available (captured in accept loop before
// the peer can disconnect), falling back to live lookup.
let pid = peerPid ?? getPeerPid(socket)
if let pid {
guard isDescendant(pid) else {
let msg = "ERROR: Access denied — only processes started inside cmux can connect\n"
msg.withCString { ptr in _ = write(socket, ptr, strlen(ptr)) }
return
}
}
guard isDescendant(peerPid) else {
let msg = "ERROR: Access denied — only processes started inside cmux can connect\n"
msg.withCString { ptr in _ = write(socket, ptr, strlen(ptr)) }
return
// If pid is nil, LOCAL_PEERPID failed (peer disconnected before we
// could read it — common with ncat --send-only). We still verify the
// peer runs as the same user via LOCAL_PEERCRED. This is the same
// security boundary as the socket file permissions (0600), so it does
// not widen the attack surface. We also require that the peer actually
// sent data (checked in the read loop below) — a connect-only probe
// with no data is harmless.
if pid == nil {
guard peerHasSameUID(socket) else {
let msg = "ERROR: Unable to verify client process\n"
msg.withCString { ptr in _ = write(socket, ptr, strlen(ptr)) }
return
}
}
}
@ -403,6 +446,12 @@ class TerminalController {
case "clear_ports":
return clearPorts(args)
case "report_tty":
return reportTTY(args)
case "ports_kick":
return portsKick(args)
case "report_pwd":
return reportPwd(args)
@ -6187,6 +6236,8 @@ class TerminalController {
report_git_branch <branch> [--status=dirty] [--tab=X] - Report git branch
clear_git_branch [--tab=X] - Clear git branch
report_ports <port1> [port2...] [--tab=X] [--panel=Y] - Report listening ports
report_tty <tty_name> [--tab=X] [--panel=Y] - Register TTY for batched port scanning
ports_kick [--tab=X] [--panel=Y] - Request batched port scan for panel
report_pwd <path> [--tab=X] [--panel=Y] - Report current working directory
clear_ports [--tab=X] [--panel=Y] - Clear listening ports
sidebar_state [--tab=X] - Dump sidebar metadata
@ -9113,6 +9164,86 @@ class TerminalController {
return result
}
/// Handles the `report_tty` socket command.
///
/// Stores the reported TTY name on the resolved tab and registers it with
/// `PortScanner.shared` so the panel is covered by batched port scans.
///
/// - Parameter args: Raw argument string: `<tty_name> [--tab=X] [--panel=Y]`
///   (`--surface` is accepted as an alias for `--panel`).
/// - Returns: `"OK"` on success, otherwise a string starting with `"ERROR:"`.
private func reportTTY(_ args: String) -> String {
    let options = parseOptions(args)
    guard let ttyName = options.positional.first, !ttyName.isEmpty else {
        return "ERROR: Missing tty name — usage: report_tty <tty_name> [--tab=X] [--panel=Y]"
    }

    var response = "OK"
    // Tab/panel state is read and written inside a synchronous hop to the main queue.
    DispatchQueue.main.sync {
        guard let tab = resolveTabForReport(args) else {
            let tabWasRequested = options.options["tab"] != nil
            response = tabWasRequested ? "ERROR: Tab not found" : "ERROR: No tab selected"
            return
        }

        // An explicit --panel/--surface wins; otherwise fall back to the focused panel.
        let targetPanelId: UUID
        if let requested = options.options["panel"] ?? options.options["surface"] {
            guard !requested.isEmpty else {
                response = "ERROR: Missing panel id — usage: report_tty <tty_name> [--tab=X] [--panel=Y]"
                return
            }
            guard let explicitId = UUID(uuidString: requested) else {
                response = "ERROR: Invalid panel id '\(requested)'"
                return
            }
            targetPanelId = explicitId
        } else if let focusedId = tab.focusedPanelId {
            targetPanelId = focusedId
        } else {
            response = "ERROR: Missing panel id (no focused surface)"
            return
        }

        // Reject ids that do not belong to this tab.
        guard tab.panels.keys.contains(targetPanelId) else {
            response = "ERROR: Panel not found '\(targetPanelId.uuidString)'"
            return
        }

        tab.surfaceTTYNames[targetPanelId] = ttyName
        PortScanner.shared.registerTTY(workspaceId: tab.id, panelId: targetPanelId, ttyName: ttyName)
    }
    return response
}
/// Handles the `ports_kick` socket command.
///
/// Resolves the target tab and panel, then asks `PortScanner.shared` to scan
/// that panel. The panel id is not checked against the tab here.
///
/// - Parameter args: Raw argument string: `[--tab=X] [--panel=Y]`
///   (`--surface` is accepted as an alias for `--panel`).
/// - Returns: `"OK"` on success, otherwise a string starting with `"ERROR:"`.
private func portsKick(_ args: String) -> String {
    var response = "OK"
    // Tab/panel state is read inside a synchronous hop to the main queue.
    DispatchQueue.main.sync {
        let options = parseOptions(args)
        guard let tab = resolveTabForReport(args) else {
            let tabWasRequested = options.options["tab"] != nil
            response = tabWasRequested ? "ERROR: Tab not found" : "ERROR: No tab selected"
            return
        }

        // An explicit --panel/--surface wins; otherwise fall back to the focused panel.
        let targetPanelId: UUID
        if let requested = options.options["panel"] ?? options.options["surface"] {
            guard !requested.isEmpty else {
                response = "ERROR: Missing panel id — usage: ports_kick [--tab=X] [--panel=Y]"
                return
            }
            guard let explicitId = UUID(uuidString: requested) else {
                response = "ERROR: Invalid panel id '\(requested)'"
                return
            }
            targetPanelId = explicitId
        } else if let focusedId = tab.focusedPanelId {
            targetPanelId = focusedId
        } else {
            response = "ERROR: Missing panel id (no focused surface)"
            return
        }

        PortScanner.shared.kick(workspaceId: tab.id, panelId: targetPanelId)
    }
    return response
}
private func sidebarState(_ args: String) -> String {
var result = ""
DispatchQueue.main.sync {

View file

@ -89,6 +89,7 @@ final class Workspace: Identifiable, ObservableObject {
@Published var gitBranch: SidebarGitBranchState?
@Published var surfaceListeningPorts: [UUID: [Int]] = [:]
@Published var listeningPorts: [Int] = []
var surfaceTTYNames: [UUID: String] = [:]
var focusedSurfaceId: UUID? { focusedPanelId }
var surfaceDirectories: [UUID: String] {
@ -330,6 +331,7 @@ final class Workspace: Identifiable, ObservableObject {
panelDirectories = panelDirectories.filter { validSurfaceIds.contains($0.key) }
panelTitles = panelTitles.filter { validSurfaceIds.contains($0.key) }
surfaceListeningPorts = surfaceListeningPorts.filter { validSurfaceIds.contains($0.key) }
surfaceTTYNames = surfaceTTYNames.filter { validSurfaceIds.contains($0.key) }
recomputeListeningPorts()
}
@ -1292,6 +1294,8 @@ extension Workspace: BonsplitDelegate {
panelDirectories.removeValue(forKey: panelId)
panelTitles.removeValue(forKey: panelId)
panelSubscriptions.removeValue(forKey: panelId)
surfaceTTYNames.removeValue(forKey: panelId)
PortScanner.shared.unregisterPanel(workspaceId: id, panelId: panelId)
// Keep the workspace invariant: always retain at least one real panel.
// This prevents runtime close callbacks from ever collapsing into a tabless workspace.