* Fix blank terminal after split operations and add visual tests

  ## Blank Terminal Fix
  - Add `needsRefreshAfterWindowChange` flag in GhosttyTerminalView
  - Force terminal refresh when view is added to window, even if size unchanged
  - Add `ghostty_surface_refresh()` call in attachToView for same-view reattachment
  - Add debug logging for surface attachment lifecycle (DEBUG builds only)

  ## Bonsplit Migration
  - Add bonsplit as local Swift package (vendor/bonsplit submodule)
  - Replace custom SplitTree with BonsplitController
  - Add Panel protocol with TerminalPanel and BrowserPanel implementations
  - Add SidebarTab as main tab container with BonsplitController
  - Remove old Splits/ directory (SplitTree, SplitView, TerminalSplitTreeView)

  ## Visual Screenshot Tests
  - Add test_visual_screenshots.py for automated visual regression testing
  - Uses in-app screenshot API (CGWindowListCreateImage) - no screen recording needed
  - Generates HTML report with before/after comparisons
  - Tests: splits, browser panels, focus switching, close operations, rapid cycles
  - Includes annotation fields for easy feedback

  ## Browser Shortcut (⌘⇧B)
  - Add keyboard shortcut to open browser panel in current pane
  - Add openBrowser() method to TabManager
  - Add shortcut configuration in KeyboardShortcutSettings

  ## Screenshot Command
  - Add 'screenshot' command to TerminalController for in-app window capture
  - Returns OK with screenshot ID and path

  ## Other
  - Add tests/visual_output/ and tests/visual_report.html to .gitignore

* Add browser title subscription and set tab height to 30px
  - Subscribe to BrowserPanel.$pageTitle changes to update bonsplit tabs
  - Update tab titles in real-time as page navigation occurs
  - Clean up subscriptions when panels are removed
  - Set bonsplit tab bar and tab height to 30px (in submodule)

* Fix socket API regressions in list_surfaces, list_bonsplit_tabs, focus_pane
  - list_surfaces: Remove [terminal]/[browser] suffix to keep UUID-only format that clients and tests expect for parsing
  - list_bonsplit_tabs --pane: Properly look up pane by UUID instead of creating a new PaneID (requires bonsplit PaneID.id to be public)
  - focus_pane: Accept both UUID strings and integer indices as documented

* Fix browser panel stability and keyboard shortcuts
  - Prevent WKWebView focus lifecycle crashes during split/view reshuffles
  - Match bracket shortcuts via keyCode (Cmd+Shift+[ / ], Cmd+Ctrl+[ / ])
  - Support Ghostty config goto_split:* keybinds when WebView is focused
  - Add focus_webview/is_webview_focused socket commands and regression tests
  - Rename SidebarTab to Workspace and update docs

* Make ctrl+enter keybind test skippable
  Skip when the Ghostty keybind isn't configured or when osascript can't send keystrokes (no Accessibility permission), so VM runs stay green.

* Auto-focus browser omnibar when blank
  When a browser surface is focused but no URL is loaded yet, focus the address bar instead of the WKWebView.

* Stabilize socket surface indexing

* Focus browser omnibar escape; add webview keybind UI tests
  - Escape in omnibar now returns focus to WKWebView
  - Add UI tests for Cmd+Ctrl+H pane navigation with WebKit focused (including Ghostty config)
  - Avoid flaky element screenshots in UpdatePillUITests on the UTM VM

* Fix browser drag-to-split blanks and socket parsing

* Fix webview-focused shortcuts and stabilize browser splits
  - Match ctrl/shift shortcuts by keyCode where needed (Ctrl+H, bracket keys)
  - Load Ghostty goto_split triggers reliably and refresh on config load
  - Add debug socket helpers: set_shortcut + simulate_shortcut for tests
  - Convert browser goto_split/keybind tests to socket-based injection (no osascript)
  - Bump bonsplit for drag-to-split fixes

* Fix split layout collapse and harden socket pane APIs
* Stabilize OSC 99 notification test timing
* Fix terminal focus routing after split reparent
* Support simulate_shortcut enter for focus routing test
* Stabilize terminal focus routing test
* Fix frozen new terminal tabs after many splits
* Fix frozen new terminal tabs after splits
* Fix terminal freeze on launch/new tabs
* Update ghostty submodule
* Fix terminal focus/render stalls after split churn
* Fix nested split collapsing existing pane
* Fix nested split collapse + stabilize new-surface focus
* Update bonsplit submodule
* Fix SIGINT test flake
* Remove bonsplit tab-switch crossfade
* Remove PROJECTS.md
* Remove bonsplit tab selection animation
* Ignore generated test reports
* Middle click closes tab
* Revert unintended .gitignore change
* Fix build after main merge

* Revert "Fix build after main merge"
  This reverts commit 16bf9816d0856b5385d52f886aa5eb50f3c9d9a4.

* Revert "Merge remote-tracking branch 'origin/main' into fix/blank-terminal-and-visual-tests"
  This reverts commit 7c20fb53fd71fea7a19a3673f2dd73e5f0c783c4, reversing changes made to 0aff107d787bc9d8bbc28220090b4ca7af72e040.

* Remove tab close fade animation
* Use terminal.fill icon
* Make terminal tab icon smaller
* Match browser globe tab icon size
* Bonsplit: tab min width 48 and tighter close button
* Bonsplit: smaller tab title font

* Show unread notification badge in bonsplit tabs and improve UI polish
  Sync unread notification state to bonsplit tab badges (blue dot). Improve EmptyPanelView with Terminal/Browser buttons and shortcut hints. Add tooltips to close tab button and search overlay buttons.

* Fix reload.sh single-instance safety check on macOS
  Replace GNU-only `ps -o etimes=` with portable `ps -o etime=` and parse the dd-hh:mm:ss format manually for macOS compatibility.

* Centralize keyboard shortcut definitions into Action enum
  Replace per-shortcut boilerplate with a single Action enum that holds the label, defaults key, and default binding for each shortcut. All call sites now use shortcut(for:). Settings UI is data-driven via ForEach(Action.allCases). Titlebar tooltips update dynamically when shortcuts are changed. Remove duplicate .keyboardShortcut() modifiers from menu items that are already handled by the event monitor.

* Fix WKWebView consuming app menu shortcuts and close panel confirmation
  Add CmuxWebView subclass that routes key equivalents through the main menu before WebKit, so Cmd+N/Cmd+W/tab switching work when a browser pane is focused. Fix Cmd+W close-panel path: bypass Bonsplit delegate gating after the user confirms the running-process dialog by tracking forceCloseTabIds. Add unit tests (CmuxWebViewKeyEquivalentTests) and UI test scaffolding (MenuKeyEquivalentRoutingUITests) with a new cmux-unit Xcode scheme.

* Update CLAUDE.md and PROJECTS.md with recent changes
  CLAUDE.md: enforce --tag for reload commands, add cleanup safety rules. PROJECTS.md: log notification badge, reload.sh fix, Cmd+W fix, WebView key equiv fix, and centralized shortcuts work.

* Keep selection index stable on close

* Add concepts page documenting terminology hierarchy
  New docs page explaining Window > Workspace > Pane > Surface > Panel hierarchy with aligned ASCII diagram. Updated tabs.mdx and splits.mdx to use consistent terminology (workspace instead of tab, surface instead of panel) and corrected outdated CLI command references.

* Update bonsplit submodule
* WIP: improve split close stability and UI regressions
* Close terminal panel on child exit; hide terminal dirty dot
* Fix split close/focus regressions and stabilize UI tests
* Add unread Dock/Cmd+Tab badge with settings toggle
* Fix browser-surface shortcuts and Cmd+L browser opening
* Snapshot current workspace state before regression fixes
* Update bonsplit submodule snapshot
* Stabilize split-close regression capture and sidebar resize assertions
* Change default Show Notifications shortcut from Cmd+Shift+I to Cmd+I
* Fix update check readiness race, enable release update logging, and improve checking spinner
* Restore terminal file drop, fix browser omnibar click focus, and add panel workspace ID mutation for surface moves
* Add Cmd+digit workspace hints, titlebar shortcut pills, sidebar drag-reorder, and workspace placement settings
* Add v2 browser automation API, surface move/reorder commands, and short-handle ref system to TerminalController
* Add CLI browser command surface, --id-format flag, and move/reorder commands
* Extend test clients with move/reorder APIs, ref-handle support, and increased timeouts
* Harden test runner scripts with deterministic builds, retry logic, and robust socket readiness
* Stabilize existing test suites with focus-wait helpers, increased timeouts, and API shape updates
* Add terminal file drop e2e regression test
* Add v2 browser API, CLI ref resolution, and surface move/reorder test suites
* Add unit tests for shortcut hints, workspace reorder, drop planner, and update UI test stabilization
* Add cmux-debug-windows skill with snapshot script and agent config
* Update project docs: mark browser parity and move/reorder phases complete, add parallel agent workflow guidelines
* Update bonsplit submodule: re-entrant setPosition guard, tab shortcut hints, and moveTab/reorderTab API

* Add browser agent UX improvements: snapshot refs, placement reuse, diagnostics, and skill docs
  - Upgrade browser.snapshot to emit accessibility tree text with element refs (eN)
  - Add right-sibling pane reuse policy for browser.open_split placement
  - Add rich not_found diagnostics with retry logic for selector actions
  - Support --snapshot-after for post-action verification on mutating commands
  - Allow browser fill with empty text for clearing inputs
  - Default CLI --id-format to refs-first (UUIDs opt-in via --id-format uuids|both)
  - Format legacy new-pane/new-surface output with short surface refs
  - Add skills/cmuxterm-browser/ and skills/cmuxterm/ end-user skill docs
  - Add regression tests for placement policy, snapshot refs, diagnostics, and ID defaults

* Update bonsplit submodule: keep raster favicons in color when inactive
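The reload.sh entry in the commit message above mentions parsing the `ps -o etime=` output by hand. The script itself is shell and is not shown on this page, so the following is only a Python sketch of that parsing idea, assuming the usual `[[dd-]hh:]mm:ss` layout of the `etime` field. The function name `etime_to_seconds` is hypothetical and does not come from the repository.

```python
def etime_to_seconds(etime: str) -> int:
    """Convert a ps `etime` value ([[dd-]hh:]mm:ss) to whole seconds.

    Hypothetical helper for illustration; not part of cmux or reload.sh.
    """
    text = etime.strip()

    # An optional "dd-" prefix carries the number of elapsed days.
    days = 0
    if "-" in text:
        day_part, text = text.split("-", 1)
        days = int(day_part)

    # The remainder is either mm:ss or hh:mm:ss.
    fields = [int(part) for part in text.split(":")]
    if not 2 <= len(fields) <= 3:
        raise ValueError(f"unexpected etime value: {etime!r}")

    # Left-pad with zero hours so we always unpack [hh, mm, ss].
    while len(fields) < 3:
        fields.insert(0, 0)
    hours, minutes, seconds = fields

    return ((days * 24 + hours) * 60 + minutes) * 60 + seconds


if __name__ == "__main__":
    for sample in ("05:07", "01:02:03", "2-03:04:05"):
        print(sample, "->", etime_to_seconds(sample))
```

A single-instance check could compare the returned value against a threshold to decide whether a running process is recent; the threshold and the decision itself are left to the caller here.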
1060 lines
38 KiB
Python
Executable file
#!/usr/bin/env python3
"""
cmux Python Client

A client library for programmatically controlling cmux via Unix socket.

Usage:
    from cmux import cmux

    client = cmux()
    client.connect()

    # Send text to terminal
    client.send("echo hello\\n")

    # Send special keys
    client.send_key("ctrl-c")
    client.send_key("ctrl-d")

    # Tab management
    client.new_tab()
    client.list_tabs()
    client.select_tab(0)
    client.new_split("right")
    client.list_surfaces()
    client.focus_surface(0)

    client.close()
"""

import socket
import select
import os
import time
import errno
import json
import base64
import glob
import re
from typing import Optional, List, Tuple, Union


class cmuxError(Exception):
    """Exception raised for cmux errors"""
    pass


_LAST_SOCKET_PATH_FILE = "/tmp/cmux-last-socket-path"
_DEFAULT_DEBUG_BUNDLE_ID = "com.cmuxterm.app.debug"


def _sanitize_tag_slug(raw: str) -> str:
    cleaned = re.sub(r"[^a-z0-9]+", "-", (raw or "").strip().lower())
    cleaned = re.sub(r"-+", "-", cleaned).strip("-")
    return cleaned or "agent"


def _sanitize_bundle_suffix(raw: str) -> str:
    # Must match scripts/reload.sh sanitize_bundle() so tagged tests can
    # reliably target the correct app via AppleScript.
    cleaned = re.sub(r"[^a-z0-9]+", ".", (raw or "").strip().lower())
    cleaned = re.sub(r"\.+", ".", cleaned).strip(".")
    return cleaned or "agent"


def _quote_option_value(value: str) -> str:
    # Must match TerminalController.parseOptions() quoting rules.
    escaped = (value or "").replace("\\", "\\\\").replace('"', '\\"')
    return f"\"{escaped}\""


def _default_bundle_id() -> str:
    override = os.environ.get("CMUX_BUNDLE_ID")
    if override:
        return override

    tag = os.environ.get("CMUX_TAG")
    if tag:
        suffix = _sanitize_bundle_suffix(tag)
        return f"{_DEFAULT_DEBUG_BUNDLE_ID}.{suffix}"

    return _DEFAULT_DEBUG_BUNDLE_ID


def _read_last_socket_path() -> Optional[str]:
    try:
        with open(_LAST_SOCKET_PATH_FILE, "r", encoding="utf-8") as f:
            path = f.read().strip()
            if path:
                return path
    except OSError:
        pass
    return None


def _can_connect(path: str, timeout: float = 0.15, retries: int = 4) -> bool:
    # Best-effort check to avoid getting stuck on stale socket files.
    for _ in range(max(1, retries)):
        s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            s.settimeout(timeout)
            s.connect(path)
            return True
        except OSError:
            time.sleep(0.05)
        finally:
            try:
                s.close()
            except Exception:
                pass
    return False


def _default_socket_path() -> str:
    tag = os.environ.get("CMUX_TAG")
    if tag:
        slug = _sanitize_tag_slug(tag)
        tagged_candidates = [
            f"/tmp/cmux-debug-{slug}.sock",
            f"/tmp/cmux-{slug}.sock",
        ]
        for path in tagged_candidates:
            if os.path.exists(path) and _can_connect(path):
                return path
        # If nothing is connectable yet (e.g. the app is still starting),
        # fall back to the first existing candidate.
        for path in tagged_candidates:
            if os.path.exists(path):
                return path
        # Prefer the debug naming convention when we have to guess.
        return tagged_candidates[0]

    override = os.environ.get("CMUX_SOCKET_PATH")
    if override:
        if os.path.exists(override) and _can_connect(override):
            return override
        # Fall back to other heuristics if the override points at a stale socket file.
        if not os.path.exists(override):
            return override

    last_socket = _read_last_socket_path()
    if last_socket:
        if os.path.exists(last_socket) and _can_connect(last_socket):
            return last_socket

    # Prefer the non-tagged sockets when present.
    candidates = ["/tmp/cmux-debug.sock", "/tmp/cmux.sock"]
    for path in candidates:
        if os.path.exists(path) and _can_connect(path):
            return path

    # Otherwise, fall back to the newest tagged debug socket if there is one.
    tagged = glob.glob("/tmp/cmux-debug-*.sock")
    tagged = [p for p in tagged if os.path.exists(p)]
    if tagged:
        tagged.sort(key=lambda p: os.path.getmtime(p), reverse=True)
        for p in tagged:
            if _can_connect(p, timeout=0.1, retries=2):
                return p

    return candidates[0]


class cmux:
    """Client for controlling cmux via Unix socket"""

    DEFAULT_SOCKET_PATH = _default_socket_path()
    DEFAULT_BUNDLE_ID = _default_bundle_id()

    @staticmethod
    def default_socket_path() -> str:
        return _default_socket_path()

    @staticmethod
    def default_bundle_id() -> str:
        return _default_bundle_id()

    def __init__(self, socket_path: Optional[str] = None):
        # Resolve at init time so imports don't "lock in" a stale path.
        self.socket_path = socket_path or _default_socket_path()
        self._socket: Optional[socket.socket] = None
        self._recv_buffer: str = ""

    def connect(self) -> None:
        """Connect to the cmux socket"""
        if self._socket is not None:
            return

        # Wait up to 2 seconds for the socket file to appear.
        start = time.time()
        while not os.path.exists(self.socket_path):
            if time.time() - start >= 2.0:
                raise cmuxError(
                    f"Socket not found at {self.socket_path}. "
                    "Is cmux running?"
                )
            time.sleep(0.1)

        # Retry refused or vanished sockets until the same 2-second deadline.
        while True:
            self._socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            try:
                self._socket.connect(self.socket_path)
                self._socket.settimeout(5.0)
                return
            except socket.error as e:
                self._socket.close()
                self._socket = None
                if e.errno in (errno.ECONNREFUSED, errno.ENOENT) and time.time() - start < 2.0:
                    time.sleep(0.1)
                    continue
                raise cmuxError(f"Failed to connect: {e}") from e

    def close(self) -> None:
        """Close the connection"""
        if self._socket is not None:
            self._socket.close()
            self._socket = None

    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
        return False

    def _send_command(self, command: str) -> str:
        """Send a command and receive response"""
        if self._socket is None:
            raise cmuxError("Not connected")

        try:
            self._socket.sendall((command + "\n").encode())
            data = self._recv_buffer
            self._recv_buffer = ""
            saw_newline = "\n" in data
            start = time.time()
            while True:
                # Once the first newline has arrived, keep reading only while
                # more data shows up within 100 ms; this collects multi-line
                # responses without blocking on the full socket timeout.
                if saw_newline:
                    ready, _, _ = select.select([self._socket], [], [], 0.1)
                    if not ready:
                        break
                try:
                    chunk = self._socket.recv(8192)
                except socket.timeout:
                    if saw_newline:
                        break
                    if time.time() - start >= 5.0:
                        raise cmuxError("Command timed out")
                    continue
                if not chunk:
                    # Empty read: the server closed the connection.
                    break
                data += chunk.decode()
                if "\n" in data:
                    saw_newline = True
            # Strip the single trailing newline that terminates the response.
            if data.endswith("\n"):
                data = data[:-1]
            return data
        except socket.timeout:
            raise cmuxError("Command timed out")
        except socket.error as e:
            raise cmuxError(f"Socket error: {e}")

    def ping(self) -> bool:
        """Check if the server is responding"""
        response = self._send_command("ping")
        return response == "PONG"

    def list_tabs(self) -> List[Tuple[int, str, str, bool]]:
        """
        List all tabs.
        Returns list of (index, id, title, is_selected) tuples.
        """
        response = self._send_command("list_tabs")
        if response.startswith("ERROR: Unknown command"):
            response = self._send_command("list_workspaces")
        if response in ("No tabs", "No workspaces"):
            return []

        tabs = []
        for line in response.split("\n"):
            if not line.strip():
                continue
            selected = line.startswith("*")
            parts = line.lstrip("* ").split(" ", 2)
            if len(parts) >= 3:
                index = int(parts[0].rstrip(":"))
                tab_id = parts[1]
                title = parts[2]
                tabs.append((index, tab_id, title, selected))
        return tabs

    def new_tab(self) -> str:
        """Create a new tab. Returns the new tab's ID."""
        response = self._send_command("new_tab")
        if response.startswith("ERROR: Unknown command"):
            response = self._send_command("new_workspace")
        if response.startswith("OK "):
            return response[3:]
        raise cmuxError(response)

    def new_split(self, direction: str) -> str:
        """Create a split in the given direction (left/right/up/down). Returns new panel ID when available."""
        response = self._send_command(f"new_split {direction}")
        if response.startswith("OK "):
            return response[3:]
        if response.startswith("OK"):
            # Bare "OK": the server did not report a panel ID.
            return ""
        raise cmuxError(response)

    def close_tab(self, tab_id: str) -> None:
        """Close a tab by ID"""
        response = self._send_command(f"close_tab {tab_id}")
        if response.startswith("ERROR: Unknown command"):
            response = self._send_command(f"close_workspace {tab_id}")
        if not response.startswith("OK"):
            raise cmuxError(response)

    def select_tab(self, tab: Union[str, int]) -> None:
        """Select a tab by ID or index"""
        response = self._send_command(f"select_tab {tab}")
        if response.startswith("ERROR: Unknown command"):
            response = self._send_command(f"select_workspace {tab}")
        if not response.startswith("OK"):
            raise cmuxError(response)

    def list_surfaces(self, tab: Union[str, int, None] = None) -> List[Tuple[int, str, bool]]:
        """
        List surfaces for a tab. Returns list of (index, id, is_focused) tuples.
        If tab is None, uses the current tab.
        """
        arg = "" if tab is None else str(tab)
        response = self._send_command(f"list_surfaces {arg}".rstrip())
        if response in ("No surfaces", "ERROR: Tab not found"):
            return []

        surfaces = []
        for line in response.split("\n"):
            if not line.strip():
                continue
            selected = line.startswith("*")
            parts = line.lstrip("* ").split(" ", 1)
            if len(parts) >= 2:
                index = int(parts[0].rstrip(":"))
                surface_id = parts[1]
                surfaces.append((index, surface_id, selected))
        return surfaces

    def focus_surface(self, surface: Union[str, int]) -> None:
        """Focus a surface by ID or index in the current tab."""
        response = self._send_command(f"focus_surface {surface}")
        if not response.startswith("OK"):
            raise cmuxError(response)

    def current_tab(self) -> str:
        """Get the current tab's ID"""
        response = self._send_command("current_tab")
        if response.startswith("ERROR: Unknown command"):
            response = self._send_command("current_workspace")
        if response.startswith("ERROR"):
            raise cmuxError(response)
        return response

    def current_workspace(self) -> str:
        """Get the current workspace's ID."""
        response = self._send_command("current_workspace")
        if response.startswith("ERROR"):
            raise cmuxError(response)
        return response

    def send(self, text: str) -> None:
        """
        Send text to the current terminal.
        Use \\n for newline (Enter), \\t for tab, etc.

        Note: The text is sent as-is. Use actual escape sequences:
            client.send("echo hello\\n")  # Sends: echo hello<Enter>
            client.send("echo hello" + "\\n")  # Same thing
        """
        # Escape actual newlines/tabs to their backslash forms for protocol
        # The server will unescape them
        escaped = text.replace("\n", "\\n").replace("\r", "\\r").replace("\t", "\\t")
        response = self._send_command(f"send {escaped}")
        if not response.startswith("OK"):
            raise cmuxError(response)

    def send_surface(self, surface: Union[str, int], text: str) -> None:
        """Send text to a specific surface by ID or index in the current tab."""
        escaped = text.replace("\n", "\\n").replace("\r", "\\r").replace("\t", "\\t")
        response = self._send_command(f"send_surface {surface} {escaped}")
        if not response.startswith("OK"):
            raise cmuxError(response)

    def send_key(self, key: str) -> None:
        """
        Send a special key to the current terminal.

        Supported keys:
            ctrl-c, ctrl-d, ctrl-z, ctrl-\\
            enter, tab, escape, backspace
            ctrl-<letter> for any letter
        """
        response = self._send_command(f"send_key {key}")
        if not response.startswith("OK"):
            raise cmuxError(response)

    def send_key_surface(self, surface: Union[str, int], key: str) -> None:
        """Send a special key to a specific surface by ID or index in the current tab."""
        response = self._send_command(f"send_key_surface {surface} {key}")
        if not response.startswith("OK"):
            raise cmuxError(response)

    def send_line(self, text: str) -> None:
        """Send text followed by Enter"""
        self.send(text + "\\n")

    def send_ctrl_c(self) -> None:
        """Send Ctrl+C (SIGINT)"""
        self.send_key("ctrl-c")

    def send_ctrl_d(self) -> None:
        """Send Ctrl+D (EOF)"""
        self.send_key("ctrl-d")

    def help(self) -> str:
        """Get help text from server"""
        return self._send_command("help")

    def notify(self, title: str, subtitle: str = "", body: str = "") -> None:
        """Create a notification for the focused surface."""
        if subtitle or body:
            payload = f"{title}|{subtitle}|{body}"
        else:
            payload = title
        response = self._send_command(f"notify {payload}")
        if not response.startswith("OK"):
            raise cmuxError(response)

    def notify_surface(self, surface: Union[str, int], title: str, subtitle: str = "", body: str = "") -> None:
        """Create a notification for a specific surface by ID or index."""
        if subtitle or body:
            payload = f"{title}|{subtitle}|{body}"
        else:
            payload = title
        response = self._send_command(f"notify_surface {surface} {payload}")
        if not response.startswith("OK"):
            raise cmuxError(response)

    def list_notifications(self) -> List[dict]:
        """
        List notifications.
        Returns list of dicts with keys: id, tab_id/workspace_id, surface_id, is_read, title, subtitle, body.
        """
        response = self._send_command("list_notifications")
        if response == "No notifications":
            return []

        items = []
        for line in response.split("\n"):
            if not line.strip():
                continue
            # Each line is "<index>:<id>|<tab>|<surface>|<read state>|<title>|<subtitle>|<body>".
            _, payload = line.split(":", 1)
            parts = payload.split("|", 6)
            if len(parts) < 7:
                continue
            notif_id, tab_id, surface_id, read_text, title, subtitle, body = parts
            items.append({
                "id": notif_id,
                "tab_id": tab_id,
                "workspace_id": tab_id,
                "surface_id": None if surface_id == "none" else surface_id,
                "is_read": read_text == "read",
                "title": title,
                "subtitle": subtitle,
                "body": body,
            })
        return items

    def clear_notifications(self) -> None:
        """Clear all notifications."""
        response = self._send_command("clear_notifications")
        if not response.startswith("OK"):
            raise cmuxError(response)

    def set_app_focus(self, active: Optional[bool]) -> None:
        """Override app focus state. Use None to clear override."""
        if active is None:
            value = "clear"
        else:
            value = "active" if active else "inactive"
        response = self._send_command(f"set_app_focus {value}")
        if not response.startswith("OK"):
            raise cmuxError(response)

    def simulate_app_active(self) -> None:
        """Trigger the app active handler."""
        response = self._send_command("simulate_app_active")
        if not response.startswith("OK"):
            raise cmuxError(response)

    def set_status(
        self,
        key: str,
        value: str,
        icon: Optional[str] = None,
        color: Optional[str] = None,
        tab: Optional[str] = None,
    ) -> None:
        """Set a sidebar status entry."""
        # Put options before `--` so value can contain arbitrary tokens like `--tab`.
        cmd = f"set_status {key}"
        if icon:
            cmd += f" --icon={icon}"
        if color:
            cmd += f" --color={color}"
        if tab:
            cmd += f" --tab={tab}"
        cmd += f" -- {_quote_option_value(value)}"
        response = self._send_command(cmd)
        if not response.startswith("OK"):
            raise cmuxError(response)

    def clear_status(self, key: str, tab: Optional[str] = None) -> None:
        """Remove a sidebar status entry."""
        cmd = f"clear_status {key}"
        if tab:
            cmd += f" --tab={tab}"
        response = self._send_command(cmd)
        if not response.startswith("OK"):
            raise cmuxError(response)

    def log(
        self,
        message: str,
        level: Optional[str] = None,
        source: Optional[str] = None,
        tab: Optional[str] = None,
    ) -> None:
        """Append a sidebar log entry."""
        # TerminalController.parseOptions treats any --* token as an option until
        # a `--` separator. Put options first and then use `--` so messages can
        # contain arbitrary tokens like `--force`.
        cmd = "log"
        if level:
            cmd += f" --level={level}"
        if source:
            cmd += f" --source={source}"
        if tab:
            cmd += f" --tab={tab}"
        cmd += f" -- {_quote_option_value(message)}"
        response = self._send_command(cmd)
        if not response.startswith("OK"):
            raise cmuxError(response)

    def set_progress(self, value: float, label: Optional[str] = None, tab: Optional[str] = None) -> None:
        """Set sidebar progress bar (0.0-1.0)."""
        cmd = f"set_progress {value}"
        if label:
            cmd += f" --label={_quote_option_value(label)}"
        if tab:
            cmd += f" --tab={tab}"
        response = self._send_command(cmd)
        if not response.startswith("OK"):
            raise cmuxError(response)

    def clear_progress(self, tab: Optional[str] = None) -> None:
        """Clear sidebar progress bar."""
        cmd = "clear_progress"
        if tab:
            cmd += f" --tab={tab}"
        response = self._send_command(cmd)
        if not response.startswith("OK"):
            raise cmuxError(response)

    def report_git_branch(self, branch: str, status: Optional[str] = None, tab: Optional[str] = None) -> None:
        """Report git branch for sidebar display."""
        cmd = f"report_git_branch {branch}"
        if status:
            cmd += f" --status={status}"
        if tab:
            cmd += f" --tab={tab}"
        response = self._send_command(cmd)
        if not response.startswith("OK"):
            raise cmuxError(response)

    def report_ports(self, *ports: int, tab: Optional[str] = None) -> None:
        """Report listening ports for sidebar display."""
        port_str = " ".join(str(p) for p in ports)
        cmd = f"report_ports {port_str}"
        if tab:
            cmd += f" --tab={tab}"
        response = self._send_command(cmd)
        if not response.startswith("OK"):
            raise cmuxError(response)

    def clear_ports(self, tab: Optional[str] = None) -> None:
        """Clear listening ports for sidebar display."""
        cmd = "clear_ports"
        if tab:
            cmd += f" --tab={tab}"
        response = self._send_command(cmd)
        if not response.startswith("OK"):
            raise cmuxError(response)

    def sidebar_state(self, tab: Optional[str] = None) -> str:
        """Dump all sidebar metadata for a tab."""
        cmd = "sidebar_state"
        if tab:
            cmd += f" --tab={tab}"
        return self._send_command(cmd)

    def reset_sidebar(self, tab: Optional[str] = None) -> None:
        """Clear all sidebar metadata for a tab."""
        cmd = "reset_sidebar"
        if tab:
            cmd += f" --tab={tab}"
        response = self._send_command(cmd)
        if not response.startswith("OK"):
            raise cmuxError(response)

    def focus_notification(self, tab: Union[str, int], surface: Union[str, int, None] = None) -> None:
        """Focus tab/surface using the notification flow."""
        if surface is None:
            command = f"focus_notification {tab}"
        else:
            command = f"focus_notification {tab} {surface}"
        response = self._send_command(command)
        if not response.startswith("OK"):
            raise cmuxError(response)

    def flash_count(self, surface: Union[str, int]) -> int:
        """Get flash count for a surface by ID or index."""
        response = self._send_command(f"flash_count {surface}")
        if response.startswith("OK "):
            return int(response.split(" ", 1)[1])
        raise cmuxError(response)

    def reset_flash_counts(self) -> None:
        """Reset flash counters."""
        response = self._send_command("reset_flash_counts")
        if not response.startswith("OK"):
            raise cmuxError(response)

    def read_screen(self) -> str:
        """Read the visible terminal text from the focused surface."""
        return self._send_command("read_screen")

    # Workspace commands
    def list_workspaces(self) -> List[Tuple[int, str, str, bool]]:
        """List all workspaces."""
        response = self._send_command("list_workspaces")
        if response.startswith("ERROR: Unknown command"):
            return self.list_tabs()
        if response in ("No workspaces", "No tabs"):
            return []

        workspaces = []
        for line in response.split("\n"):
            if not line.strip():
                continue
            selected = line.startswith("*")
            parts = line.lstrip("* ").split(" ", 2)
            if len(parts) >= 3:
                index = int(parts[0].rstrip(":"))
                workspace_id = parts[1]
                title = parts[2]
                workspaces.append((index, workspace_id, title, selected))
        return workspaces

    def new_workspace(self) -> str:
        """Create a new workspace. Returns the new workspace's ID."""
        response = self._send_command("new_workspace")
        if response.startswith("ERROR: Unknown command"):
            return self.new_tab()
        if response.startswith("OK "):
            return response[3:]
        raise cmuxError(response)

    def close_workspace(self, workspace_id: str) -> None:
        """Close a workspace by ID."""
        response = self._send_command(f"close_workspace {workspace_id}")
        if response.startswith("ERROR: Unknown command"):
            self.close_tab(workspace_id)
            return
        if not response.startswith("OK"):
            raise cmuxError(response)

    def select_workspace(self, workspace: Union[str, int]) -> None:
        """Select a workspace by ID or index."""
        response = self._send_command(f"select_workspace {workspace}")
        if response.startswith("ERROR: Unknown command"):
            self.select_tab(workspace)
            return
        if not response.startswith("OK"):
            raise cmuxError(response)

    # Pane commands
    def list_panes(self) -> List[Tuple[int, str, int, bool]]:
        """
        List all panes in the current workspace.
        Returns list of (index, pane_id, surface_count, is_focused) tuples.
        """
        response = self._send_command("list_panes")
        if response in ("No panes", "ERROR: No tab selected", "ERROR: No workspace selected"):
            return []

        panes = []
        for line in response.split("\n"):
            if not line.strip():
                continue
            selected = line.startswith("*")
            parts = line.lstrip("* ").split()
            if len(parts) >= 4:
                index = int(parts[0].rstrip(":"))
                pane_id = parts[1]
                surface_count = int(parts[2].lstrip("["))
                panes.append((index, pane_id, surface_count, selected))
        return panes

    def focus_pane(self, pane: Union[str, int]) -> None:
        """Focus a pane by ID or index in the current workspace."""
        response = self._send_command(f"focus_pane {pane}")
        if not response.startswith("OK"):
            raise cmuxError(response)

    def list_pane_surfaces(self, pane: Union[str, int, None] = None) -> List[Tuple[int, str, str, bool]]:
        """
        List surfaces in a pane.
        Returns list of (index, surface_id, title, is_selected) tuples.
        If pane is None, uses the focused pane.
        """
        if pane is not None:
            response = self._send_command(f"list_pane_surfaces --pane={pane}")
        else:
            response = self._send_command("list_pane_surfaces")

        if response in ("No surfaces", "No tabs in pane"):
            return []
        if response.startswith("ERROR:"):
            raise cmuxError(response)

        surfaces = []
        for line in response.split("\n"):
            if not line.strip():
                continue
            selected = line.startswith("*")
            line2 = line.lstrip("* ").strip()
            try:
                idx_part, rest = line2.split(":", 1)
                index = int(idx_part.strip())
                rest = rest.strip()
            except ValueError:
                # Skip lines that do not start with "<index>:".
                continue

            panel_id = ""
            title = rest
            marker = " [panel:"
            if marker in rest and rest.endswith("]"):
                # Split on the last marker so a title that itself contains
                # " [panel:" does not corrupt the parsed ID.
                title, suffix = rest.rsplit(marker, 1)
                title = title.strip()
                panel_id = suffix[:-1]
            surfaces.append((index, panel_id, title, selected))
        return surfaces

    def focus_surface_by_panel(self, surface_id: str) -> None:
        """Focus a surface by its panel ID."""
        response = self._send_command(f"focus_surface_by_panel {surface_id}")
        if not response.startswith("OK"):
            raise cmuxError(response)

    def focus_webview(self, panel_id: str) -> None:
        """Move keyboard focus into a browser panel's WKWebView."""
        response = self._send_command(f"focus_webview {panel_id}")
        if not response.startswith("OK"):
            raise cmuxError(response)

    def is_webview_focused(self, panel_id: str) -> bool:
        """Return True if the browser panel's WKWebView is first responder."""
        response = self._send_command(f"is_webview_focused {panel_id}")
        if response.startswith("ERROR"):
            raise cmuxError(response)
        return response.strip().lower() == "true"

    def wait_for_webview_focus(self, panel_id: str, timeout_s: float = 2.0) -> None:
        """Poll until the browser panel's WKWebView has focus, or raise."""
        # Use the monotonic clock so wall-clock adjustments cannot stretch
        # or shorten the timeout.
        start = time.monotonic()
        while time.monotonic() - start < timeout_s:
            if self.is_webview_focused(panel_id):
                return
            time.sleep(0.05)
        raise cmuxError(f"Timed out waiting for webview focus: {panel_id}")

    def set_shortcut(self, name: str, combo: str) -> None:
        """Set a keyboard shortcut via the debug socket."""
        response = self._send_command(f"set_shortcut {name} {combo}")
        if not response.startswith("OK"):
            raise cmuxError(response)

    def simulate_shortcut(self, combo: str) -> None:
        """Simulate a keyDown shortcut via the debug socket."""
        response = self._send_command(f"simulate_shortcut {combo}")
        if not response.startswith("OK"):
            raise cmuxError(response)

    def simulate_type(self, text: str) -> None:
        """Insert text into the current first responder (debug builds only)."""
        # Escape backslashes first so the escapes added below are not doubled.
        escaped = (
            text
            .replace("\\", "\\\\")
            .replace("\r", "\\r")
            .replace("\n", "\\n")
            .replace("\t", "\\t")
        )
        response = self._send_command(f"simulate_type {escaped}")
        if not response.startswith("OK"):
            raise cmuxError(response)

    def simulate_file_drop(self, surface: Union[str, int], paths: Union[str, List[str]]) -> None:
        """Simulate dropping file path(s) onto a terminal surface (debug builds only)."""
        payload = paths if isinstance(paths, str) else "|".join(paths)
        response = self._send_command(f"simulate_file_drop {surface} {payload}")
        if not response.startswith("OK"):
            raise cmuxError(response)

    def activate_app(self) -> None:
        """Bring the app and its main window to the front (debug builds only)."""
        response = self._send_command("activate_app")
        if not response.startswith("OK"):
            raise cmuxError(response)

    def is_terminal_focused(self, panel: Union[str, int]) -> bool:
        """Return True if the terminal panel's Ghostty view is first responder."""
        response = self._send_command(f"is_terminal_focused {panel}")
        if response.startswith("ERROR"):
            raise cmuxError(response)
        return response.strip().lower() == "true"

    def identify(self) -> dict:
        """Best-effort legacy identify helper."""
        response = self._send_command("identify")
        if response.startswith("ERROR"):
            raise cmuxError(response)
        try:
            return json.loads(response)
        except Exception:
            # Best effort: a response that is not valid JSON yields an empty dict.
            return {}

    def layout_debug(self) -> dict:
        """Return the bonsplit layout snapshot and the selected panel bounds."""
        response = self._send_command("layout_debug")
        if not response.startswith("OK "):
            raise cmuxError(response)
        payload = response[3:].strip()
        try:
            return json.loads(payload)
        except json.JSONDecodeError as e:
            raise cmuxError(f"layout_debug JSON decode failed: {e}: {payload[:200]}") from e

    def read_terminal_text(self, panel: Union[str, int, None] = None) -> str:
        """
        Read visible terminal text for a panel.
        Returns UTF-8 decoded text.
        """
        cmd = "read_terminal_text"
        if panel is not None:
            cmd += f" {panel}"
        response = self._send_command(cmd)
        if not response.startswith("OK "):
            raise cmuxError(response)
        # The payload after "OK " is base64; an empty payload means no text.
        b64 = response[3:].strip()
        raw = base64.b64decode(b64) if b64 else b""
        return raw.decode("utf-8", errors="replace")

    def render_stats(self, panel: Union[str, int, None] = None) -> dict:
        """Return terminal render stats (debug builds only)."""
        cmd = "render_stats"
        if panel is not None:
            cmd += f" {panel}"
        response = self._send_command(cmd)
        if not response.startswith("OK "):
            raise cmuxError(response)
        payload = response[3:].strip()
        try:
            return json.loads(payload)
        except json.JSONDecodeError as e:
            raise cmuxError(f"render_stats JSON decode failed: {e}: {payload[:200]}") from e

    def panel_snapshot_reset(self, panel: Union[str, int]) -> None:
        """Reset the stored snapshot for a panel (debug builds only)."""
        response = self._send_command(f"panel_snapshot_reset {panel}")
        if not response.startswith("OK"):
            raise cmuxError(response)

    def panel_snapshot(self, panel: Union[str, int], label: str = "") -> dict:
        """
        Capture a screenshot of a panel and return pixel-diff info.
        Returns a dict with keys: panel_id, changed_pixels, width, height, path.
        """
        cmd = f"panel_snapshot {panel}"
        if label:
            cmd += f" {label}"
        response = self._send_command(cmd)
        if not response.startswith("OK "):
            raise cmuxError(response)
        payload = response[3:].strip()
        # Limit the split to 4 so a path containing spaces stays in one field.
        parts = payload.split(" ", 4)
        if len(parts) != 5:
            raise cmuxError(f"panel_snapshot parse failed: {response}")
        panel_id, changed, width, height, path = parts
        return {
            "panel_id": panel_id,
            "changed_pixels": int(changed),
            "width": int(width),
            "height": int(height),
            "path": path,
        }

    def bonsplit_underflow_count(self) -> int:
        """Return the bonsplit arranged-subview underflow counter."""
        response = self._send_command("bonsplit_underflow_count")
        if response.startswith("OK "):
            return int(response.split(" ", 1)[1])
        raise cmuxError(response)

    def reset_bonsplit_underflow_count(self) -> None:
        """Reset the bonsplit arranged-subview underflow counter."""
        response = self._send_command("reset_bonsplit_underflow_count")
        if not response.startswith("OK"):
            raise cmuxError(response)

    def empty_panel_count(self) -> int:
        """Return the number of EmptyPanelView appearances."""
        response = self._send_command("empty_panel_count")
        if response.startswith("OK "):
            return int(response.split(" ", 1)[1])
        raise cmuxError(response)

    def reset_empty_panel_count(self) -> None:
        """Reset the EmptyPanelView appearance counter."""
        response = self._send_command("reset_empty_panel_count")
        if not response.startswith("OK"):
            raise cmuxError(response)

    def new_surface(self, pane: Union[str, int, None] = None,
                    panel_type: str = "terminal", url: Union[str, None] = None) -> str:
        """
        Create a new surface in a pane.
        Returns the new surface ID.
        """
        args = []
        if panel_type != "terminal":
            args.append(f"--type={panel_type}")
        if pane is not None:
            args.append(f"--pane={pane}")
        if url:
            args.append(f"--url={url}")

        cmd = "new_surface"
        if args:
            cmd += " " + " ".join(args)

        response = self._send_command(cmd)
        if response.startswith("OK "):
            return response[3:]
        raise cmuxError(response)

    def new_pane(self, direction: str = "right", panel_type: str = "terminal",
                 url: Union[str, None] = None) -> str:
        """
        Create a new pane (split).
        Returns the new surface/panel ID created in the new pane.
        """
        args = [f"--direction={direction}"]
        if panel_type != "terminal":
            args.append(f"--type={panel_type}")
        if url:
            args.append(f"--url={url}")

        cmd = "new_pane " + " ".join(args)
        response = self._send_command(cmd)
        if response.startswith("OK "):
            return response[3:]
        raise cmuxError(response)

    def close_surface(self, surface: Union[str, int, None] = None) -> None:
        """
        Close a surface (collapse split) by ID or index.
        If surface is None, closes the focused surface.
        """
        if surface is None:
            response = self._send_command("close_surface")
        else:
            response = self._send_command(f"close_surface {surface}")
        if not response.startswith("OK"):
            raise cmuxError(response)

    def surface_health(self, workspace: Union[str, int, None] = None) -> List[dict]:
        """
        Check view health of all surfaces in a workspace.
        Returns list of dicts with keys: index, id, type, in_window.
        Returns an empty list on an error response or when there are no panels.
        """
        arg = "" if workspace is None else str(workspace)
        response = self._send_command(f"surface_health {arg}".rstrip())
        if response.startswith("ERROR") or response == "No panels":
            return []

        surfaces = []
        for line in response.split("\n"):
            if not line.strip():
                continue
            parts = line.strip().split()
            if len(parts) < 4:
                continue
            index = int(parts[0].rstrip(":"))
            surface_id = parts[1]
            # Fields 3 and 4 are key=value pairs; missing "=" falls back to a default.
            panel_type = parts[2].split("=", 1)[1] if "=" in parts[2] else "unknown"
            in_window = (parts[3].split("=", 1)[1] == "true") if "=" in parts[3] else False
            surfaces.append({
                "index": index,
                "id": surface_id,
                "type": panel_type,
                "in_window": in_window,
            })
        return surfaces


def main():
    """CLI interface for cmux."""
    import sys
    import argparse

    parser = argparse.ArgumentParser(description="cmux CLI")
    parser.add_argument("command", nargs="?", help="Command to send")
    parser.add_argument("args", nargs="*", help="Command arguments")
    parser.add_argument("-s", "--socket", default=None,
                        help="Socket path (default: auto-detect)")

    args = parser.parse_args()

    try:
        with cmux(args.socket) as client:
            if not args.command:
                # Interactive mode
                print("cmux CLI (type 'help' for commands, 'quit' to exit)")
                while True:
                    try:
                        line = input("> ").strip()
                        if line.lower() in ("quit", "exit"):
                            break
                        if line:
                            response = client._send_command(line)
                            print(response)
                    except EOFError:
                        break
                    except KeyboardInterrupt:
                        print()
                        break
            else:
                # Single command mode
                command = args.command
                if args.args:
                    command += " " + " ".join(args.args)
                response = client._send_command(command)
                print(response)
    except cmuxError as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()