diff --git a/Sources/TerminalWindowPortal.swift b/Sources/TerminalWindowPortal.swift index a2f0c8c8..4c0dc9c3 100644 --- a/Sources/TerminalWindowPortal.swift +++ b/Sources/TerminalWindowPortal.swift @@ -535,6 +535,10 @@ private final class SplitDividerOverlayView: NSView { @MainActor final class WindowTerminalPortal: NSObject { + private static let tinyHideThreshold: CGFloat = 1 + private static let minimumRevealWidth: CGFloat = 24 + private static let minimumRevealHeight: CGFloat = 18 + private weak var window: NSWindow? private let hostView = WindowTerminalHostView(frame: .zero) private let dividerOverlayView = SplitDividerOverlayView(frame: .zero) @@ -886,7 +890,12 @@ final class WindowTerminalPortal: NSObject { frameInHost.size.width.isFinite && frameInHost.size.height.isFinite let anchorHidden = Self.isHiddenOrAncestorHidden(anchorView) - let tinyFrame = frameInHost.width <= 1 || frameInHost.height <= 1 + let tinyFrame = + frameInHost.width <= Self.tinyHideThreshold || + frameInHost.height <= Self.tinyHideThreshold + let revealReadyForDisplay = + frameInHost.width >= Self.minimumRevealWidth && + frameInHost.height >= Self.minimumRevealHeight let outsideHostBounds = !frameInHost.intersects(hostView.bounds) let shouldHide = !entry.visibleInUI || @@ -894,6 +903,7 @@ final class WindowTerminalPortal: NSObject { tinyFrame || !hasFiniteFrame || outsideHostBounds + let shouldDeferReveal = !shouldHide && hostedView.isHidden && !revealReadyForDisplay let oldFrame = hostedView.frame #if DEBUG @@ -920,7 +930,7 @@ final class WindowTerminalPortal: NSObject { dlog( "portal.hidden hosted=\(portalDebugToken(hostedView)) value=1 " + "visibleInUI=\(entry.visibleInUI ? 1 : 0) anchorHidden=\(anchorHidden ? 1 : 0) " + - "tiny=\(tinyFrame ? 1 : 0) finite=\(hasFiniteFrame ? 1 : 0) " + + "tiny=\(tinyFrame ? 1 : 0) revealReady=\(revealReadyForDisplay ? 1 : 0) finite=\(hasFiniteFrame ? 1 : 0) " + "outside=\(outsideHostBounds ? 1 : 0) frame=\(portalDebugFrame(frameInHost))" ) #endif @@ -935,17 +945,29 @@ final class WindowTerminalPortal: NSObject { if abs(oldFrame.size.width - frameInHost.size.width) > 0.5 || abs(oldFrame.size.height - frameInHost.size.height) > 0.5, - !shouldHide { + !shouldHide, + (!hostedView.isHidden || revealReadyForDisplay) { hostedView.reconcileGeometryNow() } } - if !shouldHide, hostedView.isHidden { + if shouldDeferReveal { +#if DEBUG + if !Self.rectApproximatelyEqual(oldFrame, frameInHost) { + dlog( + "portal.hidden.deferReveal hosted=\(portalDebugToken(hostedView)) " + + "frame=\(portalDebugFrame(frameInHost)) min=\(Int(Self.minimumRevealWidth))x\(Int(Self.minimumRevealHeight))" + ) + } +#endif + } + + if !shouldHide, hostedView.isHidden, revealReadyForDisplay { #if DEBUG dlog( "portal.hidden hosted=\(portalDebugToken(hostedView)) value=0 " + "visibleInUI=\(entry.visibleInUI ? 1 : 0) anchorHidden=\(anchorHidden ? 1 : 0) " + - "tiny=\(tinyFrame ? 1 : 0) finite=\(hasFiniteFrame ? 1 : 0) " + + "tiny=\(tinyFrame ? 1 : 0) revealReady=\(revealReadyForDisplay ? 1 : 0) finite=\(hasFiniteFrame ? 1 : 0) " + "outside=\(outsideHostBounds ? 1 : 0) frame=\(portalDebugFrame(frameInHost))" ) #endif diff --git a/cmuxTests/CmuxWebViewKeyEquivalentTests.swift b/cmuxTests/CmuxWebViewKeyEquivalentTests.swift index 8341c5f1..92e0fcd0 100644 --- a/cmuxTests/CmuxWebViewKeyEquivalentTests.swift +++ b/cmuxTests/CmuxWebViewKeyEquivalentTests.swift @@ -4364,6 +4364,14 @@ final class GhosttySurfaceOverlayTests: XCTestCase { @MainActor final class TerminalWindowPortalLifecycleTests: XCTestCase { + private func realizeWindowLayout(_ window: NSWindow) { + window.makeKeyAndOrderFront(nil) + window.displayIfNeeded() + window.contentView?.layoutSubtreeIfNeeded() + RunLoop.current.run(until: Date().addingTimeInterval(0.05)) + window.contentView?.layoutSubtreeIfNeeded() + } + func testPortalHostInstallsAboveContentViewForVisibility() { let window = NSWindow( contentRect: NSRect(x: 0, y: 0, width: 320, height: 240), @@ -4553,6 +4561,50 @@ final class TerminalWindowPortalLifecycleTests: XCTestCase { "Promoting z-priority should bring an already-visible terminal to front" ) } + + func testHiddenPortalDefersRevealUntilFrameHasUsableSize() { + let window = NSWindow( + contentRect: NSRect(x: 0, y: 0, width: 700, height: 420), + styleMask: [.titled, .closable], + backing: .buffered, + defer: false + ) + defer { window.orderOut(nil) } + + let portal = WindowTerminalPortal(window: window) + realizeWindowLayout(window) + guard let contentView = window.contentView else { + XCTFail("Expected content view") + return + } + + let anchor = NSView(frame: NSRect(x: 40, y: 40, width: 280, height: 220)) + contentView.addSubview(anchor) + + let hosted = GhosttySurfaceScrollView( + surfaceView: GhosttyNSView(frame: NSRect(x: 0, y: 0, width: 120, height: 80)) + ) + portal.bind(hostedView: hosted, to: anchor, visibleInUI: true) + XCTAssertFalse(hosted.isHidden, "Healthy geometry should be visible") + + // Collapse to a tiny frame first. + anchor.frame = NSRect(x: 160.5, y: 1037.0, width: 79.0, height: 0.0) + portal.synchronizeHostedViewForAnchor(anchor) + XCTAssertTrue(hosted.isHidden, "Tiny geometry should hide the portal-hosted terminal") + + // Then restore to a non-zero but still too-small frame. It should remain hidden. + anchor.frame = NSRect(x: 160.9, y: 1026.5, width: 93.6, height: 10.3) + portal.synchronizeHostedViewForAnchor(anchor) + XCTAssertTrue( + hosted.isHidden, + "Portal should defer reveal until geometry reaches a usable size" + ) + + // Once the frame is large enough again, reveal should resume. + anchor.frame = NSRect(x: 40, y: 40, width: 180, height: 40) + portal.synchronizeHostedViewForAnchor(anchor) + XCTAssertFalse(hosted.isHidden, "Portal should unhide after geometry is usable") + } } @MainActor diff --git a/tests_v2/test_split_cmd_d_ctrl_d_geometry_fuzz.py b/tests_v2/test_split_cmd_d_ctrl_d_geometry_fuzz.py new file mode 100644 index 00000000..bc0cf9f3 --- /dev/null +++ b/tests_v2/test_split_cmd_d_ctrl_d_geometry_fuzz.py @@ -0,0 +1,211 @@ +#!/usr/bin/env python3 +""" +Fuzz regression: rapid Cmd+D / Ctrl+D churn must not shift the outer bonsplit container frame. + +This targets the user-reported visual shift/flash while spamming split + close. +We treat any drift in x/y/width/height of the outer container frame as a failure. +""" + +from collections import deque +import os +import random +import sys +import time +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).parent)) +from cmux import cmux, cmuxError + + +SOCKET_PATH = os.environ.get("CMUX_SOCKET", "/tmp/cmux-debug.sock") +FUZZ_SEED = int(os.environ.get("CMUX_SPLIT_FUZZ_SEED", "424242")) +FUZZ_STEPS = int(os.environ.get("CMUX_SPLIT_FUZZ_STEPS", "1400")) +SAMPLES_PER_STEP = int(os.environ.get("CMUX_SPLIT_FUZZ_SAMPLES", "4")) +SAMPLE_INTERVAL_S = float(os.environ.get("CMUX_SPLIT_FUZZ_SAMPLE_INTERVAL_S", "0.0015")) +ACTION_JITTER_MAX_S = float(os.environ.get("CMUX_SPLIT_FUZZ_ACTION_JITTER_MAX_S", "0.0035")) +BURST_MAX = int(os.environ.get("CMUX_SPLIT_FUZZ_BURST_MAX", "3")) +MAX_PANES = int(os.environ.get("CMUX_SPLIT_FUZZ_MAX_PANES", "10")) +EPSILON = float(os.environ.get("CMUX_SPLIT_FUZZ_EPSILON", "0.0")) +TRACE_TAIL = int(os.environ.get("CMUX_SPLIT_FUZZ_TRACE_TAIL", "40")) +ASSERT_NO_UNDERFLOW = os.environ.get("CMUX_SPLIT_FUZZ_ASSERT_NO_UNDERFLOW", "0") == "1" +ASSERT_NO_EMPTY_PANEL = os.environ.get("CMUX_SPLIT_FUZZ_ASSERT_NO_EMPTY_PANEL", "0") == "1" + + +def _pane_count(layout_payload: dict) -> int: + layout = layout_payload.get("layout") or {} + panes = layout.get("panes") or [] + return len(panes) + + +def _largest_split_frame(layout_payload: dict) -> dict: + selected = layout_payload.get("selectedPanels") or [] + best = None + best_area = -1.0 + + for row in selected: + for split in row.get("splitViews") or []: + frame = split.get("frame") + if not frame: + continue + + try: + x = float(frame.get("x", 0.0)) + y = float(frame.get("y", 0.0)) + width = float(frame.get("width", 0.0)) + height = float(frame.get("height", 0.0)) + except (TypeError, ValueError): + continue + + if width <= 0.0 or height <= 0.0: + continue + + area = width * height + if area > best_area: + best_area = area + best = {"x": x, "y": y, "width": width, "height": height} + + if best is None: + raise cmuxError(f"layout_debug contains no usable split-view frame: {layout_payload}") + return best + + +def _container_frame(layout_payload: dict) -> dict: + container = (layout_payload.get("layout") or {}).get("containerFrame") + if container: + try: + return { + "x": float(container.get("x", 0.0)), + "y": float(container.get("y", 0.0)), + "width": float(container.get("width", 0.0)), + "height": float(container.get("height", 0.0)), + } + except (TypeError, ValueError): + pass + + # Back-compat fallback for older payloads that don't expose containerFrame. + return _largest_split_frame(layout_payload) + + +def _assert_same_frame( + current: dict, + baseline: dict, + *, + step: int, + sample: int, + action: str, + seed: int, + action_index: int, + trace: list[str], +) -> None: + deltas = { + key: abs(float(current[key]) - float(baseline[key])) + for key in ("x", "y", "width", "height") + } + shifted = {k: v for k, v in deltas.items() if v > EPSILON} + if shifted: + raise cmuxError( + "Outer split container shifted during fuzz churn " + f"(step={step}, sample={sample}, action={action}, action_index={action_index}, seed={seed}, " + f"baseline={baseline}, current={current}, deltas={deltas}, epsilon={EPSILON})" + f"; recent_actions={trace}" + ) + + +def _warm_start_split(c: cmux) -> dict: + # Ensure we have at least one split so the container frame exists in layout_debug. + c.simulate_shortcut("cmd+d") + deadline = time.time() + 2.0 + last = None + while time.time() < deadline: + payload = c.layout_debug() + last = payload + if _pane_count(payload) >= 2: + return payload + time.sleep(0.02) + raise cmuxError(f"Timed out waiting for first split to appear: {last}") + + +def main() -> int: + rng = random.Random(FUZZ_SEED) + recent_actions: deque[str] = deque(maxlen=max(8, TRACE_TAIL)) + total_actions = 0 + + with cmux(SOCKET_PATH) as c: + ws = c.new_workspace() + c.select_workspace(ws) + c.activate_app() + time.sleep(0.2) + + c.reset_bonsplit_underflow_count() + c.reset_empty_panel_count() + + initial = _warm_start_split(c) + baseline = _container_frame(initial) + if _pane_count(initial) < 2: + raise cmuxError("Expected at least 2 panes after warm start split") + + for step in range(1, FUZZ_STEPS + 1): + burst = rng.randint(1, max(1, BURST_MAX)) + + for burst_index in range(1, burst + 1): + before = c.layout_debug() + pane_count = _pane_count(before) + + if pane_count <= 2: + action = "cmd+d" + elif pane_count >= MAX_PANES: + action = "ctrl+d" + else: + # Bias toward split to keep churn dense while still frequently collapsing via ctrl+d. + action = "cmd+d" if rng.random() < 0.60 else "ctrl+d" + + if action == "cmd+d": + c.simulate_shortcut("cmd+d") + else: + # Ctrl+D equivalent sent directly to the focused terminal surface. + c.send_ctrl_d() + + total_actions += 1 + recent_actions.append( + f"step={step}/burst={burst_index}/{burst} panes_before={pane_count} action={action}" + ) + + # Random micro-jitter to emulate uneven key-repeat timing while keeping churn fast. + if ACTION_JITTER_MAX_S > 0: + time.sleep(rng.uniform(0.0, ACTION_JITTER_MAX_S)) + + # Sample repeatedly after each burst to catch transient shifts. + for sample in range(0, SAMPLES_PER_STEP + 1): + payload = c.layout_debug() + current = _container_frame(payload) + _assert_same_frame( + current, + baseline, + step=step, + sample=sample, + action="burst", + seed=FUZZ_SEED, + action_index=total_actions, + trace=list(recent_actions), + ) + if SAMPLE_INTERVAL_S > 0: + time.sleep(rng.uniform(0.0, SAMPLE_INTERVAL_S)) + + underflows = c.bonsplit_underflow_count() + if ASSERT_NO_UNDERFLOW and underflows != 0: + raise cmuxError(f"bonsplit arranged-subview underflow observed during fuzz run: {underflows}") + + flashes = c.empty_panel_count() + if ASSERT_NO_EMPTY_PANEL and flashes != 0: + raise cmuxError(f"EmptyPanelView appeared during fuzz run (count={flashes})") + + print( + "PASS: cmd+d/ctrl+d fuzz geometry invariant " + f"(seed={FUZZ_SEED}, steps={FUZZ_STEPS}, samples={SAMPLES_PER_STEP}, burst_max={BURST_MAX}, " + f"actions={total_actions}, epsilon={EPSILON}, underflows={underflows}, empty_panel={flashes})" + ) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tests_v2/test_split_cmd_d_ctrl_d_two_pane_frame_guard.py b/tests_v2/test_split_cmd_d_ctrl_d_two_pane_frame_guard.py new file mode 100644 index 00000000..b4413d83 --- /dev/null +++ b/tests_v2/test_split_cmd_d_ctrl_d_two_pane_frame_guard.py @@ -0,0 +1,487 @@ +#!/usr/bin/env python3 +""" +Focused fuzz regression for rapid Cmd+D / Ctrl+D churn in a strict 1<->2 pane loop. + +Intent: + - Keep topology limited to one pane or two left/right panes only. + - Run across multiple fresh workspaces. + - Sample layout as fast as the debug socket allows during transitions/holds. + - Fail immediately if outer container x/y/width/height drifts at any sampled frame. +""" + +from collections import deque +import os +import random +import sys +import time +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).parent)) +from cmux import cmux, cmuxError + + +SOCKET_PATH = os.environ.get("CMUX_SOCKET", "/tmp/cmux-debug.sock") +FUZZ_SEED = int(os.environ.get("CMUX_SPLIT_2PANE_SEED", "20260223")) +WORKSPACES = int(os.environ.get("CMUX_SPLIT_2PANE_WORKSPACES", "3")) +CYCLES_PER_WORKSPACE = int(os.environ.get("CMUX_SPLIT_2PANE_CYCLES", "220")) +TRANSITION_TIMEOUT_S = float(os.environ.get("CMUX_SPLIT_2PANE_TIMEOUT_S", "2.0")) +HOLD_MIN_S = float(os.environ.get("CMUX_SPLIT_2PANE_HOLD_MIN_S", "0.003")) +HOLD_MAX_S = float(os.environ.get("CMUX_SPLIT_2PANE_HOLD_MAX_S", "0.018")) +PRE_ACTION_JITTER_MAX_S = float(os.environ.get("CMUX_SPLIT_2PANE_ACTION_JITTER_MAX_S", "0.002")) +EPSILON = float(os.environ.get("CMUX_SPLIT_2PANE_EPSILON", "0.0")) +TRACE_TAIL = int(os.environ.get("CMUX_SPLIT_2PANE_TRACE_TAIL", "64")) +LAYOUT_POLL_SLEEP_S = float(os.environ.get("CMUX_SPLIT_2PANE_POLL_SLEEP_S", "0.0008")) +LAYOUT_TIMEOUT_RETRIES = int(os.environ.get("CMUX_SPLIT_2PANE_LAYOUT_TIMEOUT_RETRIES", "4")) +LAYOUT_TIMEOUT_RETRY_SLEEP_S = float(os.environ.get("CMUX_SPLIT_2PANE_LAYOUT_TIMEOUT_RETRY_SLEEP_S", "0.0015")) +MAX_LAYOUT_TIMEOUTS = int(os.environ.get("CMUX_SPLIT_2PANE_MAX_LAYOUT_TIMEOUTS", "80")) +CTRL_D_RETRY_INTERVAL_S = float(os.environ.get("CMUX_SPLIT_2PANE_CTRL_D_RETRY_INTERVAL_S", "0.18")) +CTRL_D_MAX_EXTRA = int(os.environ.get("CMUX_SPLIT_2PANE_CTRL_D_MAX_EXTRA", "6")) + + +def _pane_count(layout_payload: dict) -> int: + layout = layout_payload.get("layout") or {} + return len(layout.get("panes") or []) + + +def _largest_split_frame(layout_payload: dict) -> dict: + selected = layout_payload.get("selectedPanels") or [] + best = None + best_area = -1.0 + for row in selected: + for split in row.get("splitViews") or []: + frame = split.get("frame") + if not frame: + continue + try: + x = float(frame.get("x", 0.0)) + y = float(frame.get("y", 0.0)) + width = float(frame.get("width", 0.0)) + height = float(frame.get("height", 0.0)) + except (TypeError, ValueError): + continue + if width <= 0.0 or height <= 0.0: + continue + area = width * height + if area > best_area: + best_area = area + best = {"x": x, "y": y, "width": width, "height": height} + if best is None: + raise cmuxError(f"layout_debug contains no usable split-view frame: {layout_payload}") + return best + + +def _container_frame(layout_payload: dict) -> dict: + container = (layout_payload.get("layout") or {}).get("containerFrame") + if container: + try: + return { + "x": float(container.get("x", 0.0)), + "y": float(container.get("y", 0.0)), + "width": float(container.get("width", 0.0)), + "height": float(container.get("height", 0.0)), + } + except (TypeError, ValueError): + pass + return _largest_split_frame(layout_payload) + + +def _pane_frames_sorted_x(layout_payload: dict) -> list[dict]: + layout = layout_payload.get("layout") or {} + panes = layout.get("panes") or [] + frames: list[dict] = [] + for pane in panes: + frame = pane.get("frame") or {} + try: + frames.append( + { + "pane_id": str(pane.get("paneId") or ""), + "x": float(frame.get("x", 0.0)), + "y": float(frame.get("y", 0.0)), + "width": float(frame.get("width", 0.0)), + "height": float(frame.get("height", 0.0)), + } + ) + except (TypeError, ValueError): + continue + return sorted(frames, key=lambda p: (p["x"], p["y"])) + + +def _assert_same_frame( + *, + current: dict, + baseline: dict, + workspace_index: int, + cycle: int, + phase: str, + sample: int, + trace: list[str], +) -> None: + deltas = { + key: abs(float(current[key]) - float(baseline[key])) + for key in ("x", "y", "width", "height") + } + shifted = {k: v for k, v in deltas.items() if v > EPSILON} + if shifted: + raise cmuxError( + "Container frame shifted " + f"(workspace={workspace_index}, cycle={cycle}, phase={phase}, sample={sample}, " + f"baseline={baseline}, current={current}, deltas={deltas}, epsilon={EPSILON}); " + f"recent_actions={trace}" + ) + + +def _assert_two_panes_left_right(layout_payload: dict, *, workspace_index: int, cycle: int, trace: list[str]) -> None: + panes = _pane_frames_sorted_x(layout_payload) + if len(panes) != 2: + raise cmuxError( + f"Expected exactly 2 panes in two-pane phase, got {len(panes)} " + f"(workspace={workspace_index}, cycle={cycle}); panes={panes}; recent_actions={trace}" + ) + + left, right = panes[0], panes[1] + if left["width"] <= 0.0 or left["height"] <= 0.0 or right["width"] <= 0.0 or right["height"] <= 0.0: + raise cmuxError( + f"Collapsed pane in two-pane phase (workspace={workspace_index}, cycle={cycle}): " + f"left={left} right={right}; recent_actions={trace}" + ) + + if left["x"] >= right["x"]: + raise cmuxError( + f"Two-pane geometry is not left/right (workspace={workspace_index}, cycle={cycle}): " + f"left={left} right={right}; recent_actions={trace}" + ) + + +def _selected_panel_by_pane(layout_payload: dict) -> dict[str, str]: + out: dict[str, str] = {} + for row in layout_payload.get("selectedPanels") or []: + pane_id = str(row.get("paneId") or "") + panel_id = str(row.get("panelId") or "") + if pane_id and panel_id: + out[pane_id] = panel_id + return out + + +def _rightmost_pane_id(layout_payload: dict) -> str: + panes = _pane_frames_sorted_x(layout_payload) + if len(panes) < 2: + raise cmuxError(f"Expected at least 2 panes to resolve rightmost pane: {panes}") + pane_id = str(panes[-1].get("pane_id") or "") + if not pane_id: + raise cmuxError(f"Rightmost pane is missing pane_id: {panes[-1]}") + return pane_id + + +def _rightmost_panel_id(layout_payload: dict) -> str: + pane_id = _rightmost_pane_id(layout_payload) + selected = _selected_panel_by_pane(layout_payload) + panel_id = str(selected.get(pane_id) or "") + if not panel_id: + raise cmuxError(f"Missing selected panel for rightmost pane: pane_id={pane_id}, selected={selected}") + return panel_id + + +def _safe_layout_debug(c: cmux, *, timeout_state: dict[str, int], context: str) -> dict: + for attempt in range(0, max(0, LAYOUT_TIMEOUT_RETRIES) + 1): + try: + return c.layout_debug() + except cmuxError as exc: + if "timed out waiting for response" not in str(exc).lower(): + raise + + timeout_state["count"] = timeout_state.get("count", 0) + 1 + count = timeout_state["count"] + if count > max(0, MAX_LAYOUT_TIMEOUTS): + raise cmuxError( + f"Exceeded layout_debug timeout budget (count={count}, max={MAX_LAYOUT_TIMEOUTS}, context={context})" + ) from exc + + if attempt >= max(0, LAYOUT_TIMEOUT_RETRIES): + raise cmuxError( + f"layout_debug timed out after retries (attempts={attempt + 1}, count={count}, context={context})" + ) from exc + + if LAYOUT_TIMEOUT_RETRY_SLEEP_S > 0: + time.sleep(LAYOUT_TIMEOUT_RETRY_SLEEP_S) + + raise cmuxError(f"layout_debug retry loop exhausted unexpectedly (context={context})") + + +def _sample_while( + c: cmux, + *, + baseline: dict, + deadline: float, + workspace_index: int, + cycle: int, + phase: str, + trace: list[str], + timeout_state: dict[str, int], +) -> int: + sampled = 0 + while time.time() < deadline: + payload = _safe_layout_debug( + c, + timeout_state=timeout_state, + context=f"sample workspace={workspace_index} cycle={cycle} phase={phase} sample={sampled}", + ) + current = _container_frame(payload) + _assert_same_frame( + current=current, + baseline=baseline, + workspace_index=workspace_index, + cycle=cycle, + phase=phase, + sample=sampled, + trace=trace, + ) + + panes_now = _pane_count(payload) + if panes_now > 2: + raise cmuxError( + f"Observed >2 panes in strict two-pane fuzz " + f"(workspace={workspace_index}, cycle={cycle}, phase={phase}, panes={panes_now}); " + f"recent_actions={trace}" + ) + sampled += 1 + if LAYOUT_POLL_SLEEP_S > 0: + time.sleep(LAYOUT_POLL_SLEEP_S) + return sampled + + +def _wait_for_panes( + c: cmux, + *, + target_panes: int, + baseline: dict, + workspace_index: int, + cycle: int, + phase: str, + timeout_s: float, + trace: list[str], + timeout_state: dict[str, int], +) -> tuple[dict, int]: + deadline = time.time() + timeout_s + sampled = 0 + last = None + + while time.time() < deadline: + payload = _safe_layout_debug( + c, + timeout_state=timeout_state, + context=f"wait workspace={workspace_index} cycle={cycle} phase={phase} sample={sampled}", + ) + last = payload + current = _container_frame(payload) + _assert_same_frame( + current=current, + baseline=baseline, + workspace_index=workspace_index, + cycle=cycle, + phase=phase, + sample=sampled, + trace=trace, + ) + + panes_now = _pane_count(payload) + if panes_now > 2: + raise cmuxError( + f"Observed >2 panes in strict two-pane fuzz while waiting " + f"(workspace={workspace_index}, cycle={cycle}, phase={phase}, panes={panes_now}); " + f"recent_actions={trace}" + ) + if panes_now == target_panes: + return payload, sampled + 1 + sampled += 1 + if LAYOUT_POLL_SLEEP_S > 0: + time.sleep(LAYOUT_POLL_SLEEP_S) + + raise cmuxError( + f"Timed out waiting for {target_panes} panes " + f"(workspace={workspace_index}, cycle={cycle}, phase={phase}, sampled={sampled}, " + f"last_panes={_pane_count(last or {})}, timeout_s={timeout_s}); recent_actions={trace}" + ) + + +def _wait_for_single_pane_after_ctrl_d( + c: cmux, + *, + baseline: dict, + workspace_index: int, + cycle: int, + phase: str, + timeout_s: float, + recent_actions: deque[str], + timeout_state: dict[str, int], +) -> tuple[dict, int, int]: + deadline = time.time() + timeout_s + sampled = 0 + extra_ctrl_d = 0 + last = None + next_retry_at = time.time() + max(0.0, CTRL_D_RETRY_INTERVAL_S) + + while time.time() < deadline: + payload = _safe_layout_debug( + c, + timeout_state=timeout_state, + context=f"wait workspace={workspace_index} cycle={cycle} phase={phase} sample={sampled}", + ) + last = payload + current = _container_frame(payload) + trace = list(recent_actions) + _assert_same_frame( + current=current, + baseline=baseline, + workspace_index=workspace_index, + cycle=cycle, + phase=phase, + sample=sampled, + trace=trace, + ) + + panes_now = _pane_count(payload) + if panes_now > 2: + raise cmuxError( + f"Observed >2 panes in strict two-pane fuzz while waiting " + f"(workspace={workspace_index}, cycle={cycle}, phase={phase}, panes={panes_now}); " + f"recent_actions={trace}" + ) + if panes_now == 1: + return payload, sampled + 1, extra_ctrl_d + + now = time.time() + if panes_now == 2 and extra_ctrl_d < max(0, CTRL_D_MAX_EXTRA) and now >= next_retry_at: + retry_right_panel_id = _rightmost_panel_id(payload) + try: + c.send_key_surface(retry_right_panel_id, "ctrl-d") + except cmuxError as exc: + # Pane/surface can disappear between layout sample and send call under heavy churn. + # Skip this retry tick and re-sample. + if "not_found" in str(exc).lower(): + next_retry_at = now + max(0.0, CTRL_D_RETRY_INTERVAL_S) + sampled += 1 + if LAYOUT_POLL_SLEEP_S > 0: + time.sleep(LAYOUT_POLL_SLEEP_S) + continue + raise + extra_ctrl_d += 1 + recent_actions.append( + f"ws={workspace_index} cycle={cycle} action=ctrl+d(extra:{extra_ctrl_d}/{CTRL_D_MAX_EXTRA},surface={retry_right_panel_id})" + ) + next_retry_at = now + max(0.0, CTRL_D_RETRY_INTERVAL_S) + + sampled += 1 + if LAYOUT_POLL_SLEEP_S > 0: + time.sleep(LAYOUT_POLL_SLEEP_S) + + raise cmuxError( + f"Timed out waiting for 1 pane after ctrl+d " + f"(workspace={workspace_index}, cycle={cycle}, phase={phase}, sampled={sampled}, " + f"extra_ctrl_d={extra_ctrl_d}, last_panes={_pane_count(last or {})}, timeout_s={timeout_s}); " + f"recent_actions={list(recent_actions)}" + ) + + +def main() -> int: + rng = random.Random(FUZZ_SEED) + recent_actions: deque[str] = deque(maxlen=max(8, TRACE_TAIL)) + total_samples = 0 + total_cycles = 0 + total_extra_ctrl_d = 0 + timeout_state: dict[str, int] = {"count": 0} + + with cmux(SOCKET_PATH) as c: + c.activate_app() + + for workspace_index in range(1, WORKSPACES + 1): + ws = c.new_workspace() + c.select_workspace(ws) + c.activate_app() + time.sleep(0.08) + + start = _safe_layout_debug(c, timeout_state=timeout_state, context=f"workspace={workspace_index} start") + baseline = _container_frame(start) + start_panes = _pane_count(start) + if start_panes != 1: + raise cmuxError(f"New workspace did not start as single pane (workspace={workspace_index}, panes={start_panes})") + + for cycle in range(1, CYCLES_PER_WORKSPACE + 1): + total_cycles += 1 + + if PRE_ACTION_JITTER_MAX_S > 0: + time.sleep(rng.uniform(0.0, PRE_ACTION_JITTER_MAX_S)) + + recent_actions.append(f"ws={workspace_index} cycle={cycle} action=cmd+d") + c.simulate_shortcut("cmd+d") + + after_split, sampled = _wait_for_panes( + c, + target_panes=2, + baseline=baseline, + workspace_index=workspace_index, + cycle=cycle, + phase="after_cmd+d", + timeout_s=TRANSITION_TIMEOUT_S, + trace=list(recent_actions), + timeout_state=timeout_state, + ) + total_samples += sampled + _assert_two_panes_left_right(after_split, workspace_index=workspace_index, cycle=cycle, trace=list(recent_actions)) + + hold_split = rng.uniform(HOLD_MIN_S, HOLD_MAX_S) + total_samples += _sample_while( + c, + baseline=baseline, + deadline=time.time() + hold_split, + workspace_index=workspace_index, + cycle=cycle, + phase="hold_2pane", + trace=list(recent_actions), + timeout_state=timeout_state, + ) + + if PRE_ACTION_JITTER_MAX_S > 0: + time.sleep(rng.uniform(0.0, PRE_ACTION_JITTER_MAX_S)) + + right_panel_id = _rightmost_panel_id(after_split) + recent_actions.append(f"ws={workspace_index} cycle={cycle} action=ctrl+d(surface={right_panel_id})") + c.send_key_surface(right_panel_id, "ctrl-d") + + _, sampled, extra_ctrl_d = _wait_for_single_pane_after_ctrl_d( + c, + baseline=baseline, + workspace_index=workspace_index, + cycle=cycle, + phase="after_ctrl+d", + timeout_s=TRANSITION_TIMEOUT_S, + recent_actions=recent_actions, + timeout_state=timeout_state, + ) + total_samples += sampled + total_extra_ctrl_d += extra_ctrl_d + + hold_single = rng.uniform(HOLD_MIN_S, HOLD_MAX_S) + total_samples += _sample_while( + c, + baseline=baseline, + deadline=time.time() + hold_single, + workspace_index=workspace_index, + cycle=cycle, + phase="hold_1pane", + trace=list(recent_actions), + timeout_state=timeout_state, + ) + + c.close_workspace(ws) + time.sleep(0.05) + + print( + "PASS: strict two-pane cmd+d/ctrl+d frame guard " + f"(seed={FUZZ_SEED}, workspaces={WORKSPACES}, cycles={total_cycles}, samples={total_samples}, " + f"extra_ctrl_d={total_extra_ctrl_d}, epsilon={EPSILON}, layout_timeouts={timeout_state.get('count', 0)})" + ) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main())