Merge branch 'main' into issue-151-ssh-remote-port-proxying

# Conflicts:
#	CLI/cmux.swift
#	Sources/ContentView.swift
#	Sources/GhosttyTerminalView.swift
#	Sources/Panels/BrowserPanel.swift
#	Sources/Panels/BrowserPanelView.swift
#	Sources/TabManager.swift
#	Sources/TerminalController.swift
#	Sources/Workspace.swift
#	Sources/WorkspaceContentView.swift
#	ghostty
This commit is contained in:
Lawrence Chen 2026-03-09 18:36:59 -07:00
commit bdebc8ecc9
205 changed files with 107859 additions and 6333 deletions

View file

@ -0,0 +1,51 @@
#!/usr/bin/env python3
"""Shared helpers for static regression tests."""
from __future__ import annotations
import shutil
import subprocess
from pathlib import Path
def repo_root() -> Path:
git = shutil.which("git")
if git is None:
return Path(__file__).resolve().parents[1]
try:
result = subprocess.run(
[git, "rev-parse", "--show-toplevel"],
capture_output=True,
text=True,
check=False,
timeout=2,
)
except (subprocess.TimeoutExpired, OSError):
return Path(__file__).resolve().parents[1]
if result.returncode == 0:
return Path(result.stdout.strip())
return Path(__file__).resolve().parents[1]
def extract_block(source: str, signature: str) -> str:
# Targeted helper for this regression suite: assumes braces in the matched
# block are structural (not inside strings/comments/character literals).
start = source.find(signature)
if start < 0:
raise ValueError(f"Missing signature: {signature}")
brace_start = source.find("{", start)
if brace_start < 0:
raise ValueError(f"Missing opening brace for: {signature}")
depth = 0
for idx in range(brace_start, len(source)):
char = source[idx]
if char == "{":
depth += 1
elif char == "}":
depth -= 1
if depth == 0:
return source[brace_start : idx + 1]
raise ValueError(f"Unbalanced braces for: {signature}")

View file

@ -1,126 +0,0 @@
#!/usr/bin/env python3
"""Static regression guards for browser chrome contrast in mixed theme setups."""
from __future__ import annotations
import subprocess
from pathlib import Path
def repo_root() -> Path:
result = subprocess.run(
["git", "rev-parse", "--show-toplevel"],
capture_output=True,
text=True,
)
if result.returncode == 0:
return Path(result.stdout.strip())
return Path(__file__).resolve().parents[1]
def extract_block(source: str, signature: str) -> str:
start = source.find(signature)
if start < 0:
raise ValueError(f"Missing signature: {signature}")
brace_start = source.find("{", start)
if brace_start < 0:
raise ValueError(f"Missing opening brace for: {signature}")
depth = 0
for idx in range(brace_start, len(source)):
char = source[idx]
if char == "{":
depth += 1
elif char == "}":
depth -= 1
if depth == 0:
return source[brace_start : idx + 1]
raise ValueError(f"Unbalanced braces for: {signature}")
def main() -> int:
root = repo_root()
view_path = root / "Sources" / "Panels" / "BrowserPanelView.swift"
source = view_path.read_text(encoding="utf-8")
failures: list[str] = []
try:
browser_panel_view_block = extract_block(source, "struct BrowserPanelView: View")
except ValueError as error:
failures.append(str(error))
browser_panel_view_block = ""
try:
resolver_block = extract_block(source, "func resolvedBrowserChromeColorScheme(")
except ValueError as error:
failures.append(str(error))
resolver_block = ""
if resolver_block:
if "backgroundColor.isLightColor ? .light : .dark" not in resolver_block:
failures.append(
"resolvedBrowserChromeColorScheme must map luminance to a light/dark ColorScheme"
)
try:
chrome_scheme_block = extract_block(
browser_panel_view_block,
"private var browserChromeColorScheme: ColorScheme",
)
except ValueError as error:
failures.append(str(error))
chrome_scheme_block = ""
if chrome_scheme_block and "resolvedBrowserChromeColorScheme(" not in chrome_scheme_block:
failures.append("browserChromeColorScheme must use resolvedBrowserChromeColorScheme")
try:
omnibar_background_block = extract_block(
browser_panel_view_block,
"private var omnibarPillBackgroundColor: NSColor",
)
except ValueError as error:
failures.append(str(error))
omnibar_background_block = ""
if omnibar_background_block and "for: browserChromeColorScheme" not in omnibar_background_block:
failures.append("omnibar pill background must use browserChromeColorScheme")
try:
address_bar_block = extract_block(
browser_panel_view_block,
"private var addressBar: some View",
)
except ValueError as error:
failures.append(str(error))
address_bar_block = ""
if address_bar_block and ".environment(\\.colorScheme, browserChromeColorScheme)" not in address_bar_block:
failures.append("addressBar must apply browserChromeColorScheme via environment")
try:
body_block = extract_block(browser_panel_view_block, "var body: some View")
except ValueError as error:
failures.append(str(error))
body_block = ""
if body_block:
if "OmnibarSuggestionsView(" not in body_block:
failures.append("Expected OmnibarSuggestionsView block in BrowserPanelView body")
elif ".environment(\\.colorScheme, browserChromeColorScheme)" not in body_block:
failures.append("Omnibar suggestions must apply browserChromeColorScheme via environment")
if failures:
print("FAIL: browser chrome contrast regression guards failed")
for failure in failures:
print(f" - {failure}")
return 1
print("PASS: browser chrome contrast regression guards are in place")
return 0
if __name__ == "__main__":
raise SystemExit(main())

View file

@ -1,86 +0,0 @@
#!/usr/bin/env python3
"""Static regression guard for browser console/errors CLI output formatting.
Ensures non-JSON `browser console list` and `browser errors list` do not fall
back to unconditional `OK` when logs exist.
"""
from __future__ import annotations
import subprocess
from pathlib import Path
def repo_root() -> Path:
result = subprocess.run(
["git", "rev-parse", "--show-toplevel"],
capture_output=True,
text=True,
)
if result.returncode == 0:
return Path(result.stdout.strip())
return Path(__file__).resolve().parents[1]
def extract_block(source: str, signature: str) -> str:
start = source.find(signature)
if start < 0:
raise ValueError(f"Missing signature: {signature}")
brace_start = source.find("{", start)
if brace_start < 0:
raise ValueError(f"Missing opening brace for: {signature}")
depth = 0
for idx in range(brace_start, len(source)):
char = source[idx]
if char == "{":
depth += 1
elif char == "}":
depth -= 1
if depth == 0:
return source[brace_start : idx + 1]
raise ValueError(f"Unbalanced braces for: {signature}")
def main() -> int:
root = repo_root()
failures: list[str] = []
cli_path = root / "CLI" / "cmux.swift"
cli_source = cli_path.read_text(encoding="utf-8")
browser_block = extract_block(cli_source, "private func runBrowserCommand(")
if "func displayBrowserLogItems(_ value: Any?) -> String?" not in browser_block:
failures.append("runBrowserCommand() is missing displayBrowserLogItems() helper")
else:
helper_block = extract_block(browser_block, "func displayBrowserLogItems(_ value: Any?) -> String?")
if "return \"[\\(level)] \\(text)\"" not in helper_block:
failures.append("displayBrowserLogItems() no longer renders level-prefixed log lines")
if "return \"[error] \\(message)\"" not in helper_block:
failures.append("displayBrowserLogItems() no longer renders concise JS error messages")
if "return displayBrowserValue(dict)" not in helper_block:
failures.append("displayBrowserLogItems() no longer falls back to structured formatting")
console_block = extract_block(browser_block, 'if subcommand == "console"')
if 'displayBrowserLogItems(payload["entries"])' not in console_block:
failures.append("browser console path no longer formats entries for non-JSON output")
if 'output(payload, fallback: "OK")' in console_block:
failures.append("browser console path regressed to unconditional OK output")
errors_block = extract_block(browser_block, 'if subcommand == "errors"')
if 'displayBrowserLogItems(payload["errors"])' not in errors_block:
failures.append("browser errors path no longer formats errors for non-JSON output")
if 'output(payload, fallback: "OK")' in errors_block:
failures.append("browser errors path regressed to unconditional OK output")
if failures:
print("FAIL: browser console/errors CLI output regression guard failed")
for item in failures:
print(f" - {item}")
return 1
print("PASS: browser console/errors CLI output regression guard is in place")
return 0
if __name__ == "__main__":
raise SystemExit(main())

View file

@ -1,92 +0,0 @@
#!/usr/bin/env python3
"""Static regression checks for browser DevTools/portal review fixes.
Guards two follow-up fixes:
1) DevTools toggle path must retry restore when inspector show is transiently ignored.
2) Browser portal visibility must propagate even if host is temporarily off-window.
"""
from __future__ import annotations
import re
import subprocess
from pathlib import Path
def repo_root() -> Path:
result = subprocess.run(
["git", "rev-parse", "--show-toplevel"],
capture_output=True,
text=True,
)
if result.returncode == 0:
return Path(result.stdout.strip())
return Path(__file__).resolve().parents[1]
def extract_block(source: str, signature: str) -> str:
start = source.find(signature)
if start < 0:
raise ValueError(f"Missing signature: {signature}")
brace_start = source.find("{", start)
if brace_start < 0:
raise ValueError(f"Missing opening brace for: {signature}")
depth = 0
for idx in range(brace_start, len(source)):
char = source[idx]
if char == "{":
depth += 1
elif char == "}":
depth -= 1
if depth == 0:
return source[brace_start : idx + 1]
raise ValueError(f"Unbalanced braces for: {signature}")
def main() -> int:
root = repo_root()
failures: list[str] = []
panel_path = root / "Sources" / "Panels" / "BrowserPanel.swift"
panel_source = panel_path.read_text(encoding="utf-8")
toggle_block = extract_block(panel_source, "func toggleDeveloperTools() -> Bool")
if "visibleAfterToggle" not in toggle_block:
failures.append("toggleDeveloperTools() no longer re-checks inspector visibility")
if "scheduleDeveloperToolsRestoreRetry()" not in toggle_block:
failures.append("toggleDeveloperTools() no longer schedules a DevTools restore retry")
view_path = root / "Sources" / "Panels" / "BrowserPanelView.swift"
view_source = view_path.read_text(encoding="utf-8")
portal_update_block = extract_block(view_source, "private func updateUsingWindowPortal(")
if "BrowserWindowPortalRegistry.updateEntryVisibility(" not in portal_update_block:
failures.append("BrowserPanelView.updateUsingWindowPortal() is missing deferred portal visibility propagation")
if "zPriority: coordinator.desiredPortalZPriority" not in portal_update_block:
failures.append("BrowserPanelView deferred portal update no longer propagates zPriority")
portal_path = root / "Sources" / "BrowserWindowPortal.swift"
portal_source = portal_path.read_text(encoding="utf-8")
if not re.search(
r"func\s+updateEntryVisibility\s*\(\s*forWebViewId\s+webViewId:\s*ObjectIdentifier,\s*visibleInUI:\s*Bool,\s*zPriority:\s*Int\s*\)",
portal_source,
flags=re.MULTILINE,
):
failures.append("WindowBrowserPortal is missing updateEntryVisibility(forWebViewId:visibleInUI:zPriority:)")
if not re.search(
r"static\s+func\s+updateEntryVisibility\s*\(\s*for\s+webView:\s*WKWebView,\s*visibleInUI:\s*Bool,\s*zPriority:\s*Int\s*\)",
portal_source,
flags=re.MULTILINE,
):
failures.append("BrowserWindowPortalRegistry is missing updateEntryVisibility(for:visibleInUI:zPriority:)")
if failures:
print("FAIL: browser devtools/portal regression guards failed")
for item in failures:
print(f" - {item}")
return 1
print("PASS: browser devtools/portal regression guards are in place")
return 0
if __name__ == "__main__":
raise SystemExit(main())

View file

@ -1,128 +0,0 @@
#!/usr/bin/env python3
"""Static regression guard for browser eval async wrapping + telemetry injection."""
from __future__ import annotations
import subprocess
from pathlib import Path
def repo_root() -> Path:
result = subprocess.run(
["git", "rev-parse", "--show-toplevel"],
capture_output=True,
text=True,
)
if result.returncode == 0:
return Path(result.stdout.strip())
return Path(__file__).resolve().parents[1]
def extract_block(source: str, signature: str) -> str:
start = source.find(signature)
if start < 0:
raise ValueError(f"Missing signature: {signature}")
brace_start = source.find("{", start)
if brace_start < 0:
raise ValueError(f"Missing opening brace for: {signature}")
depth = 0
for idx in range(brace_start, len(source)):
char = source[idx]
if char == "{":
depth += 1
elif char == "}":
depth -= 1
if depth == 0:
return source[brace_start : idx + 1]
raise ValueError(f"Unbalanced braces for: {signature}")
def extract_span(source: str, start_marker: str, end_marker: str) -> str:
start = source.find(start_marker)
if start < 0:
raise ValueError(f"Missing start marker: {start_marker}")
end = source.find(end_marker, start)
if end < 0:
raise ValueError(f"Missing end marker: {end_marker}")
return source[start:end]
def main() -> int:
root = repo_root()
failures: list[str] = []
terminal_path = root / "Sources" / "TerminalController.swift"
panel_path = root / "Sources" / "Panels" / "BrowserPanel.swift"
terminal_source = terminal_path.read_text(encoding="utf-8")
panel_source = panel_path.read_text(encoding="utf-8")
if "preferAsync: Bool = false" not in terminal_source:
failures.append("v2RunJavaScript() no longer exposes preferAsync toggle")
run_js_block = extract_block(terminal_source, "private func v2RunJavaScript(")
if "callAsyncJavaScript" not in run_js_block:
failures.append("v2RunJavaScript() no longer uses callAsyncJavaScript for async JS")
run_browser_js_block = extract_block(terminal_source, "private func v2RunBrowserJavaScript(")
required_wrapper_tokens = [
"let asyncFunctionBody =",
"__cmuxMaybeAwait",
"__cmux_t",
"__cmux_v",
"return await __cmuxEvalInFrame();",
"preferAsync: true",
]
for token in required_wrapper_tokens:
if token not in run_browser_js_block:
failures.append(f"v2RunBrowserJavaScript() missing async eval wrapper token: {token}")
if "v2BrowserUndefinedSentinel" not in terminal_source:
failures.append("TerminalController is missing undefined sentinel handling")
if "v2BrowserEvalEnvelopeTypeUndefined" not in terminal_source:
failures.append("TerminalController is missing undefined envelope decode constant")
hook_block = extract_block(terminal_source, "private func v2BrowserEnsureTelemetryHooks(")
if "BrowserPanel.telemetryHookBootstrapScriptSource" not in hook_block:
failures.append("v2BrowserEnsureTelemetryHooks() no longer uses shared BrowserPanel telemetry source")
if "static let telemetryHookBootstrapScriptSource" not in panel_source:
failures.append("BrowserPanel is missing telemetryHookBootstrapScriptSource")
if "static let dialogTelemetryHookBootstrapScriptSource" not in panel_source:
failures.append("BrowserPanel is missing dialogTelemetryHookBootstrapScriptSource")
base_script_span = extract_span(
panel_source,
"static let telemetryHookBootstrapScriptSource =",
"static let dialogTelemetryHookBootstrapScriptSource =",
)
if "window.alert = function(message)" in base_script_span:
failures.append("Document-start telemetry script should not override alert dialogs")
if "window.confirm = function(message)" in base_script_span:
failures.append("Document-start telemetry script should not override confirm dialogs")
if "window.prompt = function(message, defaultValue)" in base_script_span:
failures.append("Document-start telemetry script should not override prompt dialogs")
panel_init_block = extract_block(
panel_source,
"init(workspaceId: UUID, initialURL: URL? = nil, bypassInsecureHTTPHostOnce: String? = nil)",
)
required_init_tokens = [
"config.userContentController.addUserScript(",
"source: Self.telemetryHookBootstrapScriptSource",
"injectionTime: .atDocumentStart",
]
for token in required_init_tokens:
if token not in panel_init_block:
failures.append(f"BrowserPanel init() missing telemetry user-script token: {token}")
if failures:
print("FAIL: browser eval async wrapper / telemetry injection regression guard failed")
for item in failures:
print(f" - {item}")
return 1
print("PASS: browser eval async wrapper / telemetry injection regression guard is in place")
return 0
if __name__ == "__main__":
raise SystemExit(main())

View file

@ -1,89 +0,0 @@
#!/usr/bin/env python3
"""Static regression guard for browser eval CLI output formatting.
Ensures `cmux browser <surface> eval <script>` prints the evaluated value
instead of always printing `OK`.
"""
from __future__ import annotations
import subprocess
from pathlib import Path
def repo_root() -> Path:
result = subprocess.run(
["git", "rev-parse", "--show-toplevel"],
capture_output=True,
text=True,
)
if result.returncode == 0:
return Path(result.stdout.strip())
return Path(__file__).resolve().parents[1]
def extract_block(source: str, signature: str) -> str:
start = source.find(signature)
if start < 0:
raise ValueError(f"Missing signature: {signature}")
brace_start = source.find("{", start)
if brace_start < 0:
raise ValueError(f"Missing opening brace for: {signature}")
depth = 0
for idx in range(brace_start, len(source)):
char = source[idx]
if char == "{":
depth += 1
elif char == "}":
depth -= 1
if depth == 0:
return source[brace_start : idx + 1]
raise ValueError(f"Unbalanced braces for: {signature}")
def main() -> int:
root = repo_root()
failures: list[str] = []
cli_path = root / "CLI" / "cmux.swift"
cli_source = cli_path.read_text(encoding="utf-8")
browser_block = extract_block(cli_source, "private func runBrowserCommand(")
if "func displayBrowserValue(_ value: Any) -> String" not in browser_block:
failures.append("runBrowserCommand() is missing displayBrowserValue() helper")
else:
value_block = extract_block(browser_block, "func displayBrowserValue(_ value: Any) -> String")
if 'dict["__cmux_t"] as? String' not in value_block or 'type == "undefined"' not in value_block:
failures.append("displayBrowserValue() no longer maps __cmux_t=undefined to literal 'undefined'")
required_guards = [
"if value is NSNull",
"if let string = value as? String",
"if let bool = value as? Bool",
"if let number = value as? NSNumber",
]
for guard in required_guards:
if guard not in value_block:
failures.append(f"displayBrowserValue() no longer handles: {guard}")
eval_block = extract_block(browser_block, 'if subcommand == "eval"')
if 'let payload = try client.sendV2(method: "browser.eval"' not in eval_block:
failures.append("browser eval path no longer calls browser.eval v2 method")
if 'if let value = payload["value"]' not in eval_block:
failures.append("browser eval path no longer reads payload value")
if "fallback = displayBrowserValue(value)" not in eval_block:
failures.append("browser eval path no longer formats payload value for CLI output")
if 'output(payload, fallback: "OK")' in eval_block:
failures.append("browser eval path regressed to unconditional OK output")
if failures:
print("FAIL: browser eval CLI output regression guard failed")
for item in failures:
print(f" - {item}")
return 1
print("PASS: browser eval CLI output regression guard is in place")
return 0
if __name__ == "__main__":
raise SystemExit(main())

View file

@ -1,84 +0,0 @@
#!/usr/bin/env python3
"""Static regression checks for favicon sync during browser navigation.
Guards the race fix where stale async favicon fetches must not overwrite the
icon after the user navigates (including back/forward and same-URL reloads),
while still allowing same-document URL changes (pushState/hash updates).
"""
from __future__ import annotations
import subprocess
from pathlib import Path
def repo_root() -> Path:
result = subprocess.run(
["git", "rev-parse", "--show-toplevel"],
capture_output=True,
text=True,
)
if result.returncode == 0:
return Path(result.stdout.strip())
return Path(__file__).resolve().parents[1]
def extract_block(source: str, signature: str) -> str:
start = source.find(signature)
if start < 0:
raise ValueError(f"Missing signature: {signature}")
brace_start = source.find("{", start)
if brace_start < 0:
raise ValueError(f"Missing opening brace for: {signature}")
depth = 0
for idx in range(brace_start, len(source)):
char = source[idx]
if char == "{":
depth += 1
elif char == "}":
depth -= 1
if depth == 0:
return source[brace_start : idx + 1]
raise ValueError(f"Unbalanced braces for: {signature}")
def main() -> int:
root = repo_root()
failures: list[str] = []
panel_path = root / "Sources" / "Panels" / "BrowserPanel.swift"
panel_source = panel_path.read_text(encoding="utf-8")
if "private var faviconRefreshGeneration: Int = 0" not in panel_source:
failures.append("BrowserPanel is missing faviconRefreshGeneration state")
refresh_block = extract_block(panel_source, "private func refreshFavicon(from webView: WKWebView)")
if refresh_block.count("isCurrentFaviconRefresh(") < 3:
failures.append("refreshFavicon() no longer checks staleness at each async stage")
current_guard_block = extract_block(panel_source, "private func isCurrentFaviconRefresh(")
if "generation == faviconRefreshGeneration" not in current_guard_block:
failures.append("isCurrentFaviconRefresh() no longer validates refresh generation")
if "webView.url?.absoluteString == pageURLString" in current_guard_block:
failures.append("isCurrentFaviconRefresh() still blocks same-document history URL changes")
loading_block = extract_block(panel_source, "private func handleWebViewLoadingChanged(_ newValue: Bool)")
if "faviconRefreshGeneration &+= 1" not in loading_block:
failures.append("handleWebViewLoadingChanged() no longer invalidates old favicon refreshes")
if "faviconTask?.cancel()" not in loading_block:
failures.append("handleWebViewLoadingChanged() no longer cancels stale favicon tasks")
if "lastFaviconURLString = nil" not in loading_block:
failures.append("handleWebViewLoadingChanged() no longer resets favicon URL cache on new loads")
if failures:
print("FAIL: browser favicon navigation regression guard failed")
for item in failures:
print(f" - {item}")
return 1
print("PASS: browser favicon navigation guard is in place")
return 0
if __name__ == "__main__":
raise SystemExit(main())

View file

@ -4,7 +4,8 @@ Regression test:
1. Focusing a blank browser surface should focus the omnibar.
2. Focusing a pane that contains a blank browser should focus the omnibar.
3. If command palette is open, focusing that blank browser surface must not steal input.
4. Cmd+P switcher focusing an existing blank browser surface should focus the omnibar.
4. Cmd+P switcher should list only workspaces, then switching to a workspace with a
focused blank browser should focus the omnibar.
"""
import json
@ -281,24 +282,47 @@ def main() -> int:
workspace_ids.remove(workspace_id)
time.sleep(0.3)
# Scenario 4: Cmd+P switcher selecting an existing blank browser surface should focus omnibar.
workspace_id = client.new_workspace()
workspace_ids.append(workspace_id)
client.select_workspace(workspace_id)
# Scenario 4: Cmd+P switcher should only list workspaces, and switching to a workspace
# that has a focused blank browser should focus the omnibar.
target_workspace_id = client.new_workspace()
workspace_ids.append(target_workspace_id)
client.select_workspace(target_workspace_id)
time.sleep(0.4)
window_id = current_window_id(client)
if not set_command_palette_visible(client, window_id, False):
raise cmuxError("Failed to reset command palette before scenario 4")
raise cmuxError("Failed to reset command palette before scenario 4 (target setup)")
switcher_browser_id = client.new_surface(panel_type="browser")
time.sleep(0.3)
client.focus_surface_by_panel(switcher_browser_id)
switcher_surfaces = client.list_surfaces()
switcher_terminal_id = next((surface_id for _, surface_id, _ in switcher_surfaces if surface_id != switcher_browser_id), None)
if not switcher_terminal_id:
raise cmuxError("Missing terminal surface for Cmd+P switcher scenario")
did_focus_target_browser = wait_for(
lambda: bool(
browser_address_bar_focus_state(
client,
surface_id=switcher_browser_id,
request_id="browser-focus-switcher-target-setup"
).get("focused")
),
timeout_s=3.0,
interval_s=0.1
)
if not did_focus_target_browser:
raise cmuxError("Failed to focus omnibar on target workspace browser before Cmd+P switch")
client.focus_surface_by_panel(switcher_terminal_id)
source_workspace_id = client.new_workspace()
workspace_ids.append(source_workspace_id)
client.select_workspace(source_workspace_id)
time.sleep(0.4)
window_id = current_window_id(client)
if not set_command_palette_visible(client, window_id, False):
raise cmuxError("Failed to reset command palette before scenario 4 (source setup)")
source_surfaces = client.list_surfaces()
source_terminal_id = next((surface_id for _, surface_id, _ in source_surfaces), None)
if not source_terminal_id:
raise cmuxError("Missing terminal surface for Cmd+P workspace switcher scenario")
client.focus_surface_by_panel(source_terminal_id)
time.sleep(0.2)
client.simulate_shortcut("cmd+p")
@ -316,11 +340,13 @@ def main() -> int:
):
raise cmuxError("Cmd+P did not open command palette switcher")
client.simulate_type("new tab")
time.sleep(0.2)
switcher_results = command_palette_results(client, window_id, limit=100)
switcher_ids = [row.get("command_id") for row in switcher_results if isinstance(row.get("command_id"), str)]
has_surface_rows = any(command_id.startswith("switcher.surface.") for command_id in switcher_ids)
if has_surface_rows:
raise cmuxError("Cmd+P switcher listed unexpected surface rows; expected workspace-only results")
target_command_id = f"switcher.surface.{workspace_id.lower()}.{switcher_browser_id.lower()}"
switcher_results = command_palette_results(client, window_id, limit=50)
target_command_id = f"switcher.workspace.{target_workspace_id.lower()}"
target_index = next(
(
idx for idx, row in enumerate(switcher_results)
@ -329,7 +355,7 @@ def main() -> int:
None
)
if target_index is None:
raise cmuxError(f"Cmd+P switcher did not list target surface command {target_command_id}")
raise cmuxError(f"Cmd+P switcher did not list target workspace command {target_command_id}")
if not move_command_palette_selection_to_index(client, window_id, target_index):
raise cmuxError(f"Failed to move Cmd+P selection to result index {target_index}")
@ -358,9 +384,50 @@ def main() -> int:
interval_s=0.1
)
if not did_focus_switcher_target:
raise cmuxError("Cmd+P switcher focus to blank browser did not focus omnibar")
raise cmuxError("Cmd+P workspace switch did not restore blank browser omnibar focus")
print("PASS: blank-browser focus paths (surface, pane, and Cmd+P switcher) drive omnibar, while command palette visibility blocks focus stealing")
# Scenario 5: Cmd+P switcher should dismiss on Escape reliably.
client.select_workspace(source_workspace_id)
time.sleep(0.4)
window_id = current_window_id(client)
if not set_command_palette_visible(client, window_id, False):
raise cmuxError("Failed to reset command palette before scenario 5")
client.focus_surface_by_panel(source_terminal_id)
time.sleep(0.2)
client.simulate_shortcut("cmd+p")
if not wait_for(
lambda: bool(
v2_call(
client,
"debug.command_palette.visible",
{"window_id": window_id},
request_id="palette-visible-switcher-open-escape"
).get("visible")
),
timeout_s=2.0,
interval_s=0.1
):
raise cmuxError("Cmd+P did not open command palette switcher before Escape scenario")
client.simulate_shortcut("escape")
did_dismiss_switcher_on_escape = wait_for(
lambda: not bool(
v2_call(
client,
"debug.command_palette.visible",
{"window_id": window_id},
request_id="palette-visible-switcher-after-escape"
).get("visible")
),
timeout_s=3.0,
interval_s=0.1
)
if not did_dismiss_switcher_on_escape:
raise cmuxError("Cmd+P Escape did not dismiss command palette switcher")
print("PASS: blank-browser focus paths (surface, pane, Cmd+P Enter switcher, and Cmd+P Escape dismiss) drive omnibar, while command palette visibility blocks focus stealing")
return 0
except cmuxError as exc:

View file

@ -1,125 +0,0 @@
#!/usr/bin/env python3
"""Static regression guards for compact browser omnibar sizing."""
from __future__ import annotations
import re
import subprocess
from pathlib import Path
def repo_root() -> Path:
result = subprocess.run(
["git", "rev-parse", "--show-toplevel"],
capture_output=True,
text=True,
)
if result.returncode == 0:
return Path(result.stdout.strip())
return Path(__file__).resolve().parents[1]
def extract_block(source: str, signature: str) -> str:
start = source.find(signature)
if start < 0:
raise ValueError(f"Missing signature: {signature}")
brace_start = source.find("{", start)
if brace_start < 0:
raise ValueError(f"Missing opening brace for: {signature}")
depth = 0
for idx in range(brace_start, len(source)):
char = source[idx]
if char == "{":
depth += 1
elif char == "}":
depth -= 1
if depth == 0:
return source[brace_start : idx + 1]
raise ValueError(f"Unbalanced braces for: {signature}")
def parse_cgfloat_constant(source: str, name: str) -> float | None:
match = re.search(
rf"private let {re.escape(name)}: CGFloat = ([0-9]+(?:\.[0-9]+)?)",
source,
)
if not match:
return None
return float(match.group(1))
def main() -> int:
root = repo_root()
failures: list[str] = []
view_path = root / "Sources" / "Panels" / "BrowserPanelView.swift"
view_source = view_path.read_text(encoding="utf-8")
hit_size = parse_cgfloat_constant(view_source, "addressBarButtonHitSize")
if hit_size is None:
failures.append("addressBarButtonHitSize constant is missing")
elif hit_size > 26:
failures.append(
f"addressBarButtonHitSize regressed to {hit_size:g}; expected <= 26 for compact omnibar height"
)
vertical_padding = parse_cgfloat_constant(view_source, "addressBarVerticalPadding")
if vertical_padding is None:
failures.append("addressBarVerticalPadding constant is missing")
elif vertical_padding > 4:
failures.append(
f"addressBarVerticalPadding regressed to {vertical_padding:g}; expected <= 4 for compact omnibar height"
)
omnibar_corner_radius = parse_cgfloat_constant(view_source, "omnibarPillCornerRadius")
if omnibar_corner_radius is None:
failures.append("omnibarPillCornerRadius constant is missing")
elif omnibar_corner_radius > 10:
failures.append(
f"omnibarPillCornerRadius regressed to {omnibar_corner_radius:g}; expected <= 10 to keep a squircle profile"
)
address_bar_block = extract_block(view_source, "private var addressBar: some View")
if ".padding(.vertical, addressBarVerticalPadding)" not in address_bar_block:
failures.append("addressBar no longer applies compact vertical padding via addressBarVerticalPadding")
omnibar_field_block = extract_block(view_source, "private var omnibarField: some View")
if omnibar_field_block.count(
"RoundedRectangle(cornerRadius: omnibarPillCornerRadius, style: .continuous)"
) < 2:
failures.append(
"omnibarField no longer uses continuous rounded-rectangle background+stroke tied to omnibarPillCornerRadius"
)
button_bar_block = extract_block(view_source, "private var addressBarButtonBar: some View")
hit_frame_uses = button_bar_block.count("addressBarButtonHitSize")
if hit_frame_uses < 3:
failures.append(
"navigation buttons no longer consistently use addressBarButtonHitSize frames (padding may be lost)"
)
extract_block(view_source, "private struct OmnibarAddressButtonStyle: ButtonStyle")
style_body_block = extract_block(view_source, "private struct OmnibarAddressButtonStyleBody: View")
if "configuration.isPressed" not in style_body_block:
failures.append("OmnibarAddressButtonStyleBody is missing pressed-state styling")
if "isHovered" not in style_body_block or ".onHover" not in style_body_block:
failures.append("OmnibarAddressButtonStyleBody is missing hover-state styling")
style_uses = view_source.count(".buttonStyle(OmnibarAddressButtonStyle())")
if style_uses < 4:
failures.append(
"address bar buttons no longer consistently use OmnibarAddressButtonStyle"
)
if failures:
print("FAIL: browser omnibar compact layout regression guards failed")
for failure in failures:
print(f" - {failure}")
return 1
print("PASS: browser omnibar compact layout regression guards are in place")
return 0
if __name__ == "__main__":
raise SystemExit(main())

View file

@ -0,0 +1,138 @@
#!/usr/bin/env bash
# Regression test for the pinned GhosttyKit artifact verification helper.
set -euo pipefail
ROOT_DIR="$(cd "$(dirname "$0")/.." && pwd)"
SCRIPT="$ROOT_DIR/scripts/download-prebuilt-ghosttykit.sh"
TMP_DIR="$(mktemp -d)"
trap 'rm -rf "$TMP_DIR"' EXIT
WORKFLOWS=(
"$ROOT_DIR/.github/workflows/ci.yml"
"$ROOT_DIR/.github/workflows/nightly.yml"
"$ROOT_DIR/.github/workflows/release.yml"
)
FIXTURE_SHA="7dd589824d4c9bda8265355718800cccaf7189a0"
FIXTURE_DIR="$TMP_DIR/fixture"
SUCCESS_DIR="$TMP_DIR/success"
MISMATCH_DIR="$TMP_DIR/mismatch"
MISSING_ENTRY_DIR="$TMP_DIR/missing-entry"
BIN_DIR="$TMP_DIR/bin"
CHECKSUMS_FILE="$TMP_DIR/ghosttykit-checksums.txt"
SUCCESS_LOG="$TMP_DIR/curl-success.log"
MISMATCH_LOG="$TMP_DIR/curl-mismatch.log"
MISMATCH_OUTPUT="$TMP_DIR/mismatch.out"
MISSING_ENTRY_OUTPUT="$TMP_DIR/missing-entry.out"
mkdir -p "$FIXTURE_DIR/GhosttyKit.xcframework" "$SUCCESS_DIR" "$MISMATCH_DIR" "$MISSING_ENTRY_DIR" "$BIN_DIR"
printf 'fixture\n' > "$FIXTURE_DIR/GhosttyKit.xcframework/marker.txt"
(cd "$FIXTURE_DIR" && tar czf "$TMP_DIR/GhosttyKit.xcframework.tar.gz" GhosttyKit.xcframework)
ACTUAL_SHA256="$(shasum -a 256 "$TMP_DIR/GhosttyKit.xcframework.tar.gz" | awk '{print $1}')"
printf '%s %s\n' "$FIXTURE_SHA" "$ACTUAL_SHA256" > "$CHECKSUMS_FILE"
for workflow in "${WORKFLOWS[@]}"; do
if ! grep -Fq './scripts/download-prebuilt-ghosttykit.sh' "$workflow"; then
echo "FAIL: $workflow must call download-prebuilt-ghosttykit.sh"
exit 1
fi
done
cat > "$BIN_DIR/curl" <<'EOF'
#!/usr/bin/env bash
set -euo pipefail
LOG_FILE="${TEST_CURL_LOG:?}"
FIXTURE_ARCHIVE="${TEST_FIXTURE_ARCHIVE:?}"
OUTPUT=""
while [ "$#" -gt 0 ]; do
case "$1" in
-o)
OUTPUT="$2"
shift 2
;;
*)
printf '%s\n' "$1" >> "$LOG_FILE"
shift
;;
esac
done
if [ -z "$OUTPUT" ]; then
echo "curl stub missing -o output path" >&2
exit 1
fi
cp "$FIXTURE_ARCHIVE" "$OUTPUT"
EOF
chmod +x "$BIN_DIR/curl"
(
cd "$SUCCESS_DIR"
PATH="$BIN_DIR:$PATH" \
TEST_CURL_LOG="$SUCCESS_LOG" \
TEST_FIXTURE_ARCHIVE="$TMP_DIR/GhosttyKit.xcframework.tar.gz" \
GHOSTTY_SHA="$FIXTURE_SHA" \
GHOSTTYKIT_CHECKSUMS_FILE="$CHECKSUMS_FILE" \
"$SCRIPT"
)
if [ ! -f "$SUCCESS_DIR/GhosttyKit.xcframework/marker.txt" ]; then
echo "FAIL: verification helper did not extract GhosttyKit.xcframework"
exit 1
fi
if [ -f "$SUCCESS_DIR/GhosttyKit.xcframework.tar.gz" ]; then
echo "FAIL: verification helper did not clean up the downloaded archive"
exit 1
fi
for expected_arg in --retry --retry-delay --retry-all-errors; do
if ! grep -Fxq -- "$expected_arg" "$SUCCESS_LOG"; then
echo "FAIL: curl invocation missing $expected_arg"
exit 1
fi
done
printf '%s %s\n' "$FIXTURE_SHA" "0000000000000000000000000000000000000000000000000000000000000000" > "$CHECKSUMS_FILE"
if (
cd "$MISMATCH_DIR"
PATH="$BIN_DIR:$PATH" \
TEST_CURL_LOG="$MISMATCH_LOG" \
TEST_FIXTURE_ARCHIVE="$TMP_DIR/GhosttyKit.xcframework.tar.gz" \
GHOSTTY_SHA="$FIXTURE_SHA" \
GHOSTTYKIT_CHECKSUMS_FILE="$CHECKSUMS_FILE" \
"$SCRIPT"
) >"$MISMATCH_OUTPUT" 2>&1; then
echo "FAIL: verification helper succeeded with an invalid pinned checksum"
exit 1
fi
if ! grep -Fq "GhosttyKit.xcframework.tar.gz checksum mismatch" "$MISMATCH_OUTPUT"; then
echo "FAIL: verification helper did not report checksum mismatch"
exit 1
fi
printf '%s %s\n' "0000000000000000000000000000000000000000" "$ACTUAL_SHA256" > "$CHECKSUMS_FILE"
if (
cd "$MISSING_ENTRY_DIR"
PATH="$BIN_DIR:$PATH" \
TEST_CURL_LOG="$MISMATCH_LOG" \
TEST_FIXTURE_ARCHIVE="$TMP_DIR/GhosttyKit.xcframework.tar.gz" \
GHOSTTY_SHA="$FIXTURE_SHA" \
GHOSTTYKIT_CHECKSUMS_FILE="$CHECKSUMS_FILE" \
"$SCRIPT"
) >"$MISSING_ENTRY_OUTPUT" 2>&1; then
echo "FAIL: verification helper succeeded without a pinned checksum entry"
exit 1
fi
if ! grep -Fq "Missing pinned GhosttyKit checksum for ghostty $FIXTURE_SHA" "$MISSING_ENTRY_OUTPUT"; then
echo "FAIL: verification helper did not report a missing pinned checksum entry"
exit 1
fi
echo "PASS: GhosttyKit verification helper enforces pinned checksums"

View file

@ -1,6 +1,6 @@
#!/usr/bin/env bash
# Regression test for https://github.com/manaflow-ai/cmux/issues/385.
# Ensures self-hosted UI tests are never run for fork pull requests.
# Ensures Depot-hosted UI tests are never run for fork pull requests.
set -euo pipefail
ROOT_DIR="$(cd "$(dirname "$0")/.." && pwd)"
@ -9,21 +9,21 @@ WORKFLOW_FILE="$ROOT_DIR/.github/workflows/ci.yml"
EXPECTED_IF="if: github.event_name != 'pull_request' || github.event.pull_request.head.repo.full_name == github.repository"
if ! grep -Fq "$EXPECTED_IF" "$WORKFLOW_FILE"; then
echo "FAIL: Missing fork pull_request guard for ui-tests in $WORKFLOW_FILE"
echo "FAIL: Missing fork pull_request guard for tests in $WORKFLOW_FILE"
echo "Expected line:"
echo " $EXPECTED_IF"
exit 1
fi
if ! awk '
/^ tests:/ { in_tests=1; next }
/^ tests-depot:/ { in_tests=1; next }
in_tests && /^ [^[:space:]]/ { in_tests=0 }
in_tests && /runs-on: self-hosted/ { saw_self_hosted=1 }
in_tests && /runs-on: depot-macos-latest/ { saw_depot=1 }
in_tests && /github.event.pull_request.head.repo.full_name == github.repository/ { saw_guard=1 }
END { exit !(saw_self_hosted && saw_guard) }
END { exit !(saw_depot && saw_guard) }
' "$WORKFLOW_FILE"; then
echo "FAIL: tests block must keep both self-hosted and fork guard"
echo "FAIL: tests-depot block must keep both depot-macos-latest runner and fork guard"
exit 1
fi
echo "PASS: tests self-hosted fork guard is present"
echo "PASS: tests-depot Depot runner fork guard is present"

View file

@ -0,0 +1,29 @@
#!/usr/bin/env bash
# Regression test for universal GhosttyKit and Release build settings.
set -euo pipefail
ROOT_DIR="$(cd "$(dirname "$0")/.." && pwd)"
for file in \
"$ROOT_DIR/.github/workflows/build-ghosttykit.yml" \
"$ROOT_DIR/scripts/setup.sh" \
"$ROOT_DIR/scripts/build-sign-upload.sh"
do
if ! grep -Fq -- '-Dxcframework-target=universal' "$file"; then
echo "FAIL: $file must build GhosttyKit with -Dxcframework-target=universal"
exit 1
fi
done
if ! awk '
/\/\* Release \*\// { in_release=1; next }
in_release && /ONLY_ACTIVE_ARCH = YES;/ { saw_yes=1 }
in_release && /ONLY_ACTIVE_ARCH = NO;/ { saw_no=1 }
in_release && /name = Release;/ { in_release=0 }
END { exit !(saw_no && !saw_yes) }
' "$ROOT_DIR/GhosttyTabs.xcodeproj/project.pbxproj"; then
echo "FAIL: Release configurations in project.pbxproj must use ONLY_ACTIVE_ARCH = NO"
exit 1
fi
echo "PASS: GhosttyKit builds universal and Release configs disable ONLY_ACTIVE_ARCH"

View file

@ -0,0 +1,186 @@
#!/usr/bin/env python3
"""
Regression tests for Resources/bin/claude wrapper hook injection.
"""
from __future__ import annotations
import json
import os
import shutil
import socket
import subprocess
import tempfile
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
SOURCE_WRAPPER = ROOT / "Resources" / "bin" / "claude"
def make_executable(path: Path, content: str) -> None:
path.write_text(content, encoding="utf-8")
path.chmod(0o755)
def read_lines(path: Path) -> list[str]:
if not path.exists():
return []
return [line.rstrip("\n") for line in path.read_text(encoding="utf-8").splitlines()]
def parse_settings_arg(argv: list[str]) -> dict:
if "--settings" not in argv:
return {}
index = argv.index("--settings")
if index + 1 >= len(argv):
return {}
return json.loads(argv[index + 1])
def run_wrapper(*, socket_state: str, argv: list[str]) -> tuple[int, list[str], list[str], str, str]:
with tempfile.TemporaryDirectory(prefix="cmux-claude-wrapper-test-") as td:
tmp = Path(td)
wrapper_dir = tmp / "wrapper-bin"
real_dir = tmp / "real-bin"
wrapper_dir.mkdir(parents=True, exist_ok=True)
real_dir.mkdir(parents=True, exist_ok=True)
wrapper = wrapper_dir / "claude"
shutil.copy2(SOURCE_WRAPPER, wrapper)
wrapper.chmod(0o755)
real_args_log = tmp / "real-args.log"
real_claudecode_log = tmp / "real-claudecode.log"
cmux_log = tmp / "cmux.log"
socket_path = str(tmp / "cmux.sock")
make_executable(
real_dir / "claude",
"""#!/usr/bin/env bash
set -euo pipefail
: > "$FAKE_REAL_ARGS_LOG"
printf '%s\\n' "${CLAUDECODE-__UNSET__}" > "$FAKE_REAL_CLAUDECODE_LOG"
for arg in "$@"; do
printf '%s\\n' "$arg" >> "$FAKE_REAL_ARGS_LOG"
done
""",
)
make_executable(
wrapper_dir / "cmux",
"""#!/usr/bin/env bash
set -euo pipefail
printf '%s timeout=%s\\n' "$*" "${CMUXTERM_CLI_RESPONSE_TIMEOUT_SEC-__UNSET__}" >> "$FAKE_CMUX_LOG"
if [[ "${1:-}" == "--socket" ]]; then
shift 2
fi
if [[ "${1:-}" == "ping" ]]; then
if [[ "${FAKE_CMUX_PING_OK:-0}" == "1" ]]; then
exit 0
fi
exit 1
fi
exit 0
""",
)
test_socket: socket.socket | None = None
if socket_state in {"live", "stale"}:
test_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
test_socket.bind(socket_path)
env = os.environ.copy()
env["PATH"] = f"{wrapper_dir}:{real_dir}:/usr/bin:/bin"
env["CMUX_SURFACE_ID"] = "surface:test"
env["CMUX_SOCKET_PATH"] = socket_path
env["FAKE_REAL_ARGS_LOG"] = str(real_args_log)
env["FAKE_REAL_CLAUDECODE_LOG"] = str(real_claudecode_log)
env["FAKE_CMUX_LOG"] = str(cmux_log)
env["FAKE_CMUX_PING_OK"] = "1" if socket_state == "live" else "0"
env["CLAUDECODE"] = "nested-session-sentinel"
try:
proc = subprocess.run(
["claude", *argv],
cwd=tmp,
env=env,
capture_output=True,
text=True,
check=False,
)
finally:
if test_socket is not None:
test_socket.close()
claudecode_lines = read_lines(real_claudecode_log)
claudecode_value = claudecode_lines[0] if claudecode_lines else ""
return proc.returncode, read_lines(real_args_log), read_lines(cmux_log), proc.stderr.strip(), claudecode_value
def expect(condition: bool, message: str, failures: list[str]) -> None:
if not condition:
failures.append(message)
def test_live_socket_injects_supported_hooks(failures: list[str]) -> None:
code, real_argv, cmux_log, stderr, claudecode = run_wrapper(socket_state="live", argv=["hello"])
expect(code == 0, f"live socket: wrapper exited {code}: {stderr}", failures)
expect("--settings" in real_argv, f"live socket: missing --settings in args: {real_argv}", failures)
expect("--session-id" in real_argv, f"live socket: missing --session-id in args: {real_argv}", failures)
expect(real_argv[-1] == "hello", f"live socket: expected original arg to pass through, got {real_argv}", failures)
expect(any(" ping" in line for line in cmux_log), f"live socket: expected cmux ping, got {cmux_log}", failures)
expect(
any("timeout=0.75" in line for line in cmux_log),
f"live socket: expected bounded ping timeout, got {cmux_log}",
failures,
)
expect(claudecode == "__UNSET__", f"live socket: expected CLAUDECODE unset, got {claudecode!r}", failures)
settings = parse_settings_arg(real_argv)
hooks = settings.get("hooks", {})
expect(set(hooks.keys()) == {"SessionStart", "Stop", "Notification"}, f"unexpected hook keys: {hooks.keys()}", failures)
serialized = json.dumps(settings, sort_keys=True)
expect("UserPromptSubmit" not in serialized, "UserPromptSubmit hook should not be injected", failures)
expect("prompt-submit" not in serialized, "prompt-submit subcommand should not be injected", failures)
def test_missing_socket_skips_hook_injection(failures: list[str]) -> None:
code, real_argv, cmux_log, stderr, claudecode = run_wrapper(socket_state="missing", argv=["hello"])
expect(code == 0, f"missing socket: wrapper exited {code}: {stderr}", failures)
expect(real_argv == ["hello"], f"missing socket: expected passthrough args, got {real_argv}", failures)
expect(cmux_log == [], f"missing socket: expected no cmux calls, got {cmux_log}", failures)
expect(claudecode == "__UNSET__", f"missing socket: expected CLAUDECODE unset, got {claudecode!r}", failures)
def test_stale_socket_skips_hook_injection(failures: list[str]) -> None:
code, real_argv, cmux_log, stderr, claudecode = run_wrapper(socket_state="stale", argv=["hello"])
expect(code == 0, f"stale socket: wrapper exited {code}: {stderr}", failures)
expect(real_argv == ["hello"], f"stale socket: expected passthrough args, got {real_argv}", failures)
expect(any(" ping" in line for line in cmux_log), f"stale socket: expected cmux ping probe, got {cmux_log}", failures)
expect(
any("timeout=0.75" in line for line in cmux_log),
f"stale socket: expected bounded ping timeout, got {cmux_log}",
failures,
)
expect(claudecode == "__UNSET__", f"stale socket: expected CLAUDECODE unset, got {claudecode!r}", failures)
def main() -> int:
failures: list[str] = []
test_live_socket_injects_supported_hooks(failures)
test_missing_socket_skips_hook_injection(failures)
test_stale_socket_skips_hook_injection(failures)
if failures:
print("FAIL: claude wrapper regression checks failed")
for failure in failures:
print(f"- {failure}")
return 1
print("PASS: claude wrapper hooks handle missing/stale sockets and inject only supported hooks")
return 0
if __name__ == "__main__":
raise SystemExit(main())

View file

@ -0,0 +1,150 @@
#!/usr/bin/env python3
"""Regression test: CLI should auto-discover tagged debug sockets from CMUX_TAG."""
from __future__ import annotations
import glob
import os
import shutil
import socket
import subprocess
import threading
def resolve_cmux_cli() -> str:
explicit = os.environ.get("CMUX_CLI_BIN") or os.environ.get("CMUX_CLI")
if explicit and os.path.exists(explicit) and os.access(explicit, os.X_OK):
return explicit
candidates: list[str] = []
candidates.extend(glob.glob(os.path.expanduser("~/Library/Developer/Xcode/DerivedData/*/Build/Products/Debug/cmux")))
candidates.extend(glob.glob("/tmp/cmux-*/Build/Products/Debug/cmux"))
candidates = [p for p in candidates if os.path.exists(p) and os.access(p, os.X_OK)]
if candidates:
candidates.sort(key=os.path.getmtime, reverse=True)
return candidates[0]
in_path = shutil.which("cmux")
if in_path:
return in_path
raise RuntimeError("Unable to find cmux CLI binary. Set CMUX_CLI_BIN.")
class PingServer:
def __init__(self, socket_path: str):
self.socket_path = socket_path
self.ready = threading.Event()
self.error: Exception | None = None
self._thread = threading.Thread(target=self._run, daemon=True)
def start(self) -> None:
self._thread.start()
def wait_ready(self, timeout: float) -> bool:
return self.ready.wait(timeout)
def join(self, timeout: float) -> None:
self._thread.join(timeout=timeout)
def _run(self) -> None:
server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try:
if os.path.exists(self.socket_path):
os.remove(self.socket_path)
server.bind(self.socket_path)
server.listen(1)
server.settimeout(6.0)
self.ready.set()
# The CLI may probe candidate sockets with a connect-only check before
# issuing the actual command, so handle more than one connection.
for _ in range(4):
conn, _ = server.accept()
with conn:
conn.settimeout(2.0)
data = b""
while b"\n" not in data:
chunk = conn.recv(4096)
if not chunk:
break
data += chunk
if b"ping" in data:
conn.sendall(b"PONG\n")
return
raise RuntimeError("Did not receive ping command on test socket")
except Exception as exc: # pragma: no cover - explicit surface on failure
self.error = exc
self.ready.set()
finally:
server.close()
def main() -> int:
try:
cli_path = resolve_cmux_cli()
except Exception as exc:
print(f"FAIL: {exc}")
return 1
tag = f"cli-autodiscover-{os.getpid()}"
socket_path = f"/tmp/cmux-debug-{tag}.sock"
server = PingServer(socket_path)
server.start()
if not server.wait_ready(2.0):
print("FAIL: socket server did not become ready")
return 1
if server.error is not None:
print(f"FAIL: socket server failed to start: {server.error}")
return 1
env = os.environ.copy()
env["CMUX_SOCKET_PATH"] = "/tmp/cmux.sock"
env["CMUX_TAG"] = tag
env["CMUX_CLI_SENTRY_DISABLED"] = "1"
env["CMUX_CLAUDE_HOOK_SENTRY_DISABLED"] = "1"
try:
proc = subprocess.run(
[cli_path, "ping"],
text=True,
capture_output=True,
env=env,
timeout=8,
check=False,
)
except Exception as exc:
print(f"FAIL: invoking cmux ping failed: {exc}")
return 1
finally:
server.join(timeout=2.0)
try:
os.remove(socket_path)
except OSError:
pass
if server.error is not None:
print(f"FAIL: socket server error: {server.error}")
return 1
if proc.returncode != 0:
print("FAIL: cmux ping returned non-zero status")
print(f"stdout={proc.stdout!r}")
print(f"stderr={proc.stderr!r}")
return 1
if proc.stdout.strip() != "PONG":
print("FAIL: cmux ping did not use auto-discovered socket")
print(f"stdout={proc.stdout!r}")
print(f"stderr={proc.stderr!r}")
return 1
print("PASS: cmux ping auto-discovers tagged socket from CMUX_TAG")
return 0
if __name__ == "__main__":
raise SystemExit(main())

View file

@ -1,115 +0,0 @@
#!/usr/bin/env python3
"""Regression test: CLI socket Sentry telemetry must apply to all commands."""
from __future__ import annotations
import subprocess
from pathlib import Path
def get_repo_root() -> Path:
result = subprocess.run(
["git", "rev-parse", "--show-toplevel"],
capture_output=True,
text=True,
check=False,
)
if result.returncode == 0:
return Path(result.stdout.strip())
return Path.cwd()
def require(content: str, needle: str, message: str, failures: list[str]) -> None:
if needle not in content:
failures.append(message)
def reject(content: str, needle: str, message: str, failures: list[str]) -> None:
if needle in content:
failures.append(message)
def main() -> int:
repo_root = get_repo_root()
cli_path = repo_root / "CLI" / "cmux.swift"
if not cli_path.exists():
print(f"FAIL: missing expected file: {cli_path}")
return 1
content = cli_path.read_text(encoding="utf-8")
failures: list[str] = []
require(
content,
"private final class CLISocketSentryTelemetry {",
"Missing CLISocketSentryTelemetry definition",
failures,
)
require(
content,
'processEnv["CMUX_CLI_SENTRY_DISABLED"] == "1" ||',
"Missing CMUX_CLI_SENTRY_DISABLED kill switch",
failures,
)
require(
content,
'processEnv["CMUX_CLAUDE_HOOK_SENTRY_DISABLED"] == "1"',
"Missing backwards-compatible CMUX_CLAUDE_HOOK_SENTRY_DISABLED kill switch",
failures,
)
require(
content,
"private var shouldEmit: Bool {\n !disabledByEnv\n }",
"Telemetry scope should be command-agnostic (only disabled by env kill switch)",
failures,
)
require(
content,
'let crumb = Breadcrumb(level: .info, category: "cmux.cli")',
"Telemetry breadcrumb category should be cmux.cli",
failures,
)
require(
content,
'"command": command,',
"Base telemetry context must include command name",
failures,
)
require(
content,
"let cliTelemetry = CLISocketSentryTelemetry(",
"CLI should initialize generic socket telemetry",
failures,
)
require(
content,
'cliTelemetry.breadcrumb(\n "socket.connect.attempt",',
"CLI should emit socket.connect.attempt breadcrumb for commands",
failures,
)
reject(
content,
"self.enabled = command == \"claude-hook\"",
"Telemetry regressed to claude-hook-only scope",
failures,
)
reject(
content,
"enabled && !disabledByEnv",
"Telemetry still depends on legacy enabled flag",
failures,
)
if failures:
print("FAIL: CLI socket telemetry scope regression(s) detected")
for failure in failures:
print(f"- {failure}")
return 1
print("PASS: CLI socket telemetry scope is command-agnostic")
return 0
if __name__ == "__main__":
raise SystemExit(main())

View file

@ -1,147 +0,0 @@
#!/usr/bin/env python3
"""Regression tests for CLI subcommand help coverage and accuracy."""
from __future__ import annotations
import re
import subprocess
from pathlib import Path
def get_repo_root() -> Path:
result = subprocess.run(
["git", "rev-parse", "--show-toplevel"],
capture_output=True,
text=True,
check=False,
)
if result.returncode == 0:
return Path(result.stdout.strip())
return Path.cwd()
def require(content: str, needle: str, message: str, failures: list[str]) -> None:
if needle not in content:
failures.append(message)
def extract_switch_commands(content: str, start_index: int = 0) -> tuple[set[str], int]:
marker = "switch command {"
marker_index = content.find(marker, start_index)
if marker_index == -1:
return set(), -1
open_brace = content.find("{", marker_index)
if open_brace == -1:
return set(), -1
depth = 1
cursor = open_brace + 1
while cursor < len(content) and depth > 0:
char = content[cursor]
if char == "{":
depth += 1
elif char == "}":
depth -= 1
cursor += 1
block = content[open_brace + 1:cursor - 1]
commands: set[str] = set()
collecting_case = False
case_lines: list[str] = []
for line in block.splitlines():
stripped = line.strip()
if stripped.startswith("case "):
collecting_case = True
case_lines = [line]
elif collecting_case:
case_lines.append(line)
if collecting_case and ":" in line:
case_text = "\n".join(case_lines)
commands.update(re.findall(r'"([^"]+)"', case_text))
collecting_case = False
case_lines = []
return commands, cursor
def main() -> int:
repo_root = get_repo_root()
cli_path = repo_root / "CLI" / "cmux.swift"
if not cli_path.exists():
print(f"FAIL: missing expected file: {cli_path}")
return 1
content = cli_path.read_text(encoding="utf-8")
failures: list[str] = []
require(
content,
'if commandArgs.contains("--help") || commandArgs.contains("-h") {',
"Subcommand help pre-dispatch gate is missing",
failures,
)
require(
content,
'if dispatchSubcommandHelp(command: command, commandArgs: commandArgs) {',
"Subcommand help dispatch call is missing",
failures,
)
require(
content,
"print(\"Unknown command '\\(command)'. Run 'cmux help' to see available commands.\")",
"Subcommand help fallback unknown-command line is missing",
failures,
)
require(
content,
"print(\"Unknown command '\\(command)'. Run 'cmux help' to see available commands.\")\n return",
"Subcommand help fallback must return before command execution",
failures,
)
dispatch_commands, next_index = extract_switch_commands(content, 0)
subcommand_usage_commands, _ = extract_switch_commands(content, next_index if next_index != -1 else 0)
if not dispatch_commands:
failures.append("Failed to parse main dispatch switch command list")
if not subcommand_usage_commands:
failures.append("Failed to parse subcommandUsage switch command list")
missing_help_entries = sorted(dispatch_commands - subcommand_usage_commands)
if missing_help_entries:
failures.append(
"Missing subcommandUsage entries for dispatch command(s): "
+ ", ".join(missing_help_entries)
)
# Regression checks for concrete help text that previously drifted from dispatch logic.
for needle, message in [
('case "help":', "Missing subcommandUsage entry for help"),
("Usage: cmux help", "help subcommand usage text is missing"),
("Usage: cmux move-workspace-to-window --workspace <id|ref|index> --window <id|ref|index>", "move-workspace-to-window help must document index handles"),
("--tab <id|ref|index> Target tab (accepts tab:<n> or surface:<n>; default: $CMUX_TAB_ID, then $CMUX_SURFACE_ID, then focused tab)", "tab-action help must document CMUX_TAB_ID/CMUX_SURFACE_ID fallback"),
("--workspace <id|ref|index> Workspace to rename (default: current/$CMUX_WORKSPACE_ID)", "rename-workspace help must document CMUX_WORKSPACE_ID fallback"),
("text|html|value|count|box|styles|attr: [--selector <css> | <css>]", "browser get help must document --selector"),
("attr: [--attr <name> | <name>]", "browser get attr help must document --attr"),
("styles: [--property <name>]", "browser get styles help must document --property"),
("role: [--name <text>] [--exact] <role>", "browser find role help must document --name/--exact"),
("text|label|placeholder|alt|title|testid: [--exact] <text>", "browser find text-like help must document --exact"),
("nth: [--index <n> | <n>] [--selector <css> | <css>]", "browser find nth help must document --index/--selector"),
("route <pattern> [--abort] [--body <text>]", "browser network route help must document --abort/--body"),
]:
require(content, needle, message, failures)
if failures:
print("FAIL: CLI subcommand help regression(s) detected")
for failure in failures:
print(f"- {failure}")
return 1
print("PASS: CLI subcommand help coverage and flag/env documentation are present")
return 0
if __name__ == "__main__":
raise SystemExit(main())

View file

@ -1,149 +0,0 @@
#!/usr/bin/env python3
"""Regression test: `cmux tree` command wiring and output contract."""
from __future__ import annotations
import subprocess
from pathlib import Path
def get_repo_root() -> Path:
result = subprocess.run(
["git", "rev-parse", "--show-toplevel"],
capture_output=True,
text=True,
check=False,
)
if result.returncode == 0:
return Path(result.stdout.strip())
return Path.cwd()
def require(content: str, needle: str, message: str, failures: list[str]) -> None:
if needle not in content:
failures.append(message)
def main() -> int:
repo_root = get_repo_root()
cli_path = repo_root / "CLI" / "cmux.swift"
controller_path = repo_root / "Sources" / "TerminalController.swift"
if not cli_path.exists():
print(f"FAIL: missing expected file: {cli_path}")
return 1
if not controller_path.exists():
print(f"FAIL: missing expected file: {controller_path}")
return 1
content = cli_path.read_text(encoding="utf-8")
controller_content = controller_path.read_text(encoding="utf-8")
failures: list[str] = []
require(
content,
'case "tree":\n try runTreeCommand(commandArgs: commandArgs, client: client, jsonOutput: jsonOutput, idFormat: idFormat)',
"Missing `tree` command dispatch",
failures,
)
require(
content,
"tree [--all] [--workspace <id|ref|index>]",
"Top-level usage text missing tree command",
failures,
)
require(
content,
"Usage: cmux tree [flags]",
"Subcommand help for `cmux tree --help` is missing",
failures,
)
require(
content,
"Known flags: --all --workspace <id|ref|index> --json",
"Tree flag validation for --all/--workspace is missing",
failures,
)
require(
content,
"--json Structured JSON output",
"Tree help text should document --json",
failures,
)
require(
content,
'print(jsonString(formatIDs(payload, mode: idFormat)))',
"Tree command JSON output should honor --id-format conversion",
failures,
)
# Data sources needed for full hierarchy + browser URLs.
for method in [
'method: "system.tree"',
'method: "system.identify"',
'method: "window.list"',
'method: "workspace.list"',
'method: "pane.list"',
'method: "surface.list"',
'method: "browser.tab.list"',
'method: "browser.url.get"',
]:
require(
content,
method,
f"Tree command is missing expected API call: {method}",
failures,
)
# Text tree rendering contract.
for glyph in ['"├── "', '"└── "', '""']:
require(
content,
glyph,
f"Tree output missing box-drawing glyph: {glyph}",
failures,
)
for marker in ["[current]", "[selected]", "[focused]", "◀ active", "◀ here"]:
require(
content,
marker,
f"Tree output missing required marker: {marker}",
failures,
)
require(
content,
'surfaceType.lowercased() == "browser"',
"Tree surface rendering should special-case browser surfaces",
failures,
)
require(
content,
'let url = surface["url"] as? String',
"Tree surface rendering should include browser URL when available",
failures,
)
# Server-side one-shot hierarchy path for performance.
for needle, message in [
('case "system.tree":', "Socket router is missing system.tree dispatch"),
('"system.tree"', "Capabilities list should advertise system.tree"),
("private func v2SystemTree(params: [String: Any]) -> V2CallResult {", "Missing v2SystemTree implementation"),
('"active":', "system.tree payload should include focused path"),
('"caller":', "system.tree payload should include caller path"),
('"windows":', "system.tree payload should include hierarchy windows"),
]:
require(controller_content, needle, message, failures)
if failures:
print("FAIL: cmux tree command regression(s) detected")
for failure in failures:
print(f"- {failure}")
return 1
print("PASS: cmux tree command wiring and output contract are present")
return 0
if __name__ == "__main__":
raise SystemExit(main())

View file

@ -1,85 +0,0 @@
#!/usr/bin/env python3
"""Regression test: CLI version output wiring keeps commit metadata support."""
from __future__ import annotations
import subprocess
from pathlib import Path
def get_repo_root() -> Path:
result = subprocess.run(
["git", "rev-parse", "--show-toplevel"],
capture_output=True,
text=True,
check=False,
)
if result.returncode == 0:
return Path(result.stdout.strip())
return Path.cwd()
def require(content: str, needle: str, message: str, failures: list[str]) -> None:
if needle not in content:
failures.append(message)
def main() -> int:
repo_root = get_repo_root()
cli_path = repo_root / "CLI" / "cmux.swift"
if not cli_path.exists():
print(f"FAIL: missing expected file: {cli_path}")
return 1
content = cli_path.read_text(encoding="utf-8")
failures: list[str] = []
require(
content,
'let commit = info["CMUXCommit"].flatMap { normalizedCommitHash($0) }',
"versionSummary no longer reads CMUXCommit metadata",
failures,
)
require(
content,
'return "\\(baseSummary) [\\(commit)]"',
"versionSummary no longer appends commit metadata",
failures,
)
require(
content,
'if let commit = dictionary["CMUXCommit"] as? String,',
"Info.plist parsing no longer reads CMUXCommit",
failures,
)
require(
content,
"if let commit = gitCommitHash(at: current) {",
"Project fallback no longer probes git commit hash",
failures,
)
require(
content,
'["git", "-C", directory.path, "rev-parse", "--short=9", "HEAD"]',
"Git commit probe command changed unexpectedly",
failures,
)
require(
content,
'normalizedCommitHash(ProcessInfo.processInfo.environment["CMUX_COMMIT"])',
"Environment commit fallback (CMUX_COMMIT) is missing",
failures,
)
if failures:
print("FAIL: CLI version commit metadata regression(s) detected")
for failure in failures:
print(f"- {failure}")
return 1
print("PASS: CLI version commit metadata wiring is intact")
return 0
if __name__ == "__main__":
raise SystemExit(main())

View file

@ -0,0 +1,196 @@
#!/usr/bin/env python3
"""
Regression test: `cmux --version` must not scan huge sibling app lists just to
resolve optional version metadata.
"""
from __future__ import annotations
import glob
import os
import plistlib
import shutil
import subprocess
import tempfile
import time
JUNK_APP_COUNT = 40000
RSS_LIMIT_KB = 64 * 1024
TIMEOUT_SECONDS = 10.0
EXPECTED_STDOUT = "cmux 9.9.9 (999)"
def resolve_cmux_cli() -> str:
explicit = os.environ.get("CMUX_CLI_BIN") or os.environ.get("CMUX_CLI")
if explicit and os.path.exists(explicit) and os.access(explicit, os.X_OK):
return explicit
candidates: list[str] = []
candidates.extend(glob.glob(os.path.expanduser("~/Library/Developer/Xcode/DerivedData/*/Build/Products/Debug/cmux")))
candidates.extend(glob.glob("/tmp/cmux-*/Build/Products/Debug/cmux"))
candidates = [p for p in candidates if os.path.exists(p) and os.access(p, os.X_OK)]
if candidates:
candidates.sort(key=os.path.getmtime, reverse=True)
return candidates[0]
in_path = shutil.which("cmux")
if in_path:
return in_path
raise RuntimeError("Unable to find cmux CLI binary. Set CMUX_CLI_BIN.")
def copy_runtime_frameworks(cli_path: str, fixture_contents: str) -> None:
frameworks_dir = os.path.join(fixture_contents, "Frameworks")
os.makedirs(frameworks_dir, exist_ok=True)
search_roots: list[str] = []
current = os.path.dirname(cli_path)
for _ in range(4):
search_roots.append(os.path.join(current, "Frameworks"))
search_roots.append(os.path.join(current, "PackageFrameworks"))
parent = os.path.dirname(current)
if parent == current:
break
current = parent
for search_root in search_roots:
sentry_framework = os.path.join(search_root, "Sentry.framework")
if os.path.isdir(sentry_framework):
shutil.copytree(sentry_framework, os.path.join(frameworks_dir, "Sentry.framework"))
return
def build_fixture(root: str, cli_path: str) -> str:
app_path = os.path.join(root, "cmux.app")
contents_path = os.path.join(app_path, "Contents")
resources_path = os.path.join(contents_path, "Resources")
bin_path = os.path.join(resources_path, "bin")
os.makedirs(bin_path, exist_ok=True)
fixture_cli = os.path.join(bin_path, "cmux")
shutil.copy2(cli_path, fixture_cli)
copy_runtime_frameworks(cli_path, contents_path)
info = {
"CFBundleExecutable": "cmux",
"CFBundleIdentifier": "test.cmux.version-memory-guard",
"CFBundlePackageType": "APPL",
"CFBundleShortVersionString": "9.9.9",
"CFBundleVersion": "999",
}
with open(os.path.join(contents_path, "Info.plist"), "wb") as handle:
plistlib.dump(info, handle)
for index in range(JUNK_APP_COUNT):
open(os.path.join(resources_path, f"junk-{index:05d}.app"), "wb").close()
return fixture_cli
def run_with_limits(cli_path: str, *args: str) -> dict[str, object]:
env = dict(os.environ)
env.pop("CMUX_COMMIT", None)
proc = subprocess.Popen(
[cli_path, *args],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
env=env,
)
started = time.time()
peak_rss_kb = 0
failure_reason: str | None = None
while True:
exit_code = proc.poll()
if exit_code is not None:
stdout, stderr = proc.communicate()
return {
"exit_code": exit_code,
"stdout": stdout.strip(),
"stderr": stderr.strip(),
"elapsed": time.time() - started,
"peak_rss_kb": peak_rss_kb,
"failure_reason": None,
}
try:
rss_kb = int(
subprocess.check_output(
["ps", "-o", "rss=", "-p", str(proc.pid)],
text=True,
).strip()
or "0"
)
except subprocess.CalledProcessError:
rss_kb = 0
peak_rss_kb = max(peak_rss_kb, rss_kb)
elapsed = time.time() - started
if rss_kb > RSS_LIMIT_KB:
failure_reason = f"rss limit exceeded ({rss_kb} KB > {RSS_LIMIT_KB} KB)"
elif elapsed > TIMEOUT_SECONDS:
failure_reason = f"timeout exceeded ({elapsed:.2f}s > {TIMEOUT_SECONDS:.2f}s)"
if failure_reason:
proc.kill()
stdout, stderr = proc.communicate()
return {
"exit_code": proc.returncode,
"stdout": stdout.strip(),
"stderr": stderr.strip(),
"elapsed": elapsed,
"peak_rss_kb": peak_rss_kb,
"failure_reason": failure_reason,
}
time.sleep(0.05)
def main() -> int:
try:
cli_path = resolve_cmux_cli()
except Exception as exc:
print(f"FAIL: {exc}")
return 1
with tempfile.TemporaryDirectory(prefix="cmux-version-memory-guard-") as root:
fixture_cli = build_fixture(root, cli_path)
result = run_with_limits(fixture_cli, "--version")
if result["failure_reason"]:
print("FAIL: `cmux --version` exceeded runtime guard")
print(f"reason={result['failure_reason']}")
print(f"elapsed={result['elapsed']:.2f}s")
print(f"peak_rss_kb={result['peak_rss_kb']}")
print(f"stdout={result['stdout']}")
print(f"stderr={result['stderr']}")
return 1
if result["exit_code"] != 0:
print("FAIL: `cmux --version` exited non-zero")
print(f"exit={result['exit_code']}")
print(f"stdout={result['stdout']}")
print(f"stderr={result['stderr']}")
return 1
if result["stdout"] != EXPECTED_STDOUT:
print("FAIL: unexpected version output")
print(f"stdout={result['stdout']!r}")
print(f"expected={EXPECTED_STDOUT!r}")
return 1
print(
"PASS: `cmux --version` exits within memory/time limits "
f"(peak_rss_kb={result['peak_rss_kb']}, elapsed={result['elapsed']:.2f}s)"
)
return 0
if __name__ == "__main__":
raise SystemExit(main())

View file

@ -1,118 +0,0 @@
#!/usr/bin/env python3
"""Regression test for command-palette socket-listener restart command wiring."""
from __future__ import annotations
import subprocess
from pathlib import Path
def get_repo_root() -> Path:
result = subprocess.run(
["git", "rev-parse", "--show-toplevel"],
capture_output=True,
text=True,
)
if result.returncode == 0:
return Path(result.stdout.strip())
return Path.cwd()
def read_text(path: Path) -> str:
return path.read_text(encoding="utf-8")
def require(content: str, needle: str, message: str, failures: list[str]) -> None:
if needle not in content:
failures.append(message)
def main() -> int:
repo_root = get_repo_root()
content_view_path = repo_root / "Sources" / "ContentView.swift"
app_delegate_path = repo_root / "Sources" / "AppDelegate.swift"
missing_paths = [
str(path)
for path in [content_view_path, app_delegate_path]
if not path.exists()
]
if missing_paths:
print("Missing expected files:")
for path in missing_paths:
print(f" - {path}")
return 1
content_view = read_text(content_view_path)
app_delegate = read_text(app_delegate_path)
failures: list[str] = []
require(
content_view,
'commandId: "palette.restartSocketListener"',
"Missing `palette.restartSocketListener` command contribution",
failures,
)
require(
content_view,
'title: constant("Restart CLI Listener")',
"Missing `Restart CLI Listener` command title",
failures,
)
require(
content_view,
'registry.register(commandId: "palette.restartSocketListener") {',
"Missing command handler registration for `palette.restartSocketListener`",
failures,
)
require(
content_view,
"AppDelegate.shared?.restartSocketListener(nil)",
"Socket restart command handler does not call `AppDelegate.restartSocketListener`",
failures,
)
require(
app_delegate,
"@objc func restartSocketListener(_ sender: Any?) {",
"Missing `AppDelegate.restartSocketListener` action",
failures,
)
require(
app_delegate,
"private func socketListenerConfigurationIfEnabled() -> (mode: SocketControlMode, path: String)? {",
"Missing shared socket listener configuration helper",
failures,
)
require(
app_delegate,
'restartSocketListenerIfEnabled(source: "menu.command")',
"`restartSocketListener` no longer delegates to restart helper",
failures,
)
require(
app_delegate,
"TerminalController.shared.stop()",
"`restartSocketListenerIfEnabled` no longer stops current listener before restart",
failures,
)
require(
app_delegate,
"TerminalController.shared.start(tabManager: tabManager, socketPath: config.path, accessMode: config.mode)",
"`restartSocketListenerIfEnabled` no longer starts listener with current settings",
failures,
)
if failures:
print("FAIL: command-palette socket restart command regression(s) detected")
for failure in failures:
print(f"- {failure}")
return 1
print("PASS: command-palette socket restart command wiring is intact")
return 0
if __name__ == "__main__":
raise SystemExit(main())

View file

@ -1,126 +0,0 @@
#!/usr/bin/env python3
"""Regression test for command-palette update command wiring."""
from __future__ import annotations
import re
import subprocess
from pathlib import Path
def get_repo_root() -> Path:
result = subprocess.run(
["git", "rev-parse", "--show-toplevel"],
capture_output=True,
text=True,
)
if result.returncode == 0:
return Path(result.stdout.strip())
return Path.cwd()
def read_text(path: Path) -> str:
return path.read_text(encoding="utf-8")
def expect_regex(content: str, pattern: str, message: str, failures: list[str]) -> None:
if re.search(pattern, content, flags=re.DOTALL) is None:
failures.append(message)
def main() -> int:
repo_root = get_repo_root()
content_view_path = repo_root / "Sources" / "ContentView.swift"
app_delegate_path = repo_root / "Sources" / "AppDelegate.swift"
controller_path = repo_root / "Sources" / "Update" / "UpdateController.swift"
missing_paths = [
str(path)
for path in [content_view_path, app_delegate_path, controller_path]
if not path.exists()
]
if missing_paths:
print("Missing expected files:")
for path in missing_paths:
print(f" - {path}")
return 1
content_view = read_text(content_view_path)
app_delegate = read_text(app_delegate_path)
controller = read_text(controller_path)
failures: list[str] = []
expect_regex(
content_view,
r'static\s+let\s+updateHasAvailable\s*=\s*"update\.hasAvailable"',
"Missing `CommandPaletteContextKeys.updateHasAvailable`",
failures,
)
expect_regex(
content_view,
r'if\s+case\s+\.updateAvailable\s*=\s*updateViewModel\.effectiveState\s*\{\s*snapshot\.setBool\(CommandPaletteContextKeys\.updateHasAvailable,\s*true\)\s*\}',
"Command palette context no longer tracks update-available state",
failures,
)
expect_regex(
content_view,
r'commandId:\s*"palette\.applyUpdateIfAvailable".*?title:\s*constant\("Apply Update \(If Available\)"\).*?keywords:\s*\[[^\]]*"apply"[^\]]*"install"[^\]]*"update"[^\]]*"available"[^\]]*\].*?when:\s*\{\s*\$0\.bool\(CommandPaletteContextKeys\.updateHasAvailable\)\s*\}',
"Missing or incomplete `palette.applyUpdateIfAvailable` contribution visibility gating",
failures,
)
expect_regex(
content_view,
r'commandId:\s*"palette\.attemptUpdate".*?title:\s*constant\("Attempt Update"\).*?keywords:\s*\[[^\]]*"attempt"[^\]]*"check"[^\]]*"update"[^\]]*\]',
"Missing or incomplete `palette.attemptUpdate` contribution",
failures,
)
expect_regex(
content_view,
r'registry\.register\(commandId:\s*"palette\.applyUpdateIfAvailable"\)\s*\{\s*AppDelegate\.shared\?\.applyUpdateIfAvailable\(nil\)\s*\}',
"Missing handler registration for `palette.applyUpdateIfAvailable`",
failures,
)
expect_regex(
content_view,
r'registry\.register\(commandId:\s*"palette\.attemptUpdate"\)\s*\{\s*AppDelegate\.shared\?\.attemptUpdate\(nil\)\s*\}',
"Missing handler registration for `palette.attemptUpdate`",
failures,
)
expect_regex(
app_delegate,
r'@objc\s+func\s+applyUpdateIfAvailable\(_\s+sender:\s+Any\?\)\s*\{\s*updateViewModel\.overrideState\s*=\s*nil\s*updateController\.installUpdate\(\)\s*\}',
"`AppDelegate.applyUpdateIfAvailable` is missing or does not call `updateController.installUpdate()`",
failures,
)
expect_regex(
app_delegate,
r'@objc\s+func\s+attemptUpdate\(_\s+sender:\s+Any\?\)\s*\{\s*updateViewModel\.overrideState\s*=\s*nil\s*updateController\.attemptUpdate\(\)\s*\}',
"`AppDelegate.attemptUpdate` is missing or does not call `updateController.attemptUpdate()`",
failures,
)
expect_regex(
controller,
r'func\s+attemptUpdate\(\)\s*\{',
"`UpdateController.attemptUpdate()` is missing",
failures,
)
if "state.confirm()" not in controller:
failures.append("`UpdateController.attemptUpdate()` no longer auto-confirms update installation")
if "checkForUpdates()" not in controller:
failures.append("`UpdateController.attemptUpdate()` no longer triggers a check before install")
if failures:
print("FAIL: command-palette update command regression(s) detected")
for failure in failures:
print(f"- {failure}")
return 1
print("PASS: command-palette update commands expose apply + attempt wiring")
return 0
if __name__ == "__main__":
raise SystemExit(main())

View file

@ -1,159 +0,0 @@
#!/usr/bin/env python3
"""
Automated test for ctrl+enter keybind using real keystrokes.
Requires:
- cmux running
- Accessibility permissions for System Events (osascript)
- keybind = ctrl+enter=text:\\r (or \\n/\\x0d) configured in Ghostty config
"""
import os
import sys
import time
import subprocess
from pathlib import Path
from typing import Optional
# Add the directory containing cmux.py to the path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from cmux import cmux, cmuxError
def run_osascript(script: str) -> subprocess.CompletedProcess[str]:
# Use capture_output so we can detect common permission failures and skip.
result = subprocess.run(
["osascript", "-e", script],
capture_output=True,
text=True,
)
if result.returncode != 0:
raise subprocess.CalledProcessError(
result.returncode,
result.args,
output=result.stdout,
stderr=result.stderr,
)
return result
def is_keystroke_permission_error(err: subprocess.CalledProcessError) -> bool:
text = f"{getattr(err, 'stderr', '') or ''}\n{getattr(err, 'output', '') or ''}"
return "not allowed to send keystrokes" in text or "(1002)" in text
def has_ctrl_enter_keybind(config_text: str) -> bool:
for line in config_text.splitlines():
stripped = line.strip()
if not stripped or stripped.startswith("#"):
continue
if "ctrl+enter" in stripped and "text:" in stripped:
if "\\r" in stripped or "\\n" in stripped or "\\x0d" in stripped:
return True
return False
def find_config_with_keybind() -> Optional[Path]:
home = Path.home()
candidates = [
home / "Library/Application Support/com.mitchellh.ghostty/config.ghostty",
home / "Library/Application Support/com.mitchellh.ghostty/config",
home / ".config/ghostty/config.ghostty",
home / ".config/ghostty/config",
]
for path in candidates:
if not path.exists():
continue
try:
if has_ctrl_enter_keybind(path.read_text(encoding="utf-8")):
return path
except OSError:
continue
return None
def test_ctrl_enter_keybind(client: cmux) -> tuple[bool, str]:
marker = Path("/tmp") / f"ghostty_ctrl_enter_{os.getpid()}"
marker.unlink(missing_ok=True)
# Create a fresh tab to avoid interfering with existing sessions
new_tab_id = client.new_tab()
client.select_tab(new_tab_id)
time.sleep(0.3)
try:
# Make sure the app is focused for keystrokes
bundle_id = cmux.default_bundle_id()
run_osascript(f'tell application id "{bundle_id}" to activate')
time.sleep(0.2)
# Clear any running command
try:
client.send_key("ctrl-c")
time.sleep(0.2)
except Exception:
pass
# Type the command (without pressing Enter)
run_osascript(f'tell application "System Events" to keystroke "touch {marker}"')
time.sleep(0.1)
# Send Ctrl+Enter (key code 36 = Return)
run_osascript('tell application "System Events" to key code 36 using control down')
time.sleep(0.5)
ok = marker.exists()
return ok, ("Ctrl+Enter keybind executed command" if ok else "Marker not created by Ctrl+Enter")
finally:
if marker.exists():
marker.unlink(missing_ok=True)
try:
client.close_tab(new_tab_id)
except Exception:
pass
def run_tests() -> int:
print("=" * 60)
print("cmux Ctrl+Enter Keybind Test")
print("=" * 60)
print()
socket_path = cmux.default_socket_path()
if not os.path.exists(socket_path):
print(f"SKIP: Socket not found at {socket_path}")
print("Tip: start cmux first (or set CMUX_TAG / CMUX_SOCKET_PATH).")
return 0
config_path = find_config_with_keybind()
if not config_path:
print("SKIP: Required keybind not found in Ghostty config.")
print("Expected a line like: keybind = ctrl+enter=text:\\r")
return 0
print(f"Using keybind from: {config_path}")
print()
try:
with cmux() as client:
ok, message = test_ctrl_enter_keybind(client)
status = "" if ok else ""
print(f"{status} {message}")
return 0 if ok else 1
except cmuxError as e:
print(f"SKIP: {e}")
return 0
except subprocess.CalledProcessError as e:
if is_keystroke_permission_error(e):
print("SKIP: osascript/System Events not allowed to send keystrokes (Accessibility permission missing)")
return 0
print(f"Error: osascript failed: {e}")
if getattr(e, "stderr", None):
print(e.stderr.strip())
if getattr(e, "output", None):
print(e.output.strip())
return 1
if __name__ == "__main__":
sys.exit(run_tests())

View file

@ -1,64 +0,0 @@
#!/usr/bin/env python3
"""Static regression checks for re-entrant terminal focus guard.
Guards the fix for split-drag focus churn where:
becomeFirstResponder -> onFocus -> Workspace.focusPanel -> refocus side-effects
could repeatedly re-enter and spike CPU.
"""
from __future__ import annotations
import subprocess
from pathlib import Path
def repo_root() -> Path:
result = subprocess.run(
["git", "rev-parse", "--show-toplevel"],
capture_output=True,
text=True,
)
if result.returncode == 0:
return Path(result.stdout.strip())
return Path(__file__).resolve().parents[1]
def main() -> int:
root = repo_root()
failures: list[str] = []
workspace_path = root / "Sources" / "Workspace.swift"
workspace_source = workspace_path.read_text(encoding="utf-8")
required_workspace_snippets = [
"enum FocusPanelTrigger {",
"case terminalFirstResponder",
"trigger: FocusPanelTrigger = .standard",
"let shouldSuppressReentrantRefocus = trigger == .terminalFirstResponder && selectionAlreadyConverged",
"if let targetPaneId, !shouldSuppressReentrantRefocus {",
"reason=firstResponderAlreadyConverged",
]
for snippet in required_workspace_snippets:
if snippet not in workspace_source:
failures.append(f"Workspace focus guard missing snippet: {snippet}")
workspace_content_view_path = root / "Sources" / "WorkspaceContentView.swift"
workspace_content_view_source = workspace_content_view_path.read_text(encoding="utf-8")
focus_callback_snippet = "workspace.focusPanel(panel.id, trigger: .terminalFirstResponder)"
if focus_callback_snippet not in workspace_content_view_source:
failures.append(
"WorkspaceContentView terminal onFocus callback no longer passes .terminalFirstResponder trigger"
)
if failures:
print("FAIL: focus-panel re-entrant guard regression checks failed")
for item in failures:
print(f" - {item}")
return 1
print("PASS: focus-panel re-entrant guard is in place")
return 0
if __name__ == "__main__":
raise SystemExit(main())

View file

@ -1,166 +0,0 @@
#!/usr/bin/env python3
"""Regression guard for issue #494 (post-wake sidebar git updates freezing)."""
from __future__ import annotations
import subprocess
from pathlib import Path
def get_repo_root() -> Path:
result = subprocess.run(
["git", "rev-parse", "--show-toplevel"],
capture_output=True,
text=True,
)
if result.returncode == 0:
return Path(result.stdout.strip())
return Path.cwd()
def read_text(path: Path) -> str:
return path.read_text(encoding="utf-8")
def require(content: str, needle: str, message: str, failures: list[str]) -> None:
if needle not in content:
failures.append(message)
def main() -> int:
repo_root = get_repo_root()
zsh_path = repo_root / "Resources" / "shell-integration" / "cmux-zsh-integration.zsh"
bash_path = repo_root / "Resources" / "shell-integration" / "cmux-bash-integration.bash"
app_delegate_path = repo_root / "Sources" / "AppDelegate.swift"
required_paths = [zsh_path, bash_path, app_delegate_path]
missing_paths = [str(path) for path in required_paths if not path.exists()]
if missing_paths:
print("Missing expected files:")
for path in missing_paths:
print(f" - {path}")
return 1
zsh_content = read_text(zsh_path)
bash_content = read_text(bash_path)
app_delegate = read_text(app_delegate_path)
failures: list[str] = []
require(
zsh_content,
"_CMUX_GIT_JOB_STARTED_AT",
"zsh integration is missing git probe start tracking",
failures,
)
require(
zsh_content,
"_CMUX_PR_JOB_STARTED_AT",
"zsh integration is missing PR probe start tracking",
failures,
)
require(
zsh_content,
"_CMUX_ASYNC_JOB_TIMEOUT",
"zsh integration is missing async probe timeout guard",
failures,
)
require(
zsh_content,
"now - _CMUX_GIT_JOB_STARTED_AT >= _CMUX_ASYNC_JOB_TIMEOUT",
"zsh integration no longer clears stale git probe PID after timeout",
failures,
)
require(
zsh_content,
"now - _CMUX_PR_JOB_STARTED_AT >= _CMUX_ASYNC_JOB_TIMEOUT",
"zsh integration no longer clears stale PR probe PID after timeout",
failures,
)
require(
zsh_content,
"ncat -w 1 -U \"$CMUX_SOCKET_PATH\" --send-only",
"zsh integration missing ncat socket timeout",
failures,
)
require(
zsh_content,
"socat -T 1 - \"UNIX-CONNECT:$CMUX_SOCKET_PATH\"",
"zsh integration missing socat socket timeout",
failures,
)
require(
bash_content,
"_CMUX_GIT_JOB_STARTED_AT",
"bash integration is missing git probe start tracking",
failures,
)
require(
bash_content,
"_CMUX_PR_JOB_STARTED_AT",
"bash integration is missing PR probe start tracking",
failures,
)
require(
bash_content,
"_CMUX_ASYNC_JOB_TIMEOUT",
"bash integration is missing async probe timeout guard",
failures,
)
require(
bash_content,
"now - _CMUX_GIT_JOB_STARTED_AT >= _CMUX_ASYNC_JOB_TIMEOUT",
"bash integration no longer clears stale git probe PID after timeout",
failures,
)
require(
bash_content,
"now - _CMUX_PR_JOB_STARTED_AT >= _CMUX_ASYNC_JOB_TIMEOUT",
"bash integration no longer clears stale PR probe PID after timeout",
failures,
)
require(
bash_content,
"ncat -w 1 -U \"$CMUX_SOCKET_PATH\" --send-only",
"bash integration missing ncat socket timeout",
failures,
)
require(
bash_content,
"socat -T 1 - \"UNIX-CONNECT:$CMUX_SOCKET_PATH\"",
"bash integration missing socat socket timeout",
failures,
)
require(
app_delegate,
"NSWorkspace.didWakeNotification",
"AppDelegate is missing wake observer for socket listener recovery",
failures,
)
require(
app_delegate,
"restartSocketListenerIfEnabled(source: \"workspace.didWake\")",
"Wake observer no longer re-arms the socket listener",
failures,
)
require(
app_delegate,
"private func restartSocketListenerIfEnabled(source: String)",
"Missing shared socket-listener restart helper",
failures,
)
if failures:
print("FAIL: issue #494 regression(s) detected")
for failure in failures:
print(f"- {failure}")
return 1
print("PASS: issue #494 sleep/wake recovery guards are present")
return 0
if __name__ == "__main__":
raise SystemExit(main())

View file

@ -1,126 +0,0 @@
#!/usr/bin/env python3
"""Regression guard for issue #582 (sidebar git branch updates stalling)."""
from __future__ import annotations
import subprocess
from pathlib import Path
def get_repo_root() -> Path:
result = subprocess.run(
["git", "rev-parse", "--show-toplevel"],
capture_output=True,
text=True,
)
if result.returncode == 0:
return Path(result.stdout.strip())
return Path.cwd()
def extract_function(content: str, signature: str) -> str:
start = content.find(signature)
if start < 0:
return ""
brace = content.find("{", start)
if brace < 0:
return ""
depth = 0
for idx in range(brace, len(content)):
ch = content[idx]
if ch == "{":
depth += 1
elif ch == "}":
depth -= 1
if depth == 0:
return content[start : idx + 1]
return ""
def require(content: str, needle: str, message: str, failures: list[str]) -> None:
if needle not in content:
failures.append(message)
def main() -> int:
repo_root = get_repo_root()
terminal_controller_path = repo_root / "Sources" / "TerminalController.swift"
if not terminal_controller_path.exists():
print(f"Missing expected file: {terminal_controller_path}")
return 1
terminal_controller = terminal_controller_path.read_text(encoding="utf-8")
report_body = extract_function(terminal_controller, "private func reportGitBranch(_ args: String) -> String")
clear_body = extract_function(terminal_controller, "private func clearGitBranch(_ args: String) -> String")
failures: list[str] = []
if not report_body:
failures.append("Unable to locate reportGitBranch implementation")
if not clear_body:
failures.append("Unable to locate clearGitBranch implementation")
if report_body:
require(
report_body,
"if let scope = Self.explicitSocketScope(options: parsed.options)",
"reportGitBranch is missing explicit-scope fast path",
failures,
)
require(
report_body,
"DispatchQueue.main.async",
"reportGitBranch no longer schedules explicit-scope updates with main.async",
failures,
)
require(
report_body,
"tab.updatePanelGitBranch(panelId: scope.panelId",
"reportGitBranch fast path no longer writes branch state to the scoped panel",
failures,
)
require(
report_body,
"DispatchQueue.main.sync",
"reportGitBranch lost sync fallback path for non-explicit/manual calls",
failures,
)
if clear_body:
require(
clear_body,
"if let scope = Self.explicitSocketScope(options: parsed.options)",
"clearGitBranch is missing explicit-scope fast path",
failures,
)
require(
clear_body,
"DispatchQueue.main.async",
"clearGitBranch no longer schedules explicit-scope clears with main.async",
failures,
)
require(
clear_body,
"tab.clearPanelGitBranch(panelId: scope.panelId)",
"clearGitBranch fast path no longer clears branch state for the scoped panel",
failures,
)
require(
clear_body,
"DispatchQueue.main.sync",
"clearGitBranch lost sync fallback path for non-explicit/manual calls",
failures,
)
if failures:
print("FAIL: issue #582 regression(s) detected")
for failure in failures:
print(f"- {failure}")
return 1
print("PASS: issue #582 git branch socket fast path guards are present")
return 0
if __name__ == "__main__":
raise SystemExit(main())

View file

@ -0,0 +1,132 @@
#!/usr/bin/env python3
"""
Regression for issue #734:
cmux wrapper .zshenv should only source Ghostty zsh integration when Ghostty
actually enabled shell integration (signaled by GHOSTTY_ZSH_ZDOTDIR being set).
"""
from __future__ import annotations
import os
import shutil
import subprocess
from pathlib import Path
def _run_case(
*,
wrapper_dir: Path,
home: Path,
orig_zdotdir: Path,
ghostty_resources: Path,
out_path: Path,
ghostty_enabled: bool,
) -> tuple[int, str]:
env = dict(os.environ)
env["HOME"] = str(home)
env["ZDOTDIR"] = str(wrapper_dir)
env["GHOSTTY_RESOURCES_DIR"] = str(ghostty_resources)
env["CMUX_SHELL_INTEGRATION"] = "0"
env["CMUX_TEST_OUT"] = str(out_path)
# Keep input deterministic and local to this test.
for key in (
"GHOSTTY_ZSH_ZDOTDIR",
"CMUX_ZSH_ZDOTDIR",
"CMUX_ORIGINAL_ZDOTDIR",
"GHOSTTY_SHELL_FEATURES",
"GHOSTTY_BIN_DIR",
):
env.pop(key, None)
if ghostty_enabled:
env["GHOSTTY_ZSH_ZDOTDIR"] = str(orig_zdotdir)
else:
env["CMUX_ZSH_ZDOTDIR"] = str(orig_zdotdir)
result = subprocess.run(
["zsh", "-d", "-i", "-c", "true"],
env=env,
capture_output=True,
text=True,
timeout=8,
)
return (result.returncode, (result.stdout or "") + (result.stderr or ""))
def main() -> int:
root = Path(__file__).resolve().parents[1]
wrapper_dir = root / "Resources" / "shell-integration"
if not (wrapper_dir / ".zshenv").exists():
print(f"SKIP: missing wrapper .zshenv at {wrapper_dir}")
return 0
base = Path("/tmp") / f"cmux_issue_734_{os.getpid()}"
try:
shutil.rmtree(base, ignore_errors=True)
base.mkdir(parents=True, exist_ok=True)
home = base / "home"
orig = base / "orig-zdotdir"
resources = base / "ghostty-resources"
home.mkdir(parents=True, exist_ok=True)
orig.mkdir(parents=True, exist_ok=True)
(resources / "shell-integration" / "zsh").mkdir(parents=True, exist_ok=True)
# Keep user startup files inert and local.
for filename in (".zshenv", ".zprofile", ".zshrc"):
(orig / filename).write_text("", encoding="utf-8")
marker = base / "ghostty-sourced.txt"
(resources / "shell-integration" / "zsh" / "ghostty-integration").write_text(
'echo "sourced" >> "$CMUX_TEST_OUT"\n',
encoding="utf-8",
)
rc, out = _run_case(
wrapper_dir=wrapper_dir,
home=home,
orig_zdotdir=orig,
ghostty_resources=resources,
out_path=marker,
ghostty_enabled=False,
)
if rc != 0:
print(f"FAIL: zsh exited non-zero when ghostty_enabled=False rc={rc}")
if out.strip():
print(out.strip())
return 1
if marker.exists():
print("FAIL: ghostty integration sourced when Ghostty shell integration was disabled")
return 1
rc, out = _run_case(
wrapper_dir=wrapper_dir,
home=home,
orig_zdotdir=orig,
ghostty_resources=resources,
out_path=marker,
ghostty_enabled=True,
)
if rc != 0:
print(f"FAIL: zsh exited non-zero when ghostty_enabled=True rc={rc}")
if out.strip():
print(out.strip())
return 1
if not marker.exists():
print("FAIL: ghostty integration not sourced when Ghostty shell integration was enabled")
return 1
contents = marker.read_text(encoding="utf-8")
if "sourced" not in contents:
print("FAIL: expected marker output missing after enabled run")
return 1
print("PASS: wrapper respects Ghostty shell-integration=none via GHOSTTY_ZSH_ZDOTDIR gate")
return 0
finally:
shutil.rmtree(base, ignore_errors=True)
if __name__ == "__main__":
raise SystemExit(main())

View file

@ -1,190 +0,0 @@
#!/usr/bin/env python3
"""
Lint test to catch SwiftUI patterns that cause performance issues.
This test checks for:
1. Text(_:style:) with auto-updating date styles (.time, .timer, .relative)
These cause continuous view updates and can lead to high CPU usage.
"""
from __future__ import annotations
import re
import subprocess
import sys
from pathlib import Path
from typing import List, Tuple
def get_repo_root():
"""Get the repository root directory."""
# Try git first
result = subprocess.run(
["git", "rev-parse", "--show-toplevel"],
capture_output=True,
text=True,
)
if result.returncode == 0:
return Path(result.stdout.strip())
# Fall back to finding GhosttyTabs directory
cwd = Path.cwd()
if cwd.name == "GhosttyTabs" or (cwd / "Sources").exists():
return cwd
if (cwd.parent / "GhosttyTabs").exists():
return cwd.parent / "GhosttyTabs"
# Last resort: use current directory
return cwd
def find_swift_files(repo_root: Path) -> List[Path]:
"""Find all Swift files in Sources directory (excluding vendored code)."""
sources_dir = repo_root / "Sources"
if not sources_dir.exists():
return []
return list(sources_dir.rglob("*.swift"))
def check_autoupdating_text_styles(files: List[Path]) -> List[Tuple[Path, int, str]]:
"""
Check for Text(_:style:) with auto-updating date styles.
These patterns cause continuous SwiftUI view updates:
- Text(date, style: .time) - updates every second/minute
- Text(date, style: .timer) - updates continuously
- Text(date, style: .relative) - updates periodically
- Text(date, style: .offset) - updates periodically
Instead, use static formatting:
- Text(date.formatted(date: .omitted, time: .shortened))
"""
violations = []
# Patterns that indicate auto-updating Text with Date
# The key patterns are: Text(something, style: .time/timer/relative/offset)
problematic_patterns = [
"style: .time",
"style: .timer",
"style: .relative",
"style: .offset",
"style:.time",
"style:.timer",
"style:.relative",
"style:.offset",
]
for file_path in files:
try:
content = file_path.read_text()
lines = content.split('\n')
for line_num, line in enumerate(lines, start=1):
# Skip comments
stripped = line.strip()
if stripped.startswith("//"):
continue
for pattern in problematic_patterns:
if pattern in line:
violations.append((file_path, line_num, line.strip()))
break
except Exception as e:
print(f"Warning: Could not read {file_path}: {e}", file=sys.stderr)
return violations
def check_command_palette_caret_tint(repo_root: Path) -> List[str]:
"""Ensure command palette text inputs keep a white caret tint."""
content_view = repo_root / "Sources" / "ContentView.swift"
if not content_view.exists():
return [f"Missing expected file: {content_view}"]
try:
content = content_view.read_text()
except Exception as e:
return [f"Could not read {content_view}: {e}"]
checks = [
(
"search input",
r"TextField\(commandPaletteSearchPlaceholder, text: \$commandPaletteQuery\)(?P<body>.*?)"
r"\.focused\(\$isCommandPaletteSearchFocused\)",
),
(
"rename input",
r"TextField\(target\.placeholder, text: \$commandPaletteRenameDraft\)(?P<body>.*?)"
r"\.focused\(\$isCommandPaletteRenameFocused\)",
),
]
violations: List[str] = []
for label, pattern in checks:
match = re.search(pattern, content, flags=re.DOTALL)
if not match:
violations.append(
f"Could not locate command palette {label} TextField block in Sources/ContentView.swift"
)
continue
body = match.group("body")
if ".tint(.white)" not in body:
violations.append(
f"Command palette {label} TextField must use `.tint(.white)` in Sources/ContentView.swift"
)
return violations
def main():
"""Run the lint checks."""
repo_root = get_repo_root()
swift_files = find_swift_files(repo_root)
print(f"Checking {len(swift_files)} Swift files for performance issues...")
# Check for auto-updating Text styles
style_violations = check_autoupdating_text_styles(swift_files)
tint_violations = check_command_palette_caret_tint(repo_root)
has_failures = False
if style_violations:
has_failures = True
print("\n❌ LINT FAILURES: Auto-updating Text styles found")
print("=" * 60)
print("These patterns cause continuous SwiftUI view updates and high CPU usage:")
print()
for file_path, line_num, line in style_violations:
rel_path = file_path.relative_to(repo_root)
print(f" {rel_path}:{line_num}")
print(f" {line}")
print()
print("FIX: Replace with static formatting:")
print(" Instead of: Text(date, style: .time)")
print(" Use: Text(date.formatted(date: .omitted, time: .shortened))")
print()
if tint_violations:
has_failures = True
print("\n❌ LINT FAILURES: Command palette caret tint drifted")
print("=" * 60)
print("The command palette search and rename text fields must keep a white caret:")
print()
for message in tint_violations:
print(f" {message}")
print()
print("FIX: Set command palette TextField tint modifiers to `.white`.")
print()
if has_failures:
return 1
print("✅ No linted SwiftUI pattern regressions found")
return 0
if __name__ == "__main__":
sys.exit(main())

View file

@ -1,64 +0,0 @@
#!/usr/bin/env python3
"""Regression test: cmux advertises and allows microphone access."""
from __future__ import annotations
import plistlib
import subprocess
from pathlib import Path
def get_repo_root() -> Path:
result = subprocess.run(
["git", "rev-parse", "--show-toplevel"],
capture_output=True,
text=True,
check=False,
)
if result.returncode == 0:
return Path(result.stdout.strip())
return Path.cwd()
def load_plist(path: Path, failures: list[str]) -> dict:
if not path.exists():
failures.append(f"Missing expected file: {path}")
return {}
with path.open("rb") as f:
return plistlib.load(f)
def main() -> int:
repo_root = get_repo_root()
failures: list[str] = []
info = load_plist(repo_root / "Resources" / "Info.plist", failures)
entitlements = load_plist(repo_root / "cmux.entitlements", failures)
mic_usage = info.get("NSMicrophoneUsageDescription")
if not isinstance(mic_usage, str) or not mic_usage.strip():
failures.append(
"Resources/Info.plist must define a non-empty NSMicrophoneUsageDescription"
)
elif mic_usage.strip() != "A program running within cmux would like to use your microphone.":
failures.append(
"Resources/Info.plist NSMicrophoneUsageDescription should match the Ghostty-style wording"
)
if entitlements.get("com.apple.security.device.audio-input") is not True:
failures.append(
"cmux.entitlements must set com.apple.security.device.audio-input to true"
)
if failures:
print("FAIL: microphone access metadata regression(s) detected")
for failure in failures:
print(f"- {failure}")
return 1
print("PASS: microphone usage description and entitlement are present")
return 0
if __name__ == "__main__":
raise SystemExit(main())

View file

@ -0,0 +1,99 @@
#!/usr/bin/env bash
# Regression test for dual nightly macOS tracks.
set -euo pipefail
ROOT_DIR="$(cd "$(dirname "$0")/.." && pwd)"
WORKFLOW_FILE="$ROOT_DIR/.github/workflows/nightly.yml"
if ! awk '
/^ - name: Build Apple Silicon app \(Release\)/ { in_arm=1; next }
/^ - name: Build universal app \(Release\)/ { in_universal=1; next }
in_arm && /^ - name:/ { in_arm=0 }
in_universal && /^ - name:/ { in_universal=0 }
in_arm && /-destination '\''platform=macOS,arch=arm64'\''/ { saw_arm_destination=1 }
in_arm && /ARCHS="arm64"/ { saw_arm_archs=1 }
in_arm && /ONLY_ACTIVE_ARCH=YES/ { saw_arm_only_active_arch=1 }
in_universal && /-destination '\''generic\/platform=macOS'\''/ { saw_universal_destination=1 }
in_universal && /ARCHS="arm64 x86_64"/ { saw_universal_archs=1 }
in_universal && /ONLY_ACTIVE_ARCH=NO/ { saw_universal_only_active_arch=1 }
END {
exit !(saw_arm_destination && saw_arm_archs && saw_arm_only_active_arch && saw_universal_destination && saw_universal_archs && saw_universal_only_active_arch)
}
' "$WORKFLOW_FILE"; then
echo "FAIL: nightly workflow must force Apple Silicon nightly to arm64-only and universal nightly to both slices"
exit 1
fi
if ! awk '
/^ - name: Verify nightly binary architectures/ { in_verify=1; next }
in_verify && /^ - name:/ { in_verify=0 }
in_verify && /lipo -archs "\$ARM_APP_BINARY"/ { saw_arm_app=1 }
in_verify && /lipo -archs "\$ARM_CLI_BINARY"/ { saw_arm_cli=1 }
in_verify && /lipo -archs "\$APP_BINARY"/ { saw_app=1 }
in_verify && /lipo -archs "\$CLI_BINARY"/ { saw_cli=1 }
in_verify && /\[\[ "\$ARM_APP_ARCHS" == "arm64" \]\]/ { saw_arm_app_assert=1 }
in_verify && /\[\[ "\$ARM_CLI_ARCHS" == "arm64" \]\]/ { saw_arm_cli_assert=1 }
END { exit !(saw_arm_app && saw_arm_cli && saw_app && saw_cli && saw_arm_app_assert && saw_arm_cli_assert) }
' "$WORKFLOW_FILE"; then
echo "FAIL: nightly workflow must verify arm-only and universal slices with lipo"
exit 1
fi
if ! grep -Fq 'com.cmuxterm.app.nightly.universal' "$WORKFLOW_FILE"; then
echo "FAIL: nightly workflow must set a distinct .universal bundle ID"
exit 1
fi
if ! grep -Fq 'https://github.com/manaflow-ai/cmux/releases/download/nightly/appcast-universal.xml' "$WORKFLOW_FILE"; then
echo "FAIL: nightly workflow must publish a separate universal appcast feed"
exit 1
fi
if ! grep -Fq './scripts/sparkle_generate_appcast.sh "$NIGHTLY_UNIVERSAL_DMG_IMMUTABLE" nightly appcast-universal.xml' "$WORKFLOW_FILE"; then
echo "FAIL: nightly workflow must generate a separate universal appcast"
exit 1
fi
if ! grep -Fq "core.setOutput('should_publish', isMainRef ? 'true' : 'false');" "$WORKFLOW_FILE"; then
echo "FAIL: nightly decide step must expose should_publish based on whether the ref is main"
exit 1
fi
if ! awk '
/^ - name: Upload branch nightly artifacts/ { in_upload=1; next }
in_upload && /^ - name:/ { in_upload=0 }
in_upload && /if: needs\.decide\.outputs\.should_publish != '\''true'\''/ { saw_if=1 }
in_upload && /uses: actions\/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4/ { saw_upload=1 }
in_upload && /cmux-nightly-macos\*\.dmg/ { saw_arm_artifacts=1 }
in_upload && /cmux-nightly-universal-macos\*\.dmg/ { saw_universal_artifacts=1 }
in_upload && /appcast-universal\.xml/ { saw_universal_appcast=1 }
END { exit !(saw_if && saw_upload && saw_arm_artifacts && saw_universal_artifacts && saw_universal_appcast) }
' "$WORKFLOW_FILE"; then
echo "FAIL: non-main nightly runs must upload both nightly variants and both appcasts"
exit 1
fi
if ! awk '
/^ - name: Move nightly tag to built commit/ { in_move=1; next }
in_move && /^ - name:/ { in_move=0 }
in_move && /if: needs\.decide\.outputs\.should_publish == '\''true'\''/ { saw_move_if=1 }
END { exit !saw_move_if }
' "$WORKFLOW_FILE"; then
echo "FAIL: moving the nightly tag must be gated to main nightly publishes"
exit 1
fi
if ! awk '
/^ - name: Publish nightly release assets/ { in_publish=1; next }
in_publish && /^ - name:/ { in_publish=0 }
in_publish && /if: needs\.decide\.outputs\.should_publish == '\''true'\''/ { saw_publish_if=1 }
in_publish && /cmux-nightly-universal-macos-\$\{\{ github\.run_id \}\}\*\.dmg/ { saw_universal_immutable=1 }
in_publish && /cmux-nightly-universal-macos\.dmg/ { saw_universal_stable=1 }
in_publish && /appcast-universal\.xml/ { saw_universal_appcast=1 }
END { exit !(saw_publish_if && saw_universal_immutable && saw_universal_stable && saw_universal_appcast) }
' "$WORKFLOW_FILE"; then
echo "FAIL: main nightly publish must include the universal assets and appcast"
exit 1
fi
echo "PASS: nightly workflow keeps separate Apple Silicon and universal nightly tracks"

View file

@ -33,7 +33,10 @@ def run_wrapper(
intercept_setting: str | None,
legacy_open_setting: str | None = None,
whitelist: str | None,
external_patterns: str | None = None,
fail_urls: list[str] | None = None,
local_files: list[str] | None = None,
python_bin: str | None = None,
) -> tuple[list[str], list[str], int, str]:
with tempfile.TemporaryDirectory(prefix="cmux-open-wrapper-test-") as td:
tmp = Path(td)
@ -85,6 +88,13 @@ case "$key" in
fi
exit 1
;;
browserExternalOpenPatterns)
if [[ "${FAKE_DEFAULTS_EXTERNAL_PATTERNS+x}" == "x" ]]; then
printf '%s' "$FAKE_DEFAULTS_EXTERNAL_PATTERNS"
exit 0
fi
exit 1
;;
*)
exit 1
;;
@ -113,6 +123,12 @@ exit 0
""",
)
if local_files:
for relative_path in local_files:
target = tmp / relative_path
target.parent.mkdir(parents=True, exist_ok=True)
target.write_text("<!doctype html><title>fixture</title>", encoding="utf-8")
env = os.environ.copy()
env["CMUX_SOCKET_PATH"] = "/tmp/cmux-open-wrapper-test.sock"
env["CMUX_BUNDLE_ID"] = "com.cmuxterm.app.debug.test"
@ -120,6 +136,10 @@ exit 0
env["CMUX_OPEN_WRAPPER_DEFAULTS"] = str(defaults)
env["FAKE_OPEN_LOG"] = str(open_log)
env["FAKE_CMUX_LOG"] = str(cmux_log)
if python_bin is None:
env.pop("CMUX_OPEN_WRAPPER_PYTHON3", None)
else:
env["CMUX_OPEN_WRAPPER_PYTHON3"] = python_bin
if intercept_setting is None:
env.pop("FAKE_DEFAULTS_INTERCEPT_OPEN", None)
@ -136,6 +156,11 @@ exit 0
else:
env["FAKE_DEFAULTS_WHITELIST"] = whitelist
if external_patterns is None:
env.pop("FAKE_DEFAULTS_EXTERNAL_PATTERNS", None)
else:
env["FAKE_DEFAULTS_EXTERNAL_PATTERNS"] = external_patterns
if fail_urls:
env["FAKE_CMUX_FAIL_URLS"] = ",".join(fail_urls)
else:
@ -143,6 +168,7 @@ exit 0
result = subprocess.run(
["/bin/bash", str(wrapper), *args],
cwd=tmp,
env=env,
capture_output=True,
text=True,
@ -213,6 +239,95 @@ def test_whitelist_match_routes_to_cmux(failures: list[str]) -> None:
expect(cmux_log == [f"browser open {url}"], f"whitelist match: unexpected cmux log {cmux_log}", failures)
def test_external_literal_pattern_is_deferred_to_app(failures: list[str]) -> None:
url = "https://platform.openai.com/account/usage"
open_log, cmux_log, code, stderr = run_wrapper(
args=[url],
intercept_setting="1",
whitelist="",
external_patterns="platform.openai.com/account/usage",
)
expect(code == 0, f"external literal deferred: wrapper exited {code}: {stderr}", failures)
expect(
cmux_log == [f"browser open {url}"],
f"external literal deferred: expected wrapper to pass URL to cmux, got {cmux_log}",
failures,
)
expect(
open_log == [],
f"external literal deferred: system open should not be called by wrapper, got {open_log}",
failures,
)
def test_external_regex_pattern_is_deferred_to_app(failures: list[str]) -> None:
url = "https://foo.example.com/billing"
open_log, cmux_log, code, stderr = run_wrapper(
args=[url],
intercept_setting="1",
whitelist="*.example.com",
external_patterns=r"re:^https?://[^/]*\.example\.com/(billing|usage)",
)
expect(code == 0, f"external regex deferred: wrapper exited {code}: {stderr}", failures)
expect(
cmux_log == [f"browser open {url}"],
f"external regex deferred: expected wrapper to pass URL to cmux, got {cmux_log}",
failures,
)
expect(
open_log == [],
f"external regex deferred: system open should not be called by wrapper, got {open_log}",
failures,
)
def test_external_regex_with_icu_features_is_deferred_to_app(failures: list[str]) -> None:
url = "https://example.com/usage/42"
open_log, cmux_log, code, stderr = run_wrapper(
args=[url],
intercept_setting="1",
whitelist="example.com",
external_patterns=r"re:^https://example\.com/usage/\d+$",
)
expect(code == 0, f"external regex icu deferred: wrapper exited {code}: {stderr}", failures)
expect(
cmux_log == [f"browser open {url}"],
f"external regex icu deferred: expected wrapper to pass URL to cmux, got {cmux_log}",
failures,
)
expect(
open_log == [],
f"external regex icu deferred: system open should not be called by wrapper, got {open_log}",
failures,
)
def test_external_invalid_regex_is_ignored_silently(failures: list[str]) -> None:
url = "https://example.com/path"
open_log, cmux_log, code, stderr = run_wrapper(
args=[url],
intercept_setting="1",
whitelist="",
external_patterns=r"re:[unclosed",
)
expect(code == 0, f"external invalid regex: wrapper exited {code}: {stderr}", failures)
expect(
cmux_log == [f"browser open {url}"],
f"external invalid regex: expected cmux open for {url}, got {cmux_log}",
failures,
)
expect(
open_log == [],
f"external invalid regex: expected no system open calls, got {open_log}",
failures,
)
expect(
"invalid regular expression" not in stderr.lower(),
f"external invalid regex: stderr should stay clean, got {stderr!r}",
failures,
)
def test_partial_failures_only_fallback_failed_urls(failures: list[str]) -> None:
good = "https://api.example.com"
failed = "https://fail.example.com"
@ -282,6 +397,149 @@ def test_uppercase_scheme_routes_to_cmux(failures: list[str]) -> None:
expect(cmux_log == [f"browser open {url}"], f"uppercase scheme: unexpected cmux log {cmux_log}", failures)
def test_local_html_file_routes_to_cmux(failures: list[str]) -> None:
filename = "fixtures/hello page.HTML"
open_log, cmux_log, code, stderr = run_wrapper(
args=[filename],
intercept_setting="1",
whitelist="",
local_files=[filename],
)
expect(code == 0, f"local html file: wrapper exited {code}: {stderr}", failures)
expect(open_log == [], f"local html file: system open should not be called, got {open_log}", failures)
expect(len(cmux_log) == 1, f"local html file: expected exactly one cmux call, got {cmux_log}", failures)
if cmux_log:
expect(
cmux_log[0].startswith("browser open file://"),
f"local html file: expected file:// target, got {cmux_log[0]}",
failures,
)
expect(
"hello%20page.HTML" in cmux_log[0],
f"local html file: expected URL-encoded filename in cmux target, got {cmux_log[0]}",
failures,
)
def test_file_url_html_routes_to_cmux(failures: list[str]) -> None:
url = "file:///tmp/cmux-open-wrapper-fixture.html"
open_log, cmux_log, code, stderr = run_wrapper(
args=[url],
intercept_setting="1",
whitelist="",
)
expect(code == 0, f"file url html: wrapper exited {code}: {stderr}", failures)
expect(open_log == [], f"file url html: system open should not be called, got {open_log}", failures)
expect(cmux_log == [f"browser open {url}"], f"file url html: unexpected cmux log {cmux_log}", failures)
def test_file_url_html_routes_to_cmux_without_python_binary(failures: list[str]) -> None:
url = "file:///tmp/cmux-open-wrapper-fixture.html"
open_log, cmux_log, code, stderr = run_wrapper(
args=[url],
intercept_setting="1",
whitelist="",
python_bin="/definitely/missing/python3",
)
expect(code == 0, f"file url html no-python fallback: wrapper exited {code}: {stderr}", failures)
expect(
open_log == [],
f"file url html no-python fallback: system open should not be called, got {open_log}",
failures,
)
expect(
cmux_log == [f"browser open {url}"],
f"file url html no-python fallback: unexpected cmux log {cmux_log}",
failures,
)
def test_local_html_file_routes_to_cmux_without_python_binary(failures: list[str]) -> None:
filename = "fixtures/no python fallback.html"
open_log, cmux_log, code, stderr = run_wrapper(
args=[filename],
intercept_setting="1",
whitelist="",
local_files=[filename],
python_bin="/definitely/missing/python3",
)
expect(code == 0, f"local html no-python fallback: wrapper exited {code}: {stderr}", failures)
expect(open_log == [], f"local html no-python fallback: system open should not be called, got {open_log}", failures)
expect(
len(cmux_log) == 1,
f"local html no-python fallback: expected exactly one cmux call, got {cmux_log}",
failures,
)
if cmux_log:
expect(
cmux_log[0].startswith("browser open file://"),
f"local html no-python fallback: expected file:// target, got {cmux_log[0]}",
failures,
)
expect(
"no%20python%20fallback.html" in cmux_log[0],
f"local html no-python fallback: expected URL-encoded filename, got {cmux_log[0]}",
failures,
)
def test_domain_like_html_argument_passthrough(failures: list[str]) -> None:
arg = "example.com/report.html"
open_log, cmux_log, code, stderr = run_wrapper(
args=[arg],
intercept_setting="1",
whitelist="",
)
expect(code == 0, f"domain-like html argument: wrapper exited {code}: {stderr}", failures)
expect(
cmux_log == [],
f"domain-like html argument: cmux should not be called, got {cmux_log}",
failures,
)
expect(
open_log == [arg],
f"domain-like html argument: expected system open [{arg}], got {open_log}",
failures,
)
def test_non_file_scheme_html_passthrough(failures: list[str]) -> None:
url = "ftp://example.com/report.html"
open_log, cmux_log, code, stderr = run_wrapper(
args=[url],
intercept_setting="1",
whitelist="",
)
expect(code == 0, f"non-file scheme html: wrapper exited {code}: {stderr}", failures)
expect(cmux_log == [], f"non-file scheme html: cmux should not be called, got {cmux_log}", failures)
expect(open_log == [url], f"non-file scheme html: expected system open [{url}], got {open_log}", failures)
def test_mailto_html_passthrough(failures: list[str]) -> None:
url = "mailto:help@example.com?subject=report.html"
open_log, cmux_log, code, stderr = run_wrapper(
args=[url],
intercept_setting="1",
whitelist="",
)
expect(code == 0, f"mailto html: wrapper exited {code}: {stderr}", failures)
expect(cmux_log == [], f"mailto html: cmux should not be called, got {cmux_log}", failures)
expect(open_log == [url], f"mailto html: expected system open [{url}], got {open_log}", failures)
def test_local_non_html_file_passthrough(failures: list[str]) -> None:
filename = "fixtures/readme.md"
open_log, cmux_log, code, stderr = run_wrapper(
args=[filename],
intercept_setting="1",
whitelist="",
local_files=[filename],
)
expect(code == 0, f"local non-html file: wrapper exited {code}: {stderr}", failures)
expect(cmux_log == [], f"local non-html file: cmux should not be called, got {cmux_log}", failures)
expect(open_log == [filename], f"local non-html file: expected system open [{filename}], got {open_log}", failures)
def test_unicode_whitelist_matches_punycode_url(failures: list[str]) -> None:
url = "https://xn--bcher-kva.example/path"
open_log, cmux_log, code, stderr = run_wrapper(
@ -312,10 +570,22 @@ def main() -> int:
test_toggle_disabled_case_insensitive_passthrough(failures)
test_whitelist_miss_passthrough(failures)
test_whitelist_match_routes_to_cmux(failures)
test_external_literal_pattern_is_deferred_to_app(failures)
test_external_regex_pattern_is_deferred_to_app(failures)
test_external_regex_with_icu_features_is_deferred_to_app(failures)
test_external_invalid_regex_is_ignored_silently(failures)
test_partial_failures_only_fallback_failed_urls(failures)
test_legacy_toggle_fallback_passthrough(failures)
test_legacy_toggle_fallback_case_insensitive_passthrough(failures)
test_uppercase_scheme_routes_to_cmux(failures)
test_local_html_file_routes_to_cmux(failures)
test_file_url_html_routes_to_cmux(failures)
test_file_url_html_routes_to_cmux_without_python_binary(failures)
test_local_html_file_routes_to_cmux_without_python_binary(failures)
test_domain_like_html_argument_passthrough(failures)
test_non_file_scheme_html_passthrough(failures)
test_mailto_html_passthrough(failures)
test_local_non_html_file_passthrough(failures)
test_unicode_whitelist_matches_punycode_url(failures)
test_punycode_whitelist_matches_unicode_url(failures)

View file

@ -72,6 +72,7 @@ def _wait_for_git_branch(
expected: str,
timeout: float = 12.0,
interval: float = 0.15,
allow_force_fallback: bool = True,
) -> dict[str, str]:
def pred():
state = _parse_sidebar_state(client.sidebar_state())
@ -82,6 +83,8 @@ def _wait_for_git_branch(
try:
return _wait_for(pred, timeout=timeout, interval=interval, label=f"git_branch={expected!r}")
except AssertionError as original_error:
if not allow_force_fallback:
raise original_error
# VM shells can occasionally skip a prompt hook; force a one-shot report so
# the remainder of the flow can still validate transition behavior.
try:
@ -180,6 +183,18 @@ def main() -> int:
_send_cd_and_wait(client, repo)
_wait_for_git_branch(client, "main")
# Branch changes during a long-running foreground command should still
# propagate before the prompt returns (agent-style workflows).
client.send("bash -lc 'git checkout -b feature/agent-live >/dev/null 2>&1; sleep 6'\n")
_wait_for_git_branch(
client,
"feature/agent-live",
timeout=3.5,
interval=0.1,
allow_force_fallback=False,
)
time.sleep(6.3)
# Branch change should update.
# Cover alias/non-`git ...` command paths too (regression: branch could
# stick for ~3s when switching via alias/tools like `gh pr checkout`).

View file

@ -1,46 +0,0 @@
#!/usr/bin/env python3
"""
Regression test for the default sidebar active workspace indicator style.
"""
from __future__ import annotations
import re
import subprocess
import sys
from pathlib import Path
def get_repo_root() -> Path:
result = subprocess.run(
["git", "rev-parse", "--show-toplevel"],
capture_output=True,
text=True,
)
if result.returncode == 0:
return Path(result.stdout.strip())
return Path.cwd()
def main() -> int:
repo_root = get_repo_root()
tab_manager = repo_root / "Sources" / "TabManager.swift"
if not tab_manager.exists():
print(f"FAIL: Missing file {tab_manager}")
return 1
content = tab_manager.read_text(encoding="utf-8")
pattern = r"static let defaultStyle:\s*SidebarActiveTabIndicatorStyle\s*=\s*\.leftRail\b"
if re.search(pattern, content) is None:
rel = tab_manager.relative_to(repo_root)
print(f"FAIL: Expected default style `.leftRail` in {rel}")
return 1
print("PASS: sidebar indicator default style is left rail")
return 0
if __name__ == "__main__":
sys.exit(main())

View file

@ -0,0 +1,213 @@
#!/usr/bin/env python3
"""
End-to-end test for split CWD inheritance.
Verifies that new split panes and new workspace tabs inherit the current
working directory from the source terminal.
Requires:
- cmux running with allowAll socket mode
- bash shell integration sourced (cmux-bash-integration.bash)
Run with a tagged instance:
CMUX_TAG=<tag> python3 tests/test_split_cwd_inheritance.py
"""
from __future__ import annotations
import os
import sys
import time
from pathlib import Path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from cmux import cmux # noqa: E402
def _parse_sidebar_state(text: str) -> dict[str, str]:
data: dict[str, str] = {}
for raw in (text or "").splitlines():
line = raw.rstrip("\n")
if not line or line.startswith(" "):
continue
if "=" not in line:
continue
k, v = line.split("=", 1)
data[k.strip()] = v.strip()
return data
def _wait_for(predicate, timeout: float, interval: float, label: str):
start = time.time()
last_error: Exception | None = None
while time.time() - start < timeout:
try:
value = predicate()
if value:
return value
except Exception as e:
last_error = e
time.sleep(interval)
extra = ""
if last_error is not None:
extra = f" Last error: {last_error}"
raise AssertionError(f"Timed out waiting for {label}.{extra}")
def _wait_for_focused_cwd(
client: cmux,
expected: str,
timeout: float = 12.0,
exclude_panel: str | None = None,
) -> dict[str, str]:
"""Wait for focused_cwd to match expected.
If exclude_panel is given, also require that focused_panel differs from
that value ensuring we're checking the *new* pane, not the original.
"""
def pred():
state = _parse_sidebar_state(client.sidebar_state())
cwd = state.get("focused_cwd", "")
if cwd != expected:
return None
if exclude_panel and state.get("focused_panel", "") == exclude_panel:
return None
return state
label = f"focused_cwd={expected!r}"
if exclude_panel:
label += f" (panel != {exclude_panel})"
return _wait_for(pred, timeout=timeout, interval=0.3, label=label)
def _send_cd_and_wait(
client: cmux,
target: str,
timeout: float = 12.0,
) -> dict[str, str]:
"""cd to target and wait for sidebar focused_cwd to reflect it."""
client.send(f"cd {target}\n")
return _wait_for_focused_cwd(client, target, timeout=timeout)
def main() -> int:
tag = os.environ.get("CMUX_TAG", "")
socket_path = None
if tag:
socket_path = f"/tmp/cmux-debug-{tag}.sock"
client = cmux(socket_path=socket_path)
client.connect()
# Use resolved paths to avoid /tmp -> /private/tmp symlink mismatch on macOS
test_dir_a = str(Path("/tmp/cmux_split_cwd_test_a").resolve())
test_dir_b = str(Path("/tmp/cmux_split_cwd_test_b").resolve())
os.makedirs(test_dir_a, exist_ok=True)
os.makedirs(test_dir_b, exist_ok=True)
passed = 0
failed = 0
def check(name: str, condition: bool, detail: str = ""):
nonlocal passed, failed
if condition:
print(f" PASS {name}")
passed += 1
else:
print(f" FAIL {name}{': ' + detail if detail else ''}")
failed += 1
print("=== Split CWD Inheritance Tests ===")
# --- Setup: cd to test_dir_a in workspace 1 ---
print(" [setup] cd to test_dir_a and wait for shell integration...")
_send_cd_and_wait(client, test_dir_a)
state = _parse_sidebar_state(client.sidebar_state())
check("setup: focused_cwd is test_dir_a", state.get("focused_cwd") == test_dir_a,
f"got {state.get('focused_cwd')!r}")
# --- Test 1: New split inherits test_dir_a ---
print(" [test1] creating right split from test_dir_a...")
# Record the original panel so we can verify focus moves to the NEW pane.
original_panel = state.get("focused_panel", "")
split_result = client.new_split("right")
if not split_result:
check("split created", False)
print(f"\n{passed} passed, {failed} failed")
client.close()
return 1
check("split created", True)
# Wait for the NEW pane (different panel ID) to report test_dir_a.
time.sleep(4) # wait for new bash to start + run PROMPT_COMMAND
try:
state = _wait_for_focused_cwd(
client, test_dir_a, timeout=15.0, exclude_panel=original_panel,
)
new_panel = state.get("focused_panel", "")
check("test1: focus moved to new pane", new_panel != original_panel,
f"original={original_panel!r}, current={new_panel!r}")
check("test1: split inherited test_dir_a",
state.get("focused_cwd") == test_dir_a,
f"focused_cwd={state.get('focused_cwd')!r}")
except AssertionError:
state = _parse_sidebar_state(client.sidebar_state())
check("test1: split inherited test_dir_a", False,
f"focused_cwd={state.get('focused_cwd')!r}, focused_panel={state.get('focused_panel')!r}")
# --- Test 2: New workspace tab inherits CWD ---
# First cd to test_dir_b so we have a different dir to inherit
print(" [test2] cd to test_dir_b, then creating new workspace tab...")
_send_cd_and_wait(client, test_dir_b)
state = _parse_sidebar_state(client.sidebar_state())
original_tab = state.get("tab", "")
tab_result = client.new_tab()
if not tab_result:
check("new tab created", False)
print(f"\n{passed} passed, {failed} failed")
client.close()
return 1
check("new tab created", True)
# New workspace should be a different tab AND inherit test_dir_b
time.sleep(4)
try:
def _new_tab_with_cwd():
s = _parse_sidebar_state(client.sidebar_state())
tab_id = s.get("tab", "")
cwd = s.get("focused_cwd", "")
if tab_id != original_tab and cwd == test_dir_b:
return s
return None
state = _wait_for(
_new_tab_with_cwd, timeout=15.0, interval=0.3,
label=f"new tab with focused_cwd={test_dir_b!r}",
)
check("test2: focus moved to new tab", state.get("tab") != original_tab,
f"original={original_tab!r}, current={state.get('tab')!r}")
check("test2: new workspace inherited test_dir_b",
state.get("focused_cwd") == test_dir_b,
f"focused_cwd={state.get('focused_cwd')!r}")
except AssertionError:
state = _parse_sidebar_state(client.sidebar_state())
check("test2: new workspace inherited test_dir_b", False,
f"focused_cwd={state.get('focused_cwd')!r}, tab={state.get('tab')!r}")
print(f"\n{passed} passed, {failed} failed")
client.close()
# Cleanup
for d in [test_dir_a, test_dir_b]:
try:
os.rmdir(d)
except OSError:
pass
return 1 if failed else 0
if __name__ == "__main__":
sys.exit(main())

View file

@ -1,113 +0,0 @@
#!/usr/bin/env python3
"""Static regression checks for terminal tiny-pane resize/overflow fixes.
Guards the key invariants for issue #348:
1) Terminal portal sync must stabilize layout and clamp hosted frames to host bounds.
2) Surface sizing must prefer live bounds over stale pending values when available.
"""
from __future__ import annotations
import subprocess
from pathlib import Path
def repo_root() -> Path:
result = subprocess.run(
["git", "rev-parse", "--show-toplevel"],
capture_output=True,
text=True,
)
if result.returncode == 0:
return Path(result.stdout.strip())
return Path(__file__).resolve().parents[1]
def extract_block(source: str, signature: str) -> str:
start = source.find(signature)
if start < 0:
raise ValueError(f"Missing signature: {signature}")
brace_start = source.find("{", start)
if brace_start < 0:
raise ValueError(f"Missing opening brace for: {signature}")
depth = 0
for idx in range(brace_start, len(source)):
char = source[idx]
if char == "{":
depth += 1
elif char == "}":
depth -= 1
if depth == 0:
return source[brace_start : idx + 1]
raise ValueError(f"Unbalanced braces for: {signature}")
def main() -> int:
root = repo_root()
failures: list[str] = []
portal_path = root / "Sources" / "TerminalWindowPortal.swift"
portal_source = portal_path.read_text(encoding="utf-8")
if "hostView.layer?.masksToBounds = true" not in portal_source:
failures.append("WindowTerminalPortal init no longer enables hostView layer clipping")
if "hostView.postsFrameChangedNotifications = true" not in portal_source:
failures.append("WindowTerminalPortal init no longer enables hostView frame-change notifications")
if "hostView.postsBoundsChangedNotifications = true" not in portal_source:
failures.append("WindowTerminalPortal init no longer enables hostView bounds-change notifications")
if "private func synchronizeLayoutHierarchy()" not in portal_source:
failures.append("WindowTerminalPortal missing synchronizeLayoutHierarchy()")
if "private func synchronizeHostFrameToReference() -> Bool" not in portal_source:
failures.append("WindowTerminalPortal missing synchronizeHostFrameToReference()")
if "hostedView.reconcileGeometryNow()" not in extract_block(
portal_source,
"func bind(hostedView: GhosttySurfaceScrollView, to anchorView: NSView, visibleInUI: Bool, zPriority: Int = 0)",
):
failures.append("bind() no longer pre-reconciles hosted geometry before attach")
sync_block = extract_block(portal_source, "private func synchronizeHostedView(withId hostedId: ObjectIdentifier)")
for required in [
"let hostBounds = hostView.bounds",
"let clampedFrame = frameInHost.intersection(hostBounds)",
"let targetFrame = (hasFiniteFrame && hasVisibleIntersection) ? clampedFrame : frameInHost",
"hostedView.reconcileGeometryNow()",
"hostedView.refreshSurfaceNow()",
]:
if required not in sync_block:
failures.append(f"terminal portal sync missing: {required}")
if (
"scheduleDeferredFullSynchronizeAll()" not in sync_block
and "scheduleTransientRecoveryRetryIfNeeded(" not in sync_block
):
failures.append(
"terminal portal sync no longer schedules deferred recovery for transient geometry states"
)
terminal_view_path = root / "Sources" / "GhosttyTerminalView.swift"
terminal_view_source = terminal_view_path.read_text(encoding="utf-8")
resolved_block = extract_block(terminal_view_source, "private func resolvedSurfaceSize(preferred size: CGSize?) -> CGSize")
bounds_index = resolved_block.find("let currentBounds = bounds.size")
pending_index = resolved_block.find("if let pending = pendingSurfaceSize")
if bounds_index < 0 or pending_index < 0 or bounds_index > pending_index:
failures.append("resolvedSurfaceSize() no longer prefers bounds before pendingSurfaceSize")
update_block = extract_block(terminal_view_source, "private func updateSurfaceSize(size: CGSize? = nil)")
if "let size = resolvedSurfaceSize(preferred: size)" not in update_block:
failures.append("updateSurfaceSize() no longer resolves size via resolvedSurfaceSize()")
if failures:
print("FAIL: terminal resize/portal regression guards failed")
for item in failures:
print(f" - {item}")
return 1
print("PASS: terminal resize/portal regression guards are in place")
return 0
if __name__ == "__main__":
raise SystemExit(main())

View file

@ -1,54 +0,0 @@
#!/usr/bin/env python3
"""
Verify update UI timing constants so update indicators are visible long enough.
"""
from pathlib import Path
import re
import sys
ROOT = Path(__file__).resolve().parents[1]
TIMING_FILE = ROOT / "Sources" / "Update" / "UpdateTiming.swift"
def read_constants(text: str) -> dict[str, float]:
constants = {}
pattern = re.compile(r"static let (\w+): TimeInterval = ([0-9.]+)")
for match in pattern.finditer(text):
constants[match.group(1)] = float(match.group(2))
return constants
def main() -> int:
if not TIMING_FILE.exists():
print(f"Missing {TIMING_FILE}")
return 1
constants = read_constants(TIMING_FILE.read_text())
required = {
"minimumCheckDisplayDuration": 2.0,
"noUpdateDisplayDuration": 5.0,
}
failures = []
for name, expected in required.items():
actual = constants.get(name)
if actual is None:
failures.append(f"{name} missing")
continue
if actual != expected:
failures.append(f"{name} = {actual} (expected {expected})")
if failures:
print("Update timing test failed:")
for failure in failures:
print(f" - {failure}")
return 1
print("Update timing test passed.")
return 0
if __name__ == "__main__":
sys.exit(main())

View file

@ -0,0 +1,466 @@
#!/usr/bin/env python3
"""
Regression harness: compare typing latency before and after workspace churn.
Scenario A (baseline):
1) Keep only the first workspace.
2) Seed shell history.
3) Measure per-key latency for repeated Up-arrow shortcuts.
Scenario B (churn):
1) Keep only the first workspace.
2) Create N workspaces.
3) Visit every workspace (simulates clicking each tab), then return to the first.
4) Seed shell history.
5) Measure Up-arrow latency again.
The test fails when churn latency regresses too far relative to baseline.
"""
from __future__ import annotations
import os
import select
import socket
import statistics
import subprocess
import sys
import threading
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Callable, Optional
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from cmux import cmux, cmuxError
NEW_WORKSPACES = int(os.environ.get("CMUX_LAG_NEW_WORKSPACES", "20"))
SWITCH_PASSES = int(os.environ.get("CMUX_LAG_SWITCH_PASSES", "1"))
SWITCH_DELAY_S = float(os.environ.get("CMUX_LAG_SWITCH_DELAY_S", "0.06"))
HISTORY_SEED_LINES = int(os.environ.get("CMUX_LAG_HISTORY_LINES", "120"))
KEY_EVENTS = int(os.environ.get("CMUX_LAG_KEY_EVENTS", "180"))
KEY_DELAY_S = float(os.environ.get("CMUX_LAG_KEY_DELAY_S", "0.0"))
KEY_COMBO = os.environ.get("CMUX_LAG_KEY_COMBO", "up")
MAX_P95_RATIO = float(os.environ.get("CMUX_LAG_MAX_P95_RATIO", "1.70"))
MAX_AVG_RATIO = float(os.environ.get("CMUX_LAG_MAX_AVG_RATIO", "1.70"))
MAX_CHURN_P95_MS = float(os.environ.get("CMUX_LAG_MAX_CHURN_P95_MS", "35.0"))
MAX_P95_DELTA_MS = float(os.environ.get("CMUX_LAG_MAX_P95_DELTA_MS", "20.0"))
MAX_AVG_DELTA_MS = float(os.environ.get("CMUX_LAG_MAX_AVG_DELTA_MS", "12.0"))
MIN_BASELINE_P95_MS_FOR_RATIO = float(os.environ.get("CMUX_LAG_MIN_BASELINE_P95_MS_FOR_RATIO", "6.0"))
MIN_BASELINE_AVG_MS_FOR_RATIO = float(os.environ.get("CMUX_LAG_MIN_BASELINE_AVG_MS_FOR_RATIO", "4.0"))
MAX_CPU_PERCENT = float(os.environ.get("CMUX_LAG_MAX_CPU_PERCENT", "180.0"))
ENFORCE_CPU = os.environ.get("CMUX_LAG_ENFORCE_CPU", "0") == "1"
ALLOW_MAIN_SOCKET = os.environ.get("CMUX_LAG_ALLOW_MAIN_SOCKET", "0") == "1"
@dataclass
class LatencyStats:
n: int
avg_ms: float
p50_ms: float
p95_ms: float
p99_ms: float
max_ms: float
class RawSocketClient:
def __init__(self, socket_path: str):
self.socket_path = socket_path
self.sock: Optional[socket.socket] = None
self.recv_buffer = ""
def connect(self) -> None:
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.settimeout(3.0)
sock.connect(self.socket_path)
self.sock = sock
def close(self) -> None:
if self.sock is not None:
try:
self.sock.close()
finally:
self.sock = None
def __enter__(self) -> RawSocketClient:
self.connect()
return self
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
self.close()
def command(self, command: str, timeout_s: float = 2.0) -> str:
if self.sock is None:
raise cmuxError("Raw socket client not connected")
self.sock.sendall((command + "\n").encode("utf-8"))
deadline = time.time() + timeout_s
while True:
if "\n" in self.recv_buffer:
line, self.recv_buffer = self.recv_buffer.split("\n", 1)
return line
remaining = deadline - time.time()
if remaining <= 0:
raise cmuxError(f"Timed out waiting for response to: {command}")
ready, _, _ = select.select([self.sock], [], [], remaining)
if not ready:
raise cmuxError(f"Timed out waiting for response to: {command}")
chunk = self.sock.recv(8192)
if not chunk:
raise cmuxError("Socket closed while waiting for response")
self.recv_buffer += chunk.decode("utf-8", errors="replace")
def wait_for(predicate: Callable[[], bool], timeout_s: float, step_s: float = 0.05) -> None:
start = time.time()
while time.time() - start < timeout_s:
if predicate():
return
time.sleep(step_s)
raise cmuxError("Timed out waiting for condition")
def percentile(values: list[float], p: float) -> float:
if not values:
return 0.0
if len(values) == 1:
return values[0]
sorted_values = sorted(values)
idx = (len(sorted_values) - 1) * p
lower = int(idx)
upper = min(lower + 1, len(sorted_values) - 1)
fraction = idx - lower
return sorted_values[lower] * (1 - fraction) + sorted_values[upper] * fraction
def compute_stats(values_ms: list[float]) -> LatencyStats:
return LatencyStats(
n=len(values_ms),
avg_ms=statistics.mean(values_ms) if values_ms else 0.0,
p50_ms=percentile(values_ms, 0.50),
p95_ms=percentile(values_ms, 0.95),
p99_ms=percentile(values_ms, 0.99),
max_ms=max(values_ms) if values_ms else 0.0,
)
def get_cmux_pid_for_socket(socket_path: Optional[str]) -> Optional[int]:
if socket_path and os.path.exists(socket_path):
result = subprocess.run(["lsof", "-t", socket_path], capture_output=True, text=True)
if result.returncode == 0:
for line in result.stdout.strip().split("\n"):
line = line.strip()
if not line:
continue
try:
pid = int(line)
except ValueError:
continue
if pid != os.getpid():
return pid
result = subprocess.run(
["pgrep", "-f", r"cmux DEV.*\.app/Contents/MacOS/cmux DEV"],
capture_output=True,
text=True,
)
if result.returncode != 0:
return None
lines = [line.strip() for line in result.stdout.splitlines() if line.strip()]
return int(lines[0]) if lines else None
def resolve_target_socket() -> str:
socket_path = os.environ.get("CMUX_SOCKET_PATH")
if not socket_path:
raise cmuxError(
"CMUX_SOCKET_PATH is required. Point it to a tagged dev socket (for example /tmp/cmux-debug-<tag>.sock)."
)
base = os.path.basename(socket_path)
if not ALLOW_MAIN_SOCKET and base in {"cmux.sock", "cmux-debug.sock"}:
raise cmuxError(
f"Refusing to run against main socket '{socket_path}'. Set CMUX_SOCKET_PATH to a tagged dev instance."
)
return socket_path
def get_cpu(pid: int) -> float:
result = subprocess.run(["ps", "-p", str(pid), "-o", "%cpu="], capture_output=True, text=True)
if result.returncode != 0:
return 0.0
try:
return float(result.stdout.strip())
except ValueError:
return 0.0
class CPUMonitor:
def __init__(self, pid: int, interval_s: float = 0.2):
self.pid = pid
self.interval_s = interval_s
self._stop = threading.Event()
self._thread = threading.Thread(target=self._run, daemon=True)
self.samples: list[float] = []
def _run(self) -> None:
while not self._stop.is_set():
self.samples.append(get_cpu(self.pid))
time.sleep(self.interval_s)
def start(self) -> None:
self._thread.start()
def stop(self) -> None:
self._stop.set()
self._thread.join(timeout=2.0)
def keep_only_first_workspace(client: cmux) -> str:
workspaces = sorted(client.list_workspaces(), key=lambda row: row[0])
if not workspaces:
first_id = client.new_workspace()
client.select_workspace(first_id)
return first_id
first_id = workspaces[0][1]
client.select_workspace(first_id)
for _index, wid, _title, _selected in reversed(workspaces[1:]):
if wid == first_id:
continue
client.close_workspace(wid)
def only_first() -> bool:
current = sorted(client.list_workspaces(), key=lambda row: row[0])
return len(current) == 1 and current[0][1] == first_id
wait_for(only_first, timeout_s=6.0)
return first_id
def create_workspaces(client: cmux, count: int) -> list[str]:
created: list[str] = []
for _ in range(count):
wid = client.new_workspace()
created.append(wid)
time.sleep(0.04)
return created
def cycle_all_workspaces(client: cmux, passes: int, delay_s: float) -> list[str]:
ids = [wid for _idx, wid, _title, _selected in sorted(client.list_workspaces(), key=lambda row: row[0])]
for _ in range(passes):
for wid in ids:
client.select_workspace(wid)
time.sleep(delay_s)
return ids
def focused_terminal_panel(client: cmux) -> str:
surfaces = client.list_surfaces()
if not surfaces:
raise cmuxError("No surfaces available in selected workspace")
focused = next(((idx, sid) for idx, sid, is_focused in surfaces if is_focused), None)
if focused is None:
idx, sid, _ = surfaces[0]
client.focus_surface(idx)
return sid
return focused[1]
def seed_history(client: cmux, lines: int) -> None:
for i in range(lines):
client.send_line(f"echo cmux-lag-seed-{i}")
def run_shortcut_latency_burst(
socket_path: str,
combo: str,
count: int,
delay_s: float,
) -> list[float]:
latencies_ms: list[float] = []
with RawSocketClient(socket_path) as raw:
# Warm up the command path and responder chain.
for _ in range(5):
response = raw.command(f"simulate_shortcut {combo}")
if not response.startswith("OK"):
raise cmuxError(response)
for _ in range(count):
start = time.perf_counter()
response = raw.command(f"simulate_shortcut {combo}")
elapsed_ms = (time.perf_counter() - start) * 1000.0
if not response.startswith("OK"):
raise cmuxError(response)
latencies_ms.append(elapsed_ms)
if delay_s > 0:
time.sleep(delay_s)
return latencies_ms
def maybe_write_sample(pid: Optional[int], prefix: str) -> Optional[Path]:
if pid is None:
return None
out = Path(f"/tmp/{prefix}_{pid}.txt")
result = subprocess.run(["sample", str(pid), "2"], capture_output=True, text=True)
out.write_text(result.stdout + result.stderr)
return out
def print_stats(label: str, stats: LatencyStats) -> None:
print(f"\n{label}")
print(f" events: {stats.n}")
print(f" avg_ms: {stats.avg_ms:.2f}")
print(f" p50_ms: {stats.p50_ms:.2f}")
print(f" p95_ms: {stats.p95_ms:.2f}")
print(f" p99_ms: {stats.p99_ms:.2f}")
print(f" max_ms: {stats.max_ms:.2f}")
def run_baseline_scenario(client: cmux, socket_path: str) -> tuple[str, LatencyStats]:
first_workspace_id = keep_only_first_workspace(client)
client.select_workspace(first_workspace_id)
panel_id = focused_terminal_panel(client)
seed_history(client, HISTORY_SEED_LINES)
latencies = run_shortcut_latency_burst(
socket_path=socket_path,
combo=KEY_COMBO,
count=KEY_EVENTS,
delay_s=KEY_DELAY_S,
)
return panel_id, compute_stats(latencies)
def run_churn_scenario(client: cmux, socket_path: str, first_workspace_id: str) -> tuple[str, LatencyStats]:
first_workspace_id = keep_only_first_workspace(client)
_ = create_workspaces(client, NEW_WORKSPACES)
ordered_ids = cycle_all_workspaces(client, SWITCH_PASSES, SWITCH_DELAY_S)
if first_workspace_id in ordered_ids:
client.select_workspace(first_workspace_id)
elif ordered_ids:
client.select_workspace(ordered_ids[0])
panel_id = focused_terminal_panel(client)
seed_history(client, HISTORY_SEED_LINES)
latencies = run_shortcut_latency_burst(
socket_path=socket_path,
combo=KEY_COMBO,
count=KEY_EVENTS,
delay_s=KEY_DELAY_S,
)
return panel_id, compute_stats(latencies)
def main() -> int:
print("=" * 64)
print("Workspace Churn + Up-Arrow Latency Regression")
print("=" * 64)
client: Optional[cmux] = None
pid: Optional[int] = None
first_workspace_id: Optional[str] = None
try:
target_socket = resolve_target_socket()
client = cmux(socket_path=target_socket)
client.connect()
print(f"Using socket: {client.socket_path}")
pid = get_cmux_pid_for_socket(client.socket_path)
if pid is None:
print("SKIP: cmux process not found for socket")
return 0
cpu_monitor = CPUMonitor(pid)
cpu_monitor.start()
first_workspace_id = keep_only_first_workspace(client)
baseline_panel_id, baseline = run_baseline_scenario(client, client.socket_path)
print(f"Baseline panel: {baseline_panel_id}")
churn_panel_id, churn = run_churn_scenario(client, client.socket_path, first_workspace_id)
print(f"Churn panel: {churn_panel_id}")
cpu_monitor.stop()
cpu_samples = cpu_monitor.samples
cpu_avg = statistics.mean(cpu_samples) if cpu_samples else 0.0
cpu_max = max(cpu_samples) if cpu_samples else 0.0
print_stats("Baseline", baseline)
print_stats("After workspace churn", churn)
p95_ratio = churn.p95_ms / max(baseline.p95_ms, 0.001)
avg_ratio = churn.avg_ms / max(baseline.avg_ms, 0.001)
p95_delta_ms = churn.p95_ms - baseline.p95_ms
avg_delta_ms = churn.avg_ms - baseline.avg_ms
enforce_p95_ratio = baseline.p95_ms >= MIN_BASELINE_P95_MS_FOR_RATIO
enforce_avg_ratio = baseline.avg_ms >= MIN_BASELINE_AVG_MS_FOR_RATIO
print("\nComparison")
print(
f" p95_ratio: {p95_ratio:.2f}x (max {MAX_P95_RATIO:.2f}x, "
f"enabled when baseline p95 >= {MIN_BASELINE_P95_MS_FOR_RATIO:.2f}ms)"
)
print(
f" avg_ratio: {avg_ratio:.2f}x (max {MAX_AVG_RATIO:.2f}x, "
f"enabled when baseline avg >= {MIN_BASELINE_AVG_MS_FOR_RATIO:.2f}ms)"
)
print(f" churn_p95_ms: {churn.p95_ms:.2f} (max {MAX_CHURN_P95_MS:.2f})")
print(f" p95_delta_ms: {p95_delta_ms:.2f} (max {MAX_P95_DELTA_MS:.2f})")
print(f" avg_delta_ms: {avg_delta_ms:.2f} (max {MAX_AVG_DELTA_MS:.2f})")
print(f" cpu_avg_pct: {cpu_avg:.2f}")
print(f" cpu_max_pct: {cpu_max:.2f}")
failures: list[str] = []
if enforce_p95_ratio and p95_ratio > MAX_P95_RATIO:
failures.append(f"p95 ratio {p95_ratio:.2f}x > {MAX_P95_RATIO:.2f}x")
if enforce_avg_ratio and avg_ratio > MAX_AVG_RATIO:
failures.append(f"avg ratio {avg_ratio:.2f}x > {MAX_AVG_RATIO:.2f}x")
if p95_delta_ms > MAX_P95_DELTA_MS:
failures.append(f"p95 delta {p95_delta_ms:.2f}ms > {MAX_P95_DELTA_MS:.2f}ms")
if avg_delta_ms > MAX_AVG_DELTA_MS:
failures.append(f"avg delta {avg_delta_ms:.2f}ms > {MAX_AVG_DELTA_MS:.2f}ms")
if churn.p95_ms > MAX_CHURN_P95_MS:
failures.append(f"churn p95 {churn.p95_ms:.2f}ms > {MAX_CHURN_P95_MS:.2f}ms")
if ENFORCE_CPU and cpu_max > MAX_CPU_PERCENT:
failures.append(f"cpu max {cpu_max:.2f}% > {MAX_CPU_PERCENT:.2f}%")
if failures:
print("\nFAIL")
for item in failures:
print(f" - {item}")
sample_path = maybe_write_sample(pid, "cmux_workspace_churn_up_arrow_lag")
if sample_path:
print(f" sample_path: {sample_path}")
return 1
print("\nPASS")
return 0
except cmuxError as e:
print(f"FAIL: {e}")
sample_path = maybe_write_sample(pid, "cmux_workspace_churn_up_arrow_error")
if sample_path:
print(f"sample_path: {sample_path}")
return 1
finally:
if client is not None:
try:
if first_workspace_id:
client.select_workspace(first_workspace_id)
keep_only_first_workspace(client)
except Exception:
pass
client.close()
if __name__ == "__main__":
raise SystemExit(main())