Expose production read-screen capture APIs

Implements capture-pane parity item from https://github.com/manaflow-ai/cmux/issues/153 by shipping production read-screen support from https://github.com/manaflow-ai/cmux/issues/152.
This commit is contained in:
Lawrence Chen 2026-02-20 16:22:33 -08:00
parent 47151f45e7
commit b163c2bbf3
4 changed files with 381 additions and 80 deletions

View file

@ -868,6 +868,43 @@ struct CMUXCLI {
print(response)
}
case "read-screen":
    // Peel off the value-taking options first; anything left over may only be
    // the bare `--scrollback` switch.
    let (workspaceOpt, afterWorkspace) = parseOption(commandArgs, name: "--workspace")
    let (surfaceOpt, afterSurface) = parseOption(afterWorkspace, name: "--surface")
    let (linesOpt, leftover) = parseOption(afterSurface, name: "--lines")
    let unexpected = leftover.filter { $0 != "--scrollback" }
    guard unexpected.isEmpty else {
        throw CLIError(message: "read-screen: unexpected arguments: \(unexpected.joined(separator: " "))")
    }

    // Environment defaults apply only when no window was specified; the surface
    // default additionally requires that no explicit workspace was given.
    let workspaceArg: String?
    if let workspaceOpt {
        workspaceArg = workspaceOpt
    } else if windowId == nil {
        workspaceArg = ProcessInfo.processInfo.environment["CMUX_WORKSPACE_ID"]
    } else {
        workspaceArg = nil
    }
    let surfaceArg: String?
    if let surfaceOpt {
        surfaceArg = surfaceOpt
    } else if workspaceOpt == nil && windowId == nil {
        surfaceArg = ProcessInfo.processInfo.environment["CMUX_SURFACE_ID"]
    } else {
        surfaceArg = nil
    }

    var request: [String: Any] = [:]
    let workspaceHandle = try normalizeWorkspaceHandle(workspaceArg, client: client)
    if let workspaceHandle { request["workspace_id"] = workspaceHandle }
    let surfaceHandle = try normalizeSurfaceHandle(surfaceArg, client: client, workspaceHandle: workspaceHandle)
    if let surfaceHandle { request["surface_id"] = surfaceHandle }

    if leftover.contains("--scrollback") {
        request["scrollback"] = true
    }
    if let linesOpt {
        // `--lines` implies `--scrollback`, so both keys are sent together.
        guard let lineCount = Int(linesOpt), lineCount > 0 else {
            throw CLIError(message: "--lines must be greater than 0")
        }
        request["lines"] = lineCount
        request["scrollback"] = true
    }

    let payload = try client.sendV2(method: "surface.read_text", params: request)
    if !jsonOutput {
        // Plain mode prints only the decoded text (empty when the key is absent).
        print((payload["text"] as? String) ?? "")
    } else {
        print(jsonString(payload))
    }
case "send":
let (wsArg, rem0) = parseOption(commandArgs, name: "--workspace")
let (sfArg, rem1) = parseOption(rem0, name: "--surface")
@ -2803,6 +2840,22 @@ struct CMUXCLI {
cmux select-workspace --workspace workspace:2
cmux select-workspace --workspace 0
"""
case "read-screen":
return """
Usage: cmux read-screen [flags]
Read terminal text from a surface as plain text.
Flags:
--workspace <id|ref> Target workspace (default: $CMUX_WORKSPACE_ID)
--surface <id|ref> Target surface (default: $CMUX_SURFACE_ID)
--scrollback Include scrollback (not just visible viewport)
--lines <n> Limit to the last n lines (implies --scrollback)
Example:
cmux read-screen
cmux read-screen --surface surface:2 --scrollback --lines 200
"""
case "send":
return """
Usage: cmux send [flags] [--] <text>
@ -3463,6 +3516,7 @@ struct CMUXCLI {
close-workspace --workspace <id|ref>
select-workspace --workspace <id|ref>
current-workspace
read-screen [--workspace <id|ref>] [--surface <id|ref>] [--scrollback] [--lines <n>]
send [--workspace <id|ref>] [--surface <id|ref>] <text>
send-key [--workspace <id|ref>] [--surface <id|ref>] <key>
send-panel --panel <id|ref> [--workspace <id|ref>] <text>

View file

@ -461,6 +461,9 @@ class TerminalController {
case "reset_sidebar":
return resetSidebar(args)
case "read_screen":
return readScreenText(args)
#if DEBUG
case "set_shortcut":
@ -496,9 +499,6 @@ class TerminalController {
case "read_terminal_text":
return readTerminalText(args)
case "read_screen":
return readScreen(args)
case "render_stats":
return renderStats(args)
@ -896,6 +896,8 @@ class TerminalController {
return v2Result(id: id, self.v2BrowserInputKeyboard(params: params))
case "browser.input_touch":
return v2Result(id: id, self.v2BrowserInputTouch(params: params))
case "surface.read_text":
return v2Result(id: id, self.v2SurfaceReadText(params: params))
#if DEBUG
@ -973,6 +975,7 @@ class TerminalController {
"surface.health",
"surface.send_text",
"surface.send_key",
"surface.read_text",
"surface.trigger_flash",
"pane.list",
"pane.focus",
@ -2387,6 +2390,36 @@ class TerminalController {
return result
}
/// Handles the v2 `surface.read_text` request.
///
/// Reads the `surface_id`, `scrollback` and `lines` keys from `params`, delegates the
/// capture to `readTerminalTextBase64`, and returns `.ok` with both the decoded `text`
/// and the raw `base64` payload. A non-positive `lines` value yields `invalid_params`;
/// any failure reported by the reader, or an undecodable payload, yields `internal_error`.
///
/// NOTE(review): no `workspace_id` key is read here, so the target workspace is whatever
/// `readTerminalTextBase64` resolves on its own — confirm this is intended.
private func v2SurfaceReadText(params: [String: Any]) -> V2CallResult {
    let scrollbackFlag = v2Bool(params, "scrollback") ?? false
    let requestedLines = v2Int(params, "lines")
    if let requestedLines, requestedLines <= 0 {
        return .err(code: "invalid_params", message: "lines must be greater than 0", data: nil)
    }
    // Asking for a line count always implies reading from scrollback.
    let wantsScrollback = scrollbackFlag || requestedLines != nil

    let raw = readTerminalTextBase64(
        surfaceArg: v2String(params, "surface_id") ?? "",
        includeScrollback: wantsScrollback,
        lineLimit: requestedLines
    )
    let okPrefix = "OK "
    guard raw.hasPrefix(okPrefix) else {
        return .err(code: "internal_error", message: raw, data: nil)
    }

    let encoded = String(raw.dropFirst(okPrefix.count)).trimmingCharacters(in: .whitespacesAndNewlines)
    let text: String
    if let bytes = Data(base64Encoded: encoded), let utf8 = String(data: bytes, encoding: .utf8) {
        text = utf8
    } else if encoded.isEmpty {
        // An empty payload is a valid "nothing on screen" result.
        text = ""
    } else {
        return .err(code: "internal_error", message: "Failed to decode terminal text", data: nil)
    }

    return .ok([
        "text": text,
        "base64": encoded
    ])
}
private func v2SurfaceTriggerFlash(params: [String: Any]) -> V2CallResult {
guard let tabManager = v2ResolveTabManager(params: params) else {
return .err(code: "unavailable", message: "TabManager not available", data: nil)
@ -6193,6 +6226,161 @@ class TerminalController {
}
#endif
/// Parsed form of the argument string accepted by the `read_screen` socket command.
private struct ReadScreenOptions {
    /// Surface id or index token as typed by the caller; empty when none was given.
    let surfaceArg: String
    /// True when `--scrollback` was passed, or implied by `--lines`.
    let includeScrollback: Bool
    /// Positive line count from `--lines`, or nil when the option was absent.
    let lineLimit: Int?
}

/// Parse failure carrying the complete `ERROR: ...` line to send back to the client.
private struct ReadScreenParseError: Error {
    let message: String
}

/// Parses `read_screen [id|idx] [--scrollback] [--lines <n>]`.
///
/// Tokens are separated by any whitespace. At most one positional surface token is
/// accepted. `--lines` requires a following integer greater than zero and implies
/// `--scrollback`. Any other token starting with `--` is rejected with the usage
/// message, so a mistyped flag is reported as such instead of being taken as a surface id.
///
/// - Parameter args: Raw argument string following the command name.
/// - Returns: `.success` with the parsed options, or `.failure` with a ready-to-send
///   error line.
private func parseReadScreenArgs(_ args: String) -> Result<ReadScreenOptions, ReadScreenParseError> {
    let usageError = ReadScreenParseError(message: "ERROR: Usage: read_screen [id|idx] [--scrollback] [--lines <n>]")
    let tokens = args.split(whereSeparator: \.isWhitespace).map(String.init)

    var surfaceArg: String?
    var includeScrollback = false
    var lineLimit: Int?

    var remaining = tokens.makeIterator()
    while let token = remaining.next() {
        switch token {
        case "--scrollback":
            includeScrollback = true
        case "--lines":
            // The value is the next token; a missing, non-numeric or non-positive value fails.
            guard let rawCount = remaining.next(), let parsed = Int(rawCount), parsed > 0 else {
                return .failure(ReadScreenParseError(message: "ERROR: --lines must be greater than 0"))
            }
            lineLimit = parsed
            includeScrollback = true
        case _ where token.hasPrefix("--"):
            // Unknown option: report usage rather than treating it as a surface id.
            return .failure(usageError)
        default:
            guard surfaceArg == nil else {
                return .failure(usageError)
            }
            surfaceArg = token
        }
    }

    return .success(
        ReadScreenOptions(
            surfaceArg: surfaceArg ?? "",
            includeScrollback: includeScrollback,
            lineLimit: lineLimit
        )
    )
}
/// Returns the last `maxLines` lines of `text`.
///
/// Lines are delimited by `"\n"`. A single trailing newline is treated as the terminator
/// of the final line rather than as an extra empty line, so newline-terminated text still
/// yields `maxLines` real lines (and keeps its trailing newline). Interior and additional
/// trailing blank lines are counted as lines.
///
/// - Parameters:
///   - text: Text to trim.
///   - maxLines: Maximum number of lines to keep; values `<= 0` produce an empty string.
/// - Returns: `text` unchanged when it has at most `maxLines` lines, otherwise its tail.
private func tailTerminalLines(_ text: String, maxLines: Int) -> String {
    guard maxLines > 0 else { return "" }
    var lines = text.split(separator: "\n", omittingEmptySubsequences: false)
    // With empty subsequences kept, an empty final element means `text` ended with the
    // separator; drop it so it is not counted as a line of its own.
    var endsWithNewline = false
    if lines.count > 1, let last = lines.last, last.isEmpty {
        endsWithNewline = true
        lines.removeLast()
    }
    guard lines.count > maxLines else { return text }
    let tail = lines.suffix(maxLines).joined(separator: "\n")
    return endsWithNewline ? tail + "\n" : tail
}
/// Captures text from a terminal surface of the currently selected tab.
///
/// The work runs inside `DispatchQueue.main.sync`. The result is a single string:
/// `"OK <base64>"` on success (base64 of the UTF-8 text, possibly empty) or an
/// `"ERROR: ..."` line describing what went wrong.
///
/// - Parameters:
///   - surfaceArg: Surface id or index; blank selects the tab's focused panel.
///   - includeScrollback: When true the whole screen buffer is read
///     (`GHOSTTY_POINT_SCREEN`); otherwise only the viewport.
///   - lineLimit: When non-nil, the captured text is trimmed with `tailTerminalLines`.
///
/// NOTE(review): because this uses `DispatchQueue.main.sync`, it presumably must not be
/// called from the main thread — confirm against callers.
private func readTerminalTextBase64(surfaceArg: String, includeScrollback: Bool = false, lineLimit: Int? = nil) -> String {
    guard let tabManager = tabManager else { return "ERROR: TabManager not available" }

    let surfaceSpecifier = surfaceArg.trimmingCharacters(in: .whitespacesAndNewlines)
    let region: ghostty_point_tag_e = includeScrollback ? GHOSTTY_POINT_SCREEN : GHOSTTY_POINT_VIEWPORT

    var response = "ERROR: No tab selected"
    DispatchQueue.main.sync {
        guard let selectedId = tabManager.selectedTabId,
              let tab = tabManager.tabs.first(where: { $0.id == selectedId }) else {
            return
        }

        let targetPanelId: UUID? = surfaceSpecifier.isEmpty
            ? tab.focusedPanelId
            : resolveSurfaceId(from: surfaceSpecifier, tab: tab)

        guard let targetPanelId,
              let panel = tab.terminalPanel(for: targetPanelId),
              let surface = panel.surface.surface else {
            response = "ERROR: Terminal surface not found"
            return
        }

        // Select from the top-left to the bottom-right corner of the chosen region.
        let selection = ghostty_selection_s(
            top_left: ghostty_point_s(
                tag: region,
                coord: GHOSTTY_POINT_COORD_TOP_LEFT,
                x: 0,
                y: 0
            ),
            bottom_right: ghostty_point_s(
                tag: region,
                coord: GHOSTTY_POINT_COORD_BOTTOM_RIGHT,
                x: 0,
                y: 0
            ),
            rectangle: true
        )

        var captured = ghostty_text_s()
        guard ghostty_surface_read_text(surface, selection, &captured) else {
            response = "ERROR: Failed to read terminal text"
            return
        }
        // Registered only after a successful read, so the buffer is freed exactly when one exists.
        defer {
            ghostty_surface_free_text(surface, &captured)
        }

        var output = ""
        if let bytes = captured.text, captured.text_len > 0 {
            let rawData = Data(bytes: bytes, count: Int(captured.text_len))
            output = String(decoding: rawData, as: UTF8.self)
        }
        if let lineLimit {
            output = tailTerminalLines(output, maxLines: lineLimit)
        }

        let encoded = Data(output.utf8).base64EncodedString()
        response = "OK \(encoded)"
    }
    return response
}
/// Implements the `read_screen` socket command.
///
/// Parses `args` with `parseReadScreenArgs`, captures the text through
/// `readTerminalTextBase64`, and returns it as plain text. Parse errors and reader
/// errors are returned verbatim as their `ERROR: ...` line; an empty capture yields
/// an empty string.
private func readScreenText(_ args: String) -> String {
    switch parseReadScreenArgs(args) {
    case .failure(let problem):
        return problem.message

    case .success(let options):
        let response = readTerminalTextBase64(
            surfaceArg: options.surfaceArg,
            includeScrollback: options.includeScrollback,
            lineLimit: options.lineLimit
        )
        guard response.hasPrefix("OK ") else { return response }

        // Strip the "OK " marker; what remains is the base64 payload.
        let encoded = String(response.dropFirst(3)).trimmingCharacters(in: .whitespacesAndNewlines)
        guard !encoded.isEmpty else { return "" }
        guard let bytes = Data(base64Encoded: encoded) else {
            return "ERROR: Failed to decode terminal text"
        }
        return String(decoding: bytes, as: UTF8.self)
    }
}
private func helpText() -> String {
var text = """
Hierarchy: Workspace (sidebar tab) > Pane (split region) > Surface (nested tab) > Panel (terminal/browser)
@ -6225,6 +6413,7 @@ class TerminalController {
send_key <key> - Send special key (ctrl-c, ctrl-d, enter, tab, escape)
send_surface <id|idx> <text> - Send text to a specific terminal
send_key_surface <id|idx> <key> - Send special key to a specific terminal
read_screen [id|idx] [--scrollback] [--lines N] - Read terminal text (plain text)
Notification commands:
notify <title>|<subtitle>|<body> - Notify focused panel
@ -6282,7 +6471,6 @@ class TerminalController {
activate_app - Bring app + main window to front (test-only)
is_terminal_focused <id|idx> - Return true/false if terminal surface is first responder (test-only)
read_terminal_text [id|idx] - Read visible terminal text (base64, test-only)
read_screen [id|idx] - Read visible terminal text (plain text, legacy test-only)
render_stats [id|idx] - Read terminal render stats (draw counters, test-only)
layout_debug - Dump bonsplit layout + selected panel bounds (test-only)
bonsplit_underflow_count - Count bonsplit arranged-subview underflow events (test-only)
@ -6638,82 +6826,7 @@ class TerminalController {
}
private func readTerminalText(_ args: String) -> String {
guard let tabManager = tabManager else { return "ERROR: TabManager not available" }
let panelArg = args.trimmingCharacters(in: .whitespacesAndNewlines)
var result = "ERROR: No tab selected"
DispatchQueue.main.sync {
guard let tabId = tabManager.selectedTabId,
let tab = tabManager.tabs.first(where: { $0.id == tabId }) else {
return
}
let panelId: UUID?
if panelArg.isEmpty {
panelId = tab.focusedPanelId
} else {
panelId = resolveSurfaceId(from: panelArg, tab: tab)
}
guard let panelId,
let terminalPanel = tab.terminalPanel(for: panelId),
let surface = terminalPanel.surface.surface else {
result = "ERROR: Terminal surface not found"
return
}
var selection = ghostty_selection_s(
top_left: ghostty_point_s(
tag: GHOSTTY_POINT_VIEWPORT,
coord: GHOSTTY_POINT_COORD_TOP_LEFT,
x: 0,
y: 0
),
bottom_right: ghostty_point_s(
tag: GHOSTTY_POINT_VIEWPORT,
coord: GHOSTTY_POINT_COORD_BOTTOM_RIGHT,
x: 0,
y: 0
),
rectangle: true
)
var text = ghostty_text_s()
guard ghostty_surface_read_text(surface, selection, &text) else {
result = "ERROR: Failed to read terminal text"
return
}
defer {
ghostty_surface_free_text(surface, &text)
}
let b64: String
if let ptr = text.text, text.text_len > 0 {
b64 = Data(bytes: ptr, count: Int(text.text_len)).base64EncodedString()
} else {
b64 = ""
}
result = "OK \(b64)"
}
return result
}
private func readScreen(_ args: String) -> String {
let response = readTerminalText(args)
guard response.hasPrefix("OK ") else { return response }
let payload = String(response.dropFirst(3)).trimmingCharacters(in: .whitespacesAndNewlines)
if payload.isEmpty {
return ""
}
guard let data = Data(base64Encoded: payload),
let text = String(data: data, encoding: .utf8) else {
return "ERROR: Failed to decode terminal text"
}
return text
readTerminalTextBase64(surfaceArg: args)
}
private struct RenderStatsResponse: Codable {

View file

@ -830,6 +830,18 @@ class cmux:
if panel is not None:
sid = self._resolve_surface_id(panel)
params["surface_id"] = sid
try:
res = self._call("surface.read_text", params) or {}
if "text" in res:
return str(res.get("text") or "")
b64 = str(res.get("base64") or "")
raw = base64.b64decode(b64) if b64 else b""
return raw.decode("utf-8", errors="replace")
except cmuxError as exc:
# Back-compat for older builds that only expose the debug method.
if "method_not_found" not in str(exc):
raise
res = self._call("debug.terminal.read_text", params) or {}
b64 = str(res.get("base64") or "")
raw = base64.b64decode(b64) if b64 else b""

View file

@ -0,0 +1,122 @@
#!/usr/bin/env python3
"""Regression: capture-pane parity via production read-screen APIs."""
import glob
import json
import os
import subprocess
import sys
import time
from pathlib import Path
from typing import Callable, List
sys.path.insert(0, str(Path(__file__).parent))
from cmux import cmux, cmuxError
SOCKET_PATH = os.environ.get("CMUX_SOCKET", "/tmp/cmux-debug.sock")
def _must(cond: bool, msg: str) -> None:
if not cond:
raise cmuxError(msg)
def _wait_for(pred: Callable[[], bool], timeout_s: float = 5.0, step_s: float = 0.05) -> None:
start = time.time()
while time.time() - start < timeout_s:
if pred():
return
time.sleep(step_s)
raise cmuxError("Timed out waiting for condition")
def _find_cli_binary() -> str:
env_cli = os.environ.get("CMUXTERM_CLI")
if env_cli and os.path.isfile(env_cli) and os.access(env_cli, os.X_OK):
return env_cli
fixed = os.path.expanduser("~/Library/Developer/Xcode/DerivedData/cmux-tests-v2/Build/Products/Debug/cmux")
if os.path.isfile(fixed) and os.access(fixed, os.X_OK):
return fixed
candidates = glob.glob(os.path.expanduser("~/Library/Developer/Xcode/DerivedData/**/Build/Products/Debug/cmux"), recursive=True)
candidates += glob.glob("/tmp/cmux-*/Build/Products/Debug/cmux")
candidates = [p for p in candidates if os.path.isfile(p) and os.access(p, os.X_OK)]
if not candidates:
raise cmuxError("Could not locate cmux CLI binary; set CMUXTERM_CLI")
candidates.sort(key=lambda p: os.path.getmtime(p), reverse=True)
return candidates[0]
def _run_cli(cli: str, args: List[str]) -> str:
    """Run the cmux CLI against ``SOCKET_PATH`` and return its stdout.

    Args:
        cli: Path to the CLI executable.
        args: Arguments appended after ``--socket SOCKET_PATH``.

    Raises:
        cmuxError: If the process exits non-zero; the message includes the
            command line and the combined stdout/stderr.
    """
    argv = [cli, "--socket", SOCKET_PATH, *args]
    completed = subprocess.run(argv, capture_output=True, text=True, check=False)
    if completed.returncode == 0:
        return completed.stdout
    combined = f"{completed.stdout}\n{completed.stderr}".strip()
    raise cmuxError(f"CLI failed ({' '.join(argv)}): {combined}")
def main() -> int:
    """Exercise the production read-screen APIs end to end.

    Creates and selects a workspace, echoes a unique token into its first
    surface, then checks that the token is visible through the
    ``surface.read_text`` socket method and through ``cmux read-screen`` in
    plain, ``--scrollback --lines`` and ``--json`` modes. Finally verifies that
    ``--lines 0`` is rejected.

    Returns:
        0 on success. Any failed expectation raises ``cmuxError`` (via
        ``_must``, ``_wait_for`` or ``_run_cli``).
    """
    cli = _find_cli_binary()
    with cmux(SOCKET_PATH) as c:
        # The production method must be advertised in the capabilities list.
        caps = c.capabilities() or {}
        methods = set(caps.get("methods") or [])
        _must("surface.read_text" in methods, f"Missing surface.read_text in capabilities: {sorted(methods)[:20]}")
        # Fresh workspace; it is selected before any read is attempted.
        created = c._call("workspace.create") or {}
        ws_id = str(created.get("workspace_id") or "")
        _must(bool(ws_id), f"workspace.create returned no workspace_id: {created}")
        c._call("workspace.select", {"workspace_id": ws_id})
        surfaces_payload = c._call("surface.list", {"workspace_id": ws_id}) or {}
        surfaces = surfaces_payload.get("surfaces") or []
        _must(bool(surfaces), f"Expected at least one surface in workspace: {surfaces_payload}")
        surface_id = str(surfaces[0].get("id") or "")
        _must(bool(surface_id), f"surface.list returned surface without id: {surfaces_payload}")
        # Millisecond timestamp keeps the token unique across runs.
        token = f"CMUX_READ_SCREEN_{int(time.time() * 1000)}"
        c._call("surface.send_text", {
            "workspace_id": ws_id,
            "surface_id": surface_id,
            "text": f"echo {token}\n",
        })
        def has_token() -> bool:
            # True once the echoed token appears in the text returned by surface.read_text.
            payload = c._call("surface.read_text", {"workspace_id": ws_id, "surface_id": surface_id}) or {}
            return token in str(payload.get("text") or "")
        _wait_for(has_token, timeout_s=5.0)
        # Socket API: the decoded "text" field must contain the token.
        read_payload = c._call("surface.read_text", {"workspace_id": ws_id, "surface_id": surface_id}) or {}
        text = str(read_payload.get("text") or "")
        _must(token in text, f"surface.read_text missing token {token!r}: {read_payload}")
        # CLI, plain-text output.
        cli_text = _run_cli(cli, ["read-screen", "--workspace", ws_id, "--surface", surface_id])
        _must(token in cli_text, f"cmux read-screen output missing token {token!r}: {cli_text!r}")
        # CLI, scrollback limited to the last 80 lines.
        cli_text_scrollback = _run_cli(cli, ["read-screen", "--workspace", ws_id, "--surface", surface_id, "--scrollback", "--lines", "80"])
        _must(token in cli_text_scrollback, f"cmux read-screen --scrollback output missing token {token!r}: {cli_text_scrollback!r}")
        # CLI, JSON output carries the same "text" field.
        cli_json = _run_cli(cli, ["--json", "read-screen", "--workspace", ws_id, "--surface", surface_id])
        payload = json.loads(cli_json or "{}")
        _must(token in str(payload.get("text") or ""), f"cmux --json read-screen missing token {token!r}: {payload}")
        # Invoked directly (not via _run_cli) because a non-zero exit is the expected outcome here.
        invalid = subprocess.run(
            [cli, "--socket", SOCKET_PATH, "read-screen", "--workspace", ws_id, "--surface", surface_id, "--lines", "0"],
            capture_output=True,
            text=True,
            check=False,
        )
        invalid_output = f"{invalid.stdout}\n{invalid.stderr}"
        _must(invalid.returncode != 0, "Expected read-screen --lines 0 to fail")
        _must("--lines must be greater than 0" in invalid_output, f"Unexpected error for --lines 0: {invalid_output!r}")
    print("PASS: production read-screen APIs expose capture-pane behavior")
    return 0
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    sys.exit(main())