* Fallback stable socket listener to user socket path * Move stable socket path out of /tmp * Keep socket health checks active on fallback paths --------- Co-authored-by: Lawrence Chen <lawrencecchen@users.noreply.github.com>
1085 lines
40 KiB
Python
Executable file
1085 lines
40 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""cmux v2 Python Client
|
|
|
|
A client library for programmatically controlling cmux via the Unix socket.
|
|
|
|
This client speaks the v2 JSON line protocol (one JSON request/response per line).
|
|
It intentionally mirrors the existing v1 Python client's convenience API so the
|
|
existing test suite can be ported with minimal churn.
|
|
|
|
Protocol:
|
|
Request: {"id": 1, "method": "surface.list", "params": {..}}
|
|
Response: {"id": 1, "ok": true, "result": {...}}
|
|
|
|
Notes:
|
|
- v2 uses stable UUID handles for workspaces/panes/surfaces.
|
|
- For test convenience, this client accepts integer indexes for many methods and
|
|
resolves them to IDs using list calls.
|
|
"""
|
|
|
|
import base64
|
|
import errno
|
|
import glob
|
|
import json
|
|
import os
|
|
import select
|
|
import socket
|
|
import time
|
|
import uuid
|
|
from typing import Any, Dict, List, Optional, Tuple, Union
|
|
|
|
|
|
class cmuxError(Exception):
    """Error type raised by this client for connection, protocol and request failures."""
|
|
|
|
|
|
# Per-user directory that holds the stable socket and the last-socket marker.
_APP_SUPPORT_DIR = os.path.expanduser("~/Library/Application Support/cmux")

# Stable socket location, plus the older location under /tmp.
_STABLE_SOCKET_PATH = os.path.join(_APP_SUPPORT_DIR, "cmux.sock")
_LEGACY_STABLE_SOCKET_PATH = "/tmp/cmux.sock"

# Marker files that may record the most recently used socket path,
# listed in the order they are consulted.
_LAST_SOCKET_PATH_FILES = [
    os.path.join(_APP_SUPPORT_DIR, "last-socket-path"),
    "/tmp/cmux-last-socket-path",
]
|
|
|
|
|
|
def _read_last_socket_path() -> Optional[str]:
    """Return the socket path recorded in the first usable marker file.

    The files in ``_LAST_SOCKET_PATH_FILES`` are tried in order. A marker that
    cannot be opened, is not valid UTF-8, or contains only whitespace is
    skipped.

    Returns:
        The stripped contents of the first non-empty marker file, or ``None``
        when no marker yields a path.
    """
    for marker_path in _LAST_SOCKET_PATH_FILES:
        try:
            with open(marker_path, "r", encoding="utf-8") as f:
                path = f.read().strip()
        except (OSError, UnicodeDecodeError):
            # Best-effort lookup: a missing or corrupt marker must not abort
            # socket discovery, so fall through to the next candidate.
            continue
        if path:
            return path
    return None
|
|
|
|
|
|
def _default_socket_path() -> str:
    """Pick the socket path to use when the caller does not supply one.

    Resolution order:

    1. ``CMUX_SOCKET_PATH`` (or ``CMUX_SOCKET``) from the environment. It is
       used as-is when it exists, or when it is not one of the stable socket
       paths; a stable path that does not exist falls through to discovery.
    2. The path recorded by ``_read_last_socket_path()``, if it exists.
    3. The first existing path among the debug, stable and legacy sockets.
    4. The most recently modified socket matching the discovery globs.
    5. ``/tmp/cmux-debug.sock`` as the final fallback, even if absent.
    """
    # Backwards/forward compatibility: some scripts export CMUX_SOCKET,
    # while the client historically used CMUX_SOCKET_PATH.
    override = os.environ.get("CMUX_SOCKET_PATH") or os.environ.get("CMUX_SOCKET")
    if override:
        if os.path.exists(override):
            return override
        if override not in {_STABLE_SOCKET_PATH, _LEGACY_STABLE_SOCKET_PATH}:
            return override

    last_socket = _read_last_socket_path()
    if last_socket and os.path.exists(last_socket):
        return last_socket

    candidates = ["/tmp/cmux-debug.sock", _STABLE_SOCKET_PATH, _LEGACY_STABLE_SOCKET_PATH]
    for path in candidates:
        if os.path.exists(path):
            return path

    discovered = glob.glob("/tmp/cmux-debug-*.sock")
    discovered.extend(glob.glob(os.path.join(_APP_SUPPORT_DIR, "cmux*.sock")))

    # Stat each path exactly once. A socket can disappear between globbing and
    # the mtime lookup, and this function runs at class-definition time, so a
    # vanished file must be skipped rather than raise.
    dated = []
    for path in discovered:
        try:
            dated.append((os.path.getmtime(path), path))
        except OSError:
            continue
    if dated:
        # max() keeps the first of equally-new entries, matching a stable
        # descending sort.
        return max(dated, key=lambda item: item[0])[1]

    return candidates[0]
|
|
|
|
|
|
def _looks_like_uuid(s: str) -> bool:
    """Report whether ``uuid.UUID`` accepts *s*."""
    try:
        uuid.UUID(s)
    except Exception:
        # Any failure to construct a UUID counts as "not a UUID".
        return False
    else:
        return True
|
|
|
|
|
|
def _looks_like_ref(s: str, kind: Optional[str] = None) -> bool:
    """Report whether *s* has the form ``<kind>:<digits>``.

    The part before the first colon is stripped and lower-cased and must be
    one of ``window``, ``workspace``, ``pane`` or ``surface``. When *kind* is
    given it must equal that normalized part. The part after the colon is
    stripped and must consist of digits only.
    """
    head, colon, tail = s.partition(":")
    if not colon:
        return False

    ref_kind = head.strip().lower()
    ordinal = tail.strip()
    if kind is not None and ref_kind != kind:
        return False
    return ref_kind in {"window", "workspace", "pane", "surface"} and ordinal.isdigit()
|
|
|
|
|
|
def _unescape_backslash_controls(s: str) -> str:
    r"""Interpret ``\n``, ``\r``, ``\t`` and ``\\`` escape sequences in *s*.

    v2 can carry raw newlines via JSON, but a lot of existing callsites use
    backslash escapes (because v1 was line-oriented). This keeps the API
    ergonomic for tests and scripts.

    A backslash followed by any other character, or a backslash at the very
    end of the string, is kept literally.
    """
    escapes = {"n": "\n", "r": "\r", "t": "\t", "\\": "\\"}

    out: List[str] = []
    i = 0
    end = len(s)
    while i < end:
        ch = s[i]
        if ch == "\\" and i + 1 < end:
            replacement = escapes.get(s[i + 1])
            if replacement is not None:
                out.append(replacement)
                i += 2
                continue
        # Ordinary character, trailing backslash, or unknown escape: copy the
        # current character and let the next one be processed on its own.
        out.append(ch)
        i += 1
    return "".join(out)
|
|
|
|
|
|
class cmux:
|
|
"""Client for controlling cmux via the v2 JSON Unix socket."""
|
|
|
|
DEFAULT_SOCKET_PATH = _default_socket_path()
|
|
|
|
def __init__(self, socket_path: str = None):
|
|
self.socket_path = socket_path or self.DEFAULT_SOCKET_PATH
|
|
self._socket: Optional[socket.socket] = None
|
|
self._recv_buffer: str = ""
|
|
self._next_id: int = 1
|
|
|
|
# ---------------------------------------------------------------------
|
|
# Connection
|
|
# ---------------------------------------------------------------------
|
|
|
|
def connect(self) -> None:
|
|
if self._socket is not None:
|
|
return
|
|
|
|
start = time.time()
|
|
while not os.path.exists(self.socket_path):
|
|
if time.time() - start >= 10.0:
|
|
raise cmuxError(
|
|
f"Socket not found at {self.socket_path}. Is cmux running?"
|
|
)
|
|
time.sleep(0.1)
|
|
|
|
last_error: Optional[socket.error] = None
|
|
while True:
|
|
self._socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
|
try:
|
|
self._socket.connect(self.socket_path)
|
|
self._socket.settimeout(10.0)
|
|
return
|
|
except socket.error as e:
|
|
last_error = e
|
|
try:
|
|
self._socket.close()
|
|
except Exception:
|
|
pass
|
|
self._socket = None
|
|
if e.errno in (errno.ECONNREFUSED, errno.ENOENT) and time.time() - start < 10.0:
|
|
time.sleep(0.1)
|
|
continue
|
|
raise cmuxError(f"Failed to connect: {e}")
|
|
|
|
def close(self) -> None:
|
|
if self._socket is not None:
|
|
try:
|
|
self._socket.close()
|
|
finally:
|
|
self._socket = None
|
|
|
|
def __enter__(self):
|
|
self.connect()
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
|
self.close()
|
|
return False
|
|
|
|
# ---------------------------------------------------------------------
|
|
# Low-level protocol
|
|
# ---------------------------------------------------------------------
|
|
|
|
def _recv_line(self, timeout_s: float = 20.0) -> str:
|
|
if self._socket is None:
|
|
raise cmuxError("Not connected")
|
|
|
|
if "\n" in self._recv_buffer:
|
|
line, rest = self._recv_buffer.split("\n", 1)
|
|
self._recv_buffer = rest
|
|
return line
|
|
|
|
deadline = time.time() + timeout_s
|
|
while time.time() < deadline:
|
|
remaining = max(0.0, deadline - time.time())
|
|
ready, _, _ = select.select([self._socket], [], [], min(0.2, remaining))
|
|
if not ready:
|
|
continue
|
|
|
|
chunk = self._socket.recv(8192)
|
|
if not chunk:
|
|
raise cmuxError("Socket closed")
|
|
self._recv_buffer += chunk.decode("utf-8", errors="replace")
|
|
|
|
if "\n" in self._recv_buffer:
|
|
line, rest = self._recv_buffer.split("\n", 1)
|
|
self._recv_buffer = rest
|
|
return line
|
|
|
|
raise cmuxError("Timed out waiting for response")
|
|
|
|
def _call(self, method: str, params: Optional[Dict[str, Any]] = None, timeout_s: float = 20.0) -> Any:
|
|
if self._socket is None:
|
|
raise cmuxError("Not connected")
|
|
|
|
req_id = self._next_id
|
|
self._next_id += 1
|
|
|
|
payload = {
|
|
"id": req_id,
|
|
"method": method,
|
|
"params": params or {},
|
|
}
|
|
line = json.dumps(payload, separators=(",", ":")) + "\n"
|
|
self._socket.sendall(line.encode("utf-8"))
|
|
|
|
resp_line = self._recv_line(timeout_s=timeout_s)
|
|
try:
|
|
resp = json.loads(resp_line)
|
|
except json.JSONDecodeError as e:
|
|
raise cmuxError(f"Invalid JSON response: {e}: {resp_line[:200]}")
|
|
|
|
if not isinstance(resp, dict):
|
|
raise cmuxError(f"Invalid response type: {type(resp).__name__}")
|
|
|
|
if resp.get("id") != req_id:
|
|
raise cmuxError(f"Mismatched response id: expected {req_id}, got {resp.get('id')}")
|
|
|
|
if resp.get("ok") is True:
|
|
return resp.get("result")
|
|
|
|
err = resp.get("error") or {}
|
|
code = err.get("code") or "error"
|
|
msg = err.get("message") or "Unknown error"
|
|
data = err.get("data")
|
|
if data is not None:
|
|
raise cmuxError(f"{code}: {msg} ({data})")
|
|
raise cmuxError(f"{code}: {msg}")
|
|
|
|
# ---------------------------------------------------------------------
|
|
# ID resolution helpers (index -> id)
|
|
# ---------------------------------------------------------------------
|
|
|
|
def _resolve_workspace_id(self, workspace: Union[str, int, None]) -> Optional[str]:
|
|
if workspace is None:
|
|
res = self._call("workspace.current")
|
|
wsid = (res or {}).get("workspace_id")
|
|
if not wsid:
|
|
raise cmuxError("No workspace selected")
|
|
return str(wsid)
|
|
|
|
if isinstance(workspace, int):
|
|
items = (self._call("workspace.list") or {}).get("workspaces") or []
|
|
for row in items:
|
|
if int(row.get("index", -1)) == workspace:
|
|
return str(row.get("id"))
|
|
raise cmuxError(f"Workspace index not found: {workspace}")
|
|
|
|
s = str(workspace).strip()
|
|
if not s:
|
|
return None
|
|
if s.isdigit():
|
|
return self._resolve_workspace_id(int(s))
|
|
if _looks_like_ref(s, "workspace"):
|
|
return s
|
|
if not _looks_like_uuid(s):
|
|
raise cmuxError(f"Invalid workspace id: {s}")
|
|
return s
|
|
|
|
def _resolve_surface_id(self, surface: Union[str, int, None], workspace_id: Optional[str] = None) -> Optional[str]:
|
|
if surface is None:
|
|
# Try fast-path via identify.
|
|
ident = self._call("system.identify")
|
|
focused = (ident or {}).get("focused") or {}
|
|
sid = focused.get("surface_id") if isinstance(focused, dict) else None
|
|
return None if sid in (None, "", {}) else str(sid)
|
|
|
|
if isinstance(surface, int):
|
|
params: Dict[str, Any] = {}
|
|
if workspace_id:
|
|
params["workspace_id"] = workspace_id
|
|
items = (self._call("surface.list", params) or {}).get("surfaces") or []
|
|
for row in items:
|
|
if int(row.get("index", -1)) == surface:
|
|
return str(row.get("id"))
|
|
raise cmuxError(f"Surface index not found: {surface}")
|
|
|
|
s = str(surface).strip()
|
|
if not s:
|
|
return None
|
|
if s.isdigit():
|
|
return self._resolve_surface_id(int(s), workspace_id=workspace_id)
|
|
if _looks_like_ref(s, "surface"):
|
|
return s
|
|
if not _looks_like_uuid(s):
|
|
raise cmuxError(f"Invalid surface id: {s}")
|
|
return s
|
|
|
|
def _resolve_pane_id(self, pane: Union[str, int, None], workspace_id: Optional[str] = None) -> Optional[str]:
|
|
if pane is None:
|
|
ident = self._call("system.identify")
|
|
focused = (ident or {}).get("focused") or {}
|
|
pid = focused.get("pane_id") if isinstance(focused, dict) else None
|
|
return None if pid in (None, "", {}) else str(pid)
|
|
|
|
if isinstance(pane, int):
|
|
params: Dict[str, Any] = {}
|
|
if workspace_id:
|
|
params["workspace_id"] = workspace_id
|
|
items = (self._call("pane.list", params) or {}).get("panes") or []
|
|
for row in items:
|
|
if int(row.get("index", -1)) == pane:
|
|
return str(row.get("id"))
|
|
raise cmuxError(f"Pane index not found: {pane}")
|
|
|
|
s = str(pane).strip()
|
|
if not s:
|
|
return None
|
|
if s.isdigit():
|
|
return self._resolve_pane_id(int(s), workspace_id=workspace_id)
|
|
if _looks_like_ref(s, "pane"):
|
|
return s
|
|
if not _looks_like_uuid(s):
|
|
raise cmuxError(f"Invalid pane id: {s}")
|
|
return s
|
|
|
|
# ---------------------------------------------------------------------
|
|
# System
|
|
# ---------------------------------------------------------------------
|
|
|
|
def ping(self) -> bool:
|
|
res = self._call("system.ping")
|
|
return bool((res or {}).get("pong"))
|
|
|
|
def capabilities(self) -> dict:
|
|
return dict(self._call("system.capabilities") or {})
|
|
|
|
def identify(self, caller: Optional[dict] = None) -> dict:
|
|
params: Dict[str, Any] = {}
|
|
if caller is not None:
|
|
params["caller"] = caller
|
|
return dict(self._call("system.identify", params) or {})
|
|
|
|
# ---------------------------------------------------------------------
|
|
# Windows
|
|
# ---------------------------------------------------------------------
|
|
|
|
def list_windows(self) -> List[dict]:
|
|
res = self._call("window.list") or {}
|
|
return list(res.get("windows") or [])
|
|
|
|
def current_window(self) -> str:
|
|
res = self._call("window.current") or {}
|
|
wid = res.get("window_id")
|
|
if not wid:
|
|
raise cmuxError(f"window.current returned no window_id: {res}")
|
|
return str(wid)
|
|
|
|
def new_window(self) -> str:
|
|
res = self._call("window.create") or {}
|
|
wid = res.get("window_id")
|
|
if not wid:
|
|
raise cmuxError(f"window.create returned no window_id: {res}")
|
|
return str(wid)
|
|
|
|
def focus_window(self, window_id: str) -> None:
|
|
self._call("window.focus", {"window_id": str(window_id)})
|
|
|
|
def close_window(self, window_id: str) -> None:
|
|
self._call("window.close", {"window_id": str(window_id)})
|
|
|
|
# ---------------------------------------------------------------------
|
|
# Workspaces
|
|
# ---------------------------------------------------------------------
|
|
|
|
def list_workspaces(self, window_id: Optional[str] = None) -> List[Tuple[int, str, str, bool]]:
|
|
params: Dict[str, Any] = {}
|
|
if window_id is not None:
|
|
params["window_id"] = str(window_id)
|
|
res = self._call("workspace.list", params) or {}
|
|
out: List[Tuple[int, str, str, bool]] = []
|
|
for row in res.get("workspaces") or []:
|
|
out.append((
|
|
int(row.get("index", 0)),
|
|
str(row.get("id")),
|
|
str(row.get("title", "")),
|
|
bool(row.get("selected", False)),
|
|
))
|
|
return out
|
|
|
|
def new_workspace(self, window_id: Optional[str] = None) -> str:
|
|
params: Dict[str, Any] = {}
|
|
if window_id is not None:
|
|
params["window_id"] = str(window_id)
|
|
res = self._call("workspace.create", params) or {}
|
|
wsid = res.get("workspace_id")
|
|
if not wsid:
|
|
raise cmuxError(f"workspace.create returned no workspace_id: {res}")
|
|
return str(wsid)
|
|
|
|
def select_workspace(self, workspace: Union[str, int]) -> None:
|
|
wsid = self._resolve_workspace_id(workspace)
|
|
self._call("workspace.select", {"workspace_id": wsid})
|
|
|
|
def rename_workspace(self, title: str, workspace: Union[str, int, None] = None) -> None:
|
|
renamed = str(title).strip()
|
|
if not renamed:
|
|
raise cmuxError("rename_workspace requires a non-empty title")
|
|
wsid = self._resolve_workspace_id(workspace)
|
|
params: Dict[str, Any] = {"title": renamed}
|
|
if wsid:
|
|
params["workspace_id"] = wsid
|
|
self._call("workspace.rename", params)
|
|
|
|
def current_workspace(self) -> str:
|
|
wsid = self._resolve_workspace_id(None)
|
|
if not wsid:
|
|
raise cmuxError("No current workspace")
|
|
return wsid
|
|
|
|
def next_workspace(self) -> str:
|
|
res = self._call("workspace.next") or {}
|
|
wsid = res.get("workspace_id")
|
|
if not wsid:
|
|
raise cmuxError(f"workspace.next returned no workspace_id: {res}")
|
|
return str(wsid)
|
|
|
|
def previous_workspace(self) -> str:
|
|
res = self._call("workspace.previous") or {}
|
|
wsid = res.get("workspace_id")
|
|
if not wsid:
|
|
raise cmuxError(f"workspace.previous returned no workspace_id: {res}")
|
|
return str(wsid)
|
|
|
|
def last_workspace(self) -> str:
|
|
res = self._call("workspace.last") or {}
|
|
wsid = res.get("workspace_id")
|
|
if not wsid:
|
|
raise cmuxError(f"workspace.last returned no workspace_id: {res}")
|
|
return str(wsid)
|
|
|
|
def move_workspace_to_window(self, workspace: Union[str, int], window_id: str, focus: bool = True) -> None:
|
|
wsid = self._resolve_workspace_id(workspace)
|
|
self._call(
|
|
"workspace.move_to_window",
|
|
{"workspace_id": wsid, "window_id": str(window_id), "focus": bool(focus)},
|
|
)
|
|
|
|
def reorder_workspace(
|
|
self,
|
|
workspace: Union[str, int],
|
|
*,
|
|
index: Optional[int] = None,
|
|
before_workspace: Union[str, int, None] = None,
|
|
after_workspace: Union[str, int, None] = None,
|
|
window_id: Optional[str] = None,
|
|
) -> None:
|
|
wsid = self._resolve_workspace_id(workspace)
|
|
params: Dict[str, Any] = {"workspace_id": wsid}
|
|
|
|
targets = 0
|
|
if index is not None:
|
|
params["index"] = int(index)
|
|
targets += 1
|
|
if before_workspace is not None:
|
|
params["before_workspace_id"] = self._resolve_workspace_id(before_workspace)
|
|
targets += 1
|
|
if after_workspace is not None:
|
|
params["after_workspace_id"] = self._resolve_workspace_id(after_workspace)
|
|
targets += 1
|
|
if targets != 1:
|
|
raise cmuxError("reorder_workspace requires exactly one target: index|before_workspace|after_workspace")
|
|
|
|
if window_id is not None:
|
|
params["window_id"] = str(window_id)
|
|
|
|
self._call("workspace.reorder", params)
|
|
|
|
def close_workspace(self, workspace_id: str) -> None:
|
|
wsid = self._resolve_workspace_id(workspace_id)
|
|
self._call("workspace.close", {"workspace_id": wsid})
|
|
|
|
# Backwards-compatible aliases
|
|
def list_tabs(self) -> List[Tuple[int, str, str, bool]]:
|
|
return self.list_workspaces()
|
|
|
|
def new_tab(self) -> str:
|
|
return self.new_workspace()
|
|
|
|
def close_tab(self, workspace_id: str) -> None:
|
|
return self.close_workspace(workspace_id)
|
|
|
|
def select_tab(self, workspace: Union[str, int]) -> None:
|
|
return self.select_workspace(workspace)
|
|
|
|
def current_tab(self) -> str:
|
|
return self.current_workspace()
|
|
|
|
# ---------------------------------------------------------------------
|
|
# Surfaces / panes
|
|
# ---------------------------------------------------------------------
|
|
|
|
def list_surfaces(self, workspace: Union[str, int, None] = None) -> List[Tuple[int, str, bool]]:
|
|
params: Dict[str, Any] = {}
|
|
if workspace is not None:
|
|
wsid = self._resolve_workspace_id(workspace)
|
|
params["workspace_id"] = wsid
|
|
res = self._call("surface.list", params) or {}
|
|
out: List[Tuple[int, str, bool]] = []
|
|
for row in res.get("surfaces") or []:
|
|
out.append((
|
|
int(row.get("index", 0)),
|
|
str(row.get("id")),
|
|
bool(row.get("focused", False)),
|
|
))
|
|
return out
|
|
|
|
def focus_surface(self, surface: Union[str, int]) -> None:
|
|
sid = self._resolve_surface_id(surface)
|
|
if not sid:
|
|
raise cmuxError(f"Invalid surface: {surface!r}")
|
|
self._call("surface.focus", {"surface_id": sid})
|
|
|
|
def focus_surface_by_panel(self, surface_id: str) -> None:
|
|
# In v2, surface_id is the panel UUID.
|
|
self.focus_surface(surface_id)
|
|
|
|
def new_split(self, direction: str) -> str:
|
|
res = self._call("surface.split", {"direction": direction}) or {}
|
|
sid = res.get("surface_id")
|
|
if not sid:
|
|
raise cmuxError(f"surface.split returned no surface_id: {res}")
|
|
return str(sid)
|
|
|
|
def drag_surface_to_split(self, surface: Union[str, int], direction: str) -> None:
|
|
sid = self._resolve_surface_id(surface)
|
|
if not sid:
|
|
raise cmuxError(f"Invalid surface: {surface!r}")
|
|
self._call("surface.drag_to_split", {"surface_id": sid, "direction": direction})
|
|
|
|
def new_pane(self, direction: str = "right", panel_type: str = "terminal", url: str = None) -> str:
|
|
params: Dict[str, Any] = {"direction": direction, "type": panel_type}
|
|
if url:
|
|
params["url"] = url
|
|
res = self._call("pane.create", params) or {}
|
|
sid = res.get("surface_id")
|
|
if not sid:
|
|
raise cmuxError(f"pane.create returned no surface_id: {res}")
|
|
return str(sid)
|
|
|
|
def new_surface(self, pane: Union[str, int, None] = None, panel_type: str = "terminal", url: str = None) -> str:
|
|
params: Dict[str, Any] = {"type": panel_type}
|
|
if pane is not None:
|
|
pid = self._resolve_pane_id(pane)
|
|
if not pid:
|
|
raise cmuxError(f"Invalid pane: {pane!r}")
|
|
params["pane_id"] = pid
|
|
if url:
|
|
params["url"] = url
|
|
res = self._call("surface.create", params) or {}
|
|
sid = res.get("surface_id")
|
|
if not sid:
|
|
raise cmuxError(f"surface.create returned no surface_id: {res}")
|
|
return str(sid)
|
|
|
|
def close_surface(self, surface: Union[str, int, None] = None) -> None:
|
|
params: Dict[str, Any] = {}
|
|
if surface is not None:
|
|
sid = self._resolve_surface_id(surface)
|
|
if not sid:
|
|
raise cmuxError(f"Invalid surface: {surface!r}")
|
|
params["surface_id"] = sid
|
|
self._call("surface.close", params)
|
|
|
|
def move_surface(
|
|
self,
|
|
surface: Union[str, int],
|
|
*,
|
|
pane: Union[str, int, None] = None,
|
|
workspace: Union[str, int, None] = None,
|
|
window_id: Optional[str] = None,
|
|
before_surface: Union[str, int, None] = None,
|
|
after_surface: Union[str, int, None] = None,
|
|
index: Optional[int] = None,
|
|
focus: bool = True,
|
|
) -> None:
|
|
sid = self._resolve_surface_id(surface)
|
|
if not sid:
|
|
raise cmuxError(f"Invalid surface: {surface!r}")
|
|
|
|
params: Dict[str, Any] = {"surface_id": sid, "focus": bool(focus)}
|
|
if pane is not None:
|
|
pid = self._resolve_pane_id(pane)
|
|
if not pid:
|
|
raise cmuxError(f"Invalid pane: {pane!r}")
|
|
params["pane_id"] = pid
|
|
if workspace is not None:
|
|
wsid = self._resolve_workspace_id(workspace)
|
|
if not wsid:
|
|
raise cmuxError(f"Invalid workspace: {workspace!r}")
|
|
params["workspace_id"] = wsid
|
|
if window_id is not None:
|
|
params["window_id"] = str(window_id)
|
|
if before_surface is not None:
|
|
before_id = self._resolve_surface_id(before_surface)
|
|
if not before_id:
|
|
raise cmuxError(f"Invalid before_surface: {before_surface!r}")
|
|
params["before_surface_id"] = before_id
|
|
if after_surface is not None:
|
|
after_id = self._resolve_surface_id(after_surface)
|
|
if not after_id:
|
|
raise cmuxError(f"Invalid after_surface: {after_surface!r}")
|
|
params["after_surface_id"] = after_id
|
|
if index is not None:
|
|
params["index"] = int(index)
|
|
|
|
self._call("surface.move", params)
|
|
|
|
def reorder_surface(
|
|
self,
|
|
surface: Union[str, int],
|
|
*,
|
|
index: Optional[int] = None,
|
|
before_surface: Union[str, int, None] = None,
|
|
after_surface: Union[str, int, None] = None,
|
|
) -> None:
|
|
sid = self._resolve_surface_id(surface)
|
|
if not sid:
|
|
raise cmuxError(f"Invalid surface: {surface!r}")
|
|
|
|
params: Dict[str, Any] = {"surface_id": sid}
|
|
targets = 0
|
|
if index is not None:
|
|
params["index"] = int(index)
|
|
targets += 1
|
|
if before_surface is not None:
|
|
before_id = self._resolve_surface_id(before_surface)
|
|
if not before_id:
|
|
raise cmuxError(f"Invalid before_surface: {before_surface!r}")
|
|
params["before_surface_id"] = before_id
|
|
targets += 1
|
|
if after_surface is not None:
|
|
after_id = self._resolve_surface_id(after_surface)
|
|
if not after_id:
|
|
raise cmuxError(f"Invalid after_surface: {after_surface!r}")
|
|
params["after_surface_id"] = after_id
|
|
targets += 1
|
|
if targets != 1:
|
|
raise cmuxError("reorder_surface requires exactly one target: index|before_surface|after_surface")
|
|
|
|
self._call("surface.reorder", params)
|
|
|
|
def trigger_flash(self, surface: Union[str, int, None] = None) -> None:
|
|
params: Dict[str, Any] = {}
|
|
if surface is not None:
|
|
sid = self._resolve_surface_id(surface)
|
|
if not sid:
|
|
raise cmuxError(f"Invalid surface: {surface!r}")
|
|
params["surface_id"] = sid
|
|
self._call("surface.trigger_flash", params)
|
|
|
|
def refresh_surfaces(self, workspace: Union[str, int, None] = None) -> None:
|
|
params: Dict[str, Any] = {}
|
|
if workspace is not None:
|
|
wsid = self._resolve_workspace_id(workspace)
|
|
params["workspace_id"] = wsid
|
|
self._call("surface.refresh", params)
|
|
|
|
def surface_health(self, workspace: Union[str, int, None] = None) -> List[dict]:
|
|
params: Dict[str, Any] = {}
|
|
if workspace is not None:
|
|
wsid = self._resolve_workspace_id(workspace)
|
|
params["workspace_id"] = wsid
|
|
res = self._call("surface.health", params) or {}
|
|
return list(res.get("surfaces") or [])
|
|
|
|
def clear_history(self, surface: Union[str, int, None] = None, workspace: Union[str, int, None] = None) -> None:
|
|
params: Dict[str, Any] = {}
|
|
if workspace is not None:
|
|
wsid = self._resolve_workspace_id(workspace)
|
|
params["workspace_id"] = wsid
|
|
if surface is not None:
|
|
sid = self._resolve_surface_id(surface, workspace_id=params.get("workspace_id"))
|
|
if not sid:
|
|
raise cmuxError(f"Invalid surface: {surface!r}")
|
|
params["surface_id"] = sid
|
|
self._call("surface.clear_history", params)
|
|
|
|
# ---------------------------------------------------------------------
|
|
# Pane commands
|
|
# ---------------------------------------------------------------------
|
|
|
|
def list_panes(self) -> List[Tuple[int, str, int, bool]]:
|
|
res = self._call("pane.list") or {}
|
|
out: List[Tuple[int, str, int, bool]] = []
|
|
for row in res.get("panes") or []:
|
|
out.append((
|
|
int(row.get("index", 0)),
|
|
str(row.get("id")),
|
|
int(row.get("surface_count", 0)),
|
|
bool(row.get("focused", False)),
|
|
))
|
|
return out
|
|
|
|
def focus_pane(self, pane: Union[str, int]) -> None:
|
|
pid = self._resolve_pane_id(pane)
|
|
if not pid:
|
|
raise cmuxError(f"Invalid pane: {pane!r}")
|
|
self._call("pane.focus", {"pane_id": pid})
|
|
|
|
def list_pane_surfaces(self, pane: Union[str, int, None] = None) -> List[Tuple[int, str, str, bool]]:
|
|
params: Dict[str, Any] = {}
|
|
if pane is not None:
|
|
pid = self._resolve_pane_id(pane)
|
|
params["pane_id"] = pid
|
|
res = self._call("pane.surfaces", params) or {}
|
|
out: List[Tuple[int, str, str, bool]] = []
|
|
for row in res.get("surfaces") or []:
|
|
out.append((
|
|
int(row.get("index", 0)),
|
|
str(row.get("id")),
|
|
str(row.get("title", "")),
|
|
bool(row.get("selected", False)),
|
|
))
|
|
return out
|
|
|
|
def swap_pane(self, pane: Union[str, int], target_pane: Union[str, int], focus: bool = True) -> None:
|
|
source = self._resolve_pane_id(pane)
|
|
target = self._resolve_pane_id(target_pane)
|
|
if not source or not target:
|
|
raise cmuxError(f"Invalid panes: pane={pane!r}, target_pane={target_pane!r}")
|
|
self._call("pane.swap", {"pane_id": source, "target_pane_id": target, "focus": bool(focus)})
|
|
|
|
def break_pane(self, pane: Union[str, int, None] = None, surface: Union[str, int, None] = None, focus: bool = True) -> str:
|
|
params: Dict[str, Any] = {"focus": bool(focus)}
|
|
if pane is not None:
|
|
pid = self._resolve_pane_id(pane)
|
|
if not pid:
|
|
raise cmuxError(f"Invalid pane: {pane!r}")
|
|
params["pane_id"] = pid
|
|
if surface is not None:
|
|
sid = self._resolve_surface_id(surface)
|
|
if not sid:
|
|
raise cmuxError(f"Invalid surface: {surface!r}")
|
|
params["surface_id"] = sid
|
|
res = self._call("pane.break", params) or {}
|
|
wsid = res.get("workspace_id")
|
|
if not wsid:
|
|
raise cmuxError(f"pane.break returned no workspace_id: {res}")
|
|
return str(wsid)
|
|
|
|
def join_pane(
|
|
self,
|
|
target_pane: Union[str, int],
|
|
pane: Union[str, int, None] = None,
|
|
surface: Union[str, int, None] = None,
|
|
focus: bool = True,
|
|
) -> None:
|
|
target = self._resolve_pane_id(target_pane)
|
|
if not target:
|
|
raise cmuxError(f"Invalid target_pane: {target_pane!r}")
|
|
params: Dict[str, Any] = {"target_pane_id": target, "focus": bool(focus)}
|
|
if pane is not None:
|
|
source = self._resolve_pane_id(pane)
|
|
if not source:
|
|
raise cmuxError(f"Invalid pane: {pane!r}")
|
|
params["pane_id"] = source
|
|
if surface is not None:
|
|
sid = self._resolve_surface_id(surface)
|
|
if not sid:
|
|
raise cmuxError(f"Invalid surface: {surface!r}")
|
|
params["surface_id"] = sid
|
|
self._call("pane.join", params)
|
|
|
|
def last_pane(self) -> str:
|
|
res = self._call("pane.last") or {}
|
|
pid = res.get("pane_id")
|
|
if not pid:
|
|
raise cmuxError(f"pane.last returned no pane_id: {res}")
|
|
return str(pid)
|
|
|
|
# ---------------------------------------------------------------------
|
|
# Input
|
|
# ---------------------------------------------------------------------
|
|
|
|
def send(self, text: str) -> None:
    """Unescape backslash controls in *text* and call ``surface.send_text`` without a surface id."""
    params = {"text": _unescape_backslash_controls(text)}
    self._call("surface.send_text", params)
|
|
|
|
def send_surface(self, surface: Union[str, int], text: str) -> None:
    """Unescape backslash controls in *text* and send it to the resolved surface.

    Raises:
        cmuxError: If the surface resolves to nothing.
    """
    resolved = self._resolve_surface_id(surface)
    if not resolved:
        raise cmuxError(f"Invalid surface: {surface!r}")
    params = {"surface_id": resolved, "text": _unescape_backslash_controls(text)}
    self._call("surface.send_text", params)
|
|
|
|
def send_key(self, key: str) -> None:
|
|
self._call("surface.send_key", {"key": key})
|
|
|
|
def send_key_surface(self, surface: Union[str, int], key: str) -> None:
|
|
sid = self._resolve_surface_id(surface)
|
|
if not sid:
|
|
raise cmuxError(f"Invalid surface: {surface!r}")
|
|
self._call("surface.send_key", {"surface_id": sid, "key": key})
|
|
|
|
def send_ctrl_c(self) -> None:
|
|
self.send_key("ctrl-c")
|
|
|
|
def send_ctrl_d(self) -> None:
|
|
self.send_key("ctrl-d")
|
|
|
|
# ---------------------------------------------------------------------
|
|
# Notifications
|
|
# ---------------------------------------------------------------------
|
|
|
|
def notify(self, title: str, subtitle: str = "", body: str = "") -> None:
|
|
self._call("notification.create", {"title": title, "subtitle": subtitle, "body": body})
|
|
|
|
def notify_surface(self, surface: Union[str, int], title: str, subtitle: str = "", body: str = "") -> None:
|
|
sid = self._resolve_surface_id(surface)
|
|
if not sid:
|
|
raise cmuxError(f"Invalid surface: {surface!r}")
|
|
self._call(
|
|
"notification.create_for_surface",
|
|
{"surface_id": sid, "title": title, "subtitle": subtitle, "body": body},
|
|
)
|
|
|
|
def list_notifications(self) -> list[dict]:
|
|
res = self._call("notification.list") or {}
|
|
return list(res.get("notifications") or [])
|
|
|
|
def clear_notifications(self) -> None:
|
|
self._call("notification.clear")
|
|
|
|
def set_app_focus(self, active: Union[bool, None]) -> None:
|
|
if active is None:
|
|
state = "clear"
|
|
else:
|
|
state = "active" if active else "inactive"
|
|
self._call("app.focus_override.set", {"state": state})
|
|
|
|
def simulate_app_active(self) -> None:
|
|
self._call("app.simulate_active")
|
|
|
|
# Debug-only: focus via notification flow
|
|
def focus_notification(self, workspace: Union[str, int], surface: Union[str, int, None] = None) -> None:
|
|
wsid = self._resolve_workspace_id(workspace)
|
|
params: Dict[str, Any] = {"workspace_id": wsid}
|
|
if surface is not None:
|
|
sid = self._resolve_surface_id(surface, workspace_id=wsid)
|
|
params["surface_id"] = sid
|
|
self._call("debug.notification.focus", params)
|
|
|
|
# ---------------------------------------------------------------------
|
|
# Browser
|
|
# ---------------------------------------------------------------------
|
|
|
|
def open_browser(self, url: str = None) -> str:
|
|
params: Dict[str, Any] = {}
|
|
if url:
|
|
params["url"] = url
|
|
res = self._call("browser.open_split", params) or {}
|
|
sid = res.get("surface_id")
|
|
if not sid:
|
|
raise cmuxError(f"browser.open_split returned no surface_id: {res}")
|
|
return str(sid)
|
|
|
|
def navigate(self, panel_id: str, url: str) -> None:
|
|
sid = self._resolve_surface_id(panel_id)
|
|
if not sid:
|
|
raise cmuxError(f"Invalid surface: {panel_id!r}")
|
|
self._call("browser.navigate", {"surface_id": sid, "url": url})
|
|
|
|
def browser_back(self, panel_id: str) -> None:
|
|
sid = self._resolve_surface_id(panel_id)
|
|
self._call("browser.back", {"surface_id": sid})
|
|
|
|
def browser_forward(self, panel_id: str) -> None:
|
|
sid = self._resolve_surface_id(panel_id)
|
|
self._call("browser.forward", {"surface_id": sid})
|
|
|
|
def browser_reload(self, panel_id: str) -> None:
|
|
sid = self._resolve_surface_id(panel_id)
|
|
self._call("browser.reload", {"surface_id": sid})
|
|
|
|
def get_url(self, panel_id: str) -> str:
|
|
sid = self._resolve_surface_id(panel_id)
|
|
res = self._call("browser.url.get", {"surface_id": sid}) or {}
|
|
return str(res.get("url") or "")
|
|
|
|
def focus_webview(self, panel_id: str) -> None:
|
|
sid = self._resolve_surface_id(panel_id)
|
|
self._call("browser.focus_webview", {"surface_id": sid})
|
|
|
|
def is_webview_focused(self, panel_id: str) -> bool:
    """Return the truthiness of ``focused`` from ``browser.is_webview_focused``.

    An empty response or a missing ``focused`` key yields ``False``.
    """
    surface_id = self._resolve_surface_id(panel_id)
    reply = self._call("browser.is_webview_focused", {"surface_id": surface_id})
    return bool((reply or {}).get("focused"))
|
|
|
|
def wait_for_webview_focus(self, panel_id: str, timeout_s: float = 2.0) -> None:
    """Poll ``is_webview_focused`` until it reports focus or the timeout elapses.

    The deadline is measured with ``time.monotonic()`` so that wall-clock
    adjustments cannot shorten or extend the wait. Focus is always checked at
    least once (even when ``timeout_s`` is zero or negative) and once more
    after the final sleep, so focus gained during the last interval is not
    reported as a timeout.

    Args:
        panel_id: Surface reference passed through to ``is_webview_focused``.
        timeout_s: Maximum time to wait, in seconds.

    Raises:
        cmuxError: If the webview is still not focused at the deadline.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        if self.is_webview_focused(panel_id):
            return
        if time.monotonic() >= deadline:
            break
        time.sleep(0.05)
    raise cmuxError(f"Timed out waiting for webview focus: {panel_id}")
|
|
|
|
# ---------------------------------------------------------------------
|
|
# Debug / test-only
|
|
# ---------------------------------------------------------------------
|
|
|
|
def set_shortcut(self, name: str, combo: str) -> None:
    """Send ``debug.shortcut.set`` with the given shortcut ``name`` and ``combo``."""
    request = {"name": name, "combo": combo}
    self._call("debug.shortcut.set", request)
|
|
|
|
def simulate_shortcut(self, combo: str) -> None:
    """Send ``debug.shortcut.simulate`` with the given key ``combo``."""
    request = {"combo": combo}
    self._call("debug.shortcut.simulate", request)
|
|
|
|
def simulate_type(self, text: str) -> None:
    """Send ``debug.type`` with ``text`` after ``_unescape_backslash_controls`` processing."""
    request = {"text": _unescape_backslash_controls(text)}
    self._call("debug.type", request)
|
|
|
|
def activate_app(self) -> None:
    """Send the ``debug.app.activate`` request (no parameters)."""
    method = "debug.app.activate"
    self._call(method)
|
|
|
|
def open_command_palette_rename_tab_input(self, window_id: Optional[str] = None) -> None:
    """Send ``debug.command_palette.rename_tab.open``.

    ``window_id`` is stringified and included only when it is not ``None``.
    """
    request: Dict[str, Any] = {} if window_id is None else {"window_id": str(window_id)}
    self._call("debug.command_palette.rename_tab.open", request)
|
|
|
|
def command_palette_results(self, window_id: str, limit: int = 20) -> dict:
    """Return the ``debug.command_palette.results`` response as a new dict.

    ``window_id`` is sent as a string and ``limit`` as an int; an empty
    response yields ``{}``.
    """
    request = {"window_id": str(window_id), "limit": int(limit)}
    reply = self._call("debug.command_palette.results", request)
    return dict(reply or {})
|
|
|
|
def command_palette_rename_select_all(self) -> bool:
    """Call ``debug.command_palette.rename_input.select_all`` without params.

    Returns the truthiness of the response's ``enabled`` field.
    """
    reply = self._call("debug.command_palette.rename_input.select_all")
    return bool((reply or {}).get("enabled"))
|
|
|
|
def set_command_palette_rename_select_all(self, enabled: bool) -> bool:
    """Call ``debug.command_palette.rename_input.select_all`` with an ``enabled`` flag.

    Returns the truthiness of the response's ``enabled`` field.
    """
    request = {"enabled": bool(enabled)}
    reply = self._call("debug.command_palette.rename_input.select_all", request)
    return bool((reply or {}).get("enabled"))
|
|
|
|
def is_terminal_focused(self, panel: Union[str, int]) -> bool:
    """Return the truthiness of ``focused`` from ``debug.terminal.is_focused``.

    An empty response or a missing ``focused`` key yields ``False``.
    """
    surface_id = self._resolve_surface_id(panel)
    reply = self._call("debug.terminal.is_focused", {"surface_id": surface_id})
    return bool((reply or {}).get("focused"))
|
|
|
|
def read_terminal_text(self, panel: Union[str, int, None] = None) -> str:
    """Return terminal text, preferring ``surface.read_text``.

    When ``panel`` is given it is resolved and sent as ``surface_id``. A reply
    containing a ``text`` key is returned directly; otherwise its ``base64``
    field is decoded as UTF-8 with undecodable bytes replaced. If the first
    call fails with an error mentioning ``method_not_found``, the request is
    retried against ``debug.terminal.read_text`` (base64 only).

    Raises:
        cmuxError: If the first call fails for any other reason. Errors from
            the fallback call propagate unchanged.
    """
    request: Dict[str, Any] = {}
    if panel is not None:
        request["surface_id"] = self._resolve_surface_id(panel)

    use_legacy = False
    reply: Dict[str, Any] = {}
    try:
        reply = self._call("surface.read_text", request) or {}
    except cmuxError as exc:
        # Back-compat for older builds that only expose the debug method.
        if "method_not_found" not in str(exc):
            raise
        use_legacy = True

    if use_legacy:
        reply = self._call("debug.terminal.read_text", request) or {}
    elif "text" in reply:
        return str(reply.get("text") or "")

    encoded = str(reply.get("base64") or "")
    if not encoded:
        return ""
    return base64.b64decode(encoded).decode("utf-8", errors="replace")
|
|
|
|
def render_stats(self, panel: Union[str, int, None] = None) -> dict:
    """Return the ``stats`` object from ``debug.terminal.render_stats`` as a new dict.

    When ``panel`` is given it is resolved and sent as ``surface_id``. A
    missing or empty ``stats`` entry yields ``{}``.
    """
    request: Dict[str, Any] = {}
    if panel is not None:
        request["surface_id"] = self._resolve_surface_id(panel)
    reply = self._call("debug.terminal.render_stats", request) or {}
    # Server wraps the underlying stats object under "stats".
    stats = reply.get("stats")
    return dict(stats) if stats else {}
|
|
|
|
def layout_debug(self) -> dict:
    """Return the ``layout`` object from ``debug.layout`` as a new dict.

    A missing or empty ``layout`` entry yields ``{}``.
    """
    reply = self._call("debug.layout") or {}
    # Server wraps LayoutDebugResponse under "layout".
    layout = reply.get("layout")
    return dict(layout) if layout else {}
|
|
|
|
def panel_snapshot_reset(self, panel: Union[str, int]) -> None:
    """Send ``debug.panel_snapshot.reset`` for the surface that ``panel`` resolves to."""
    request = {"surface_id": self._resolve_surface_id(panel)}
    self._call("debug.panel_snapshot.reset", request)
|
|
|
|
def panel_snapshot(self, panel: Union[str, int], label: str = "") -> dict:
    """Return the ``debug.panel_snapshot`` response as a new dict.

    ``label`` is sent only when non-empty. If the response has ``surface_id``
    but no ``panel_id``, ``panel_id`` is added with the same value.
    """
    request: Dict[str, Any] = {"surface_id": self._resolve_surface_id(panel)}
    if label:
        request["label"] = label
    snapshot = dict(self._call("debug.panel_snapshot", request) or {})
    # Normalize key to match the v1 client (panel_id).
    if "surface_id" in snapshot:
        snapshot.setdefault("panel_id", snapshot["surface_id"])
    return snapshot
|
|
|
|
def bonsplit_underflow_count(self) -> int:
    """Return ``count`` from ``debug.bonsplit_underflow.count`` as an int (0 if absent)."""
    reply = self._call("debug.bonsplit_underflow.count") or {}
    count = reply.get("count")
    return int(count) if count else 0
|
|
|
|
def reset_bonsplit_underflow_count(self) -> None:
    """Send the ``debug.bonsplit_underflow.reset`` request (no parameters)."""
    method = "debug.bonsplit_underflow.reset"
    self._call(method)
|
|
|
|
def empty_panel_count(self) -> int:
    """Return ``count`` from ``debug.empty_panel.count`` as an int (0 if absent)."""
    reply = self._call("debug.empty_panel.count") or {}
    count = reply.get("count")
    return int(count) if count else 0
|
|
|
|
def reset_empty_panel_count(self) -> None:
    """Send the ``debug.empty_panel.reset`` request (no parameters)."""
    method = "debug.empty_panel.reset"
    self._call(method)
|
|
|
|
def flash_count(self, surface: Union[str, int]) -> int:
    """Return ``count`` from ``debug.flash.count`` for the resolved surface (0 if absent)."""
    surface_id = self._resolve_surface_id(surface)
    reply = self._call("debug.flash.count", {"surface_id": surface_id}) or {}
    count = reply.get("count")
    return int(count) if count else 0
|
|
|
|
def reset_flash_counts(self) -> None:
    """Send the ``debug.flash.reset`` request (no parameters)."""
    method = "debug.flash.reset"
    self._call(method)
|
|
|
|
def screenshot(self, label: str = "") -> dict:
    """Return the ``debug.window.screenshot`` response as a new dict.

    ``label`` is sent only when non-empty; an empty response yields ``{}``.
    """
    request: Dict[str, Any] = {"label": label} if label else {}
    reply = self._call("debug.window.screenshot", request)
    return dict(reply or {})
|
|
|
|
|
|
def main() -> None:
    """Command-line entry point for the v2 socket client.

    Without ``--method`` this prints the result of ``capabilities()`` as a
    minimal smoke test. With ``--method`` it parses ``--params`` as JSON,
    invokes that method through ``_call`` and prints the result. Output is
    pretty-printed JSON with sorted keys.

    Malformed ``--params`` is reported as an argparse usage error before the
    socket is opened, rather than as a traceback raised mid-connection.
    """
    import argparse

    parser = argparse.ArgumentParser(description="cmux v2 socket client")
    parser.add_argument("-s", "--socket", default=cmux.DEFAULT_SOCKET_PATH, help="Socket path")
    parser.add_argument("--method", help="v2 method name")
    parser.add_argument("--params", default="{}", help="JSON params")

    args = parser.parse_args()

    params = None
    if args.method:
        # Only parse --params when it will be used; the smoke path ignores it.
        try:
            params = json.loads(args.params)
        except json.JSONDecodeError as exc:
            # parser.error prints usage plus the message and exits with status 2.
            parser.error(f"--params is not valid JSON: {exc}")

    with cmux(args.socket) as c:
        if not args.method:
            # Minimal smoke.
            print(json.dumps(c.capabilities(), indent=2, sort_keys=True))
            return
        print(json.dumps(c._call(args.method, params), indent=2, sort_keys=True))
|
|
|
|
|
|
# Run the CLI only when this file is executed directly, not when it is imported.
if __name__ == "__main__":
    main()
|