Add VM split-churn fuzz tests and harden portal reveal gating

This commit is contained in:
Lawrence Chen 2026-02-23 15:28:23 -08:00
parent b34b3a530a
commit 310d807767
4 changed files with 777 additions and 5 deletions

View file

@ -535,6 +535,10 @@ private final class SplitDividerOverlayView: NSView {
@MainActor
final class WindowTerminalPortal: NSObject {
private static let tinyHideThreshold: CGFloat = 1
private static let minimumRevealWidth: CGFloat = 24
private static let minimumRevealHeight: CGFloat = 18
private weak var window: NSWindow?
private let hostView = WindowTerminalHostView(frame: .zero)
private let dividerOverlayView = SplitDividerOverlayView(frame: .zero)
@ -886,7 +890,12 @@ final class WindowTerminalPortal: NSObject {
frameInHost.size.width.isFinite &&
frameInHost.size.height.isFinite
let anchorHidden = Self.isHiddenOrAncestorHidden(anchorView)
let tinyFrame = frameInHost.width <= 1 || frameInHost.height <= 1
let tinyFrame =
frameInHost.width <= Self.tinyHideThreshold ||
frameInHost.height <= Self.tinyHideThreshold
let revealReadyForDisplay =
frameInHost.width >= Self.minimumRevealWidth &&
frameInHost.height >= Self.minimumRevealHeight
let outsideHostBounds = !frameInHost.intersects(hostView.bounds)
let shouldHide =
!entry.visibleInUI ||
@ -894,6 +903,7 @@ final class WindowTerminalPortal: NSObject {
tinyFrame ||
!hasFiniteFrame ||
outsideHostBounds
let shouldDeferReveal = !shouldHide && hostedView.isHidden && !revealReadyForDisplay
let oldFrame = hostedView.frame
#if DEBUG
@ -920,7 +930,7 @@ final class WindowTerminalPortal: NSObject {
dlog(
"portal.hidden hosted=\(portalDebugToken(hostedView)) value=1 " +
"visibleInUI=\(entry.visibleInUI ? 1 : 0) anchorHidden=\(anchorHidden ? 1 : 0) " +
"tiny=\(tinyFrame ? 1 : 0) finite=\(hasFiniteFrame ? 1 : 0) " +
"tiny=\(tinyFrame ? 1 : 0) revealReady=\(revealReadyForDisplay ? 1 : 0) finite=\(hasFiniteFrame ? 1 : 0) " +
"outside=\(outsideHostBounds ? 1 : 0) frame=\(portalDebugFrame(frameInHost))"
)
#endif
@ -935,17 +945,29 @@ final class WindowTerminalPortal: NSObject {
if abs(oldFrame.size.width - frameInHost.size.width) > 0.5 ||
abs(oldFrame.size.height - frameInHost.size.height) > 0.5,
!shouldHide {
!shouldHide,
(!hostedView.isHidden || revealReadyForDisplay) {
hostedView.reconcileGeometryNow()
}
}
if !shouldHide, hostedView.isHidden {
if shouldDeferReveal {
#if DEBUG
if !Self.rectApproximatelyEqual(oldFrame, frameInHost) {
dlog(
"portal.hidden.deferReveal hosted=\(portalDebugToken(hostedView)) " +
"frame=\(portalDebugFrame(frameInHost)) min=\(Int(Self.minimumRevealWidth))x\(Int(Self.minimumRevealHeight))"
)
}
#endif
}
if !shouldHide, hostedView.isHidden, revealReadyForDisplay {
#if DEBUG
dlog(
"portal.hidden hosted=\(portalDebugToken(hostedView)) value=0 " +
"visibleInUI=\(entry.visibleInUI ? 1 : 0) anchorHidden=\(anchorHidden ? 1 : 0) " +
"tiny=\(tinyFrame ? 1 : 0) finite=\(hasFiniteFrame ? 1 : 0) " +
"tiny=\(tinyFrame ? 1 : 0) revealReady=\(revealReadyForDisplay ? 1 : 0) finite=\(hasFiniteFrame ? 1 : 0) " +
"outside=\(outsideHostBounds ? 1 : 0) frame=\(portalDebugFrame(frameInHost))"
)
#endif

View file

@ -4364,6 +4364,14 @@ final class GhosttySurfaceOverlayTests: XCTestCase {
@MainActor
final class TerminalWindowPortalLifecycleTests: XCTestCase {
private func realizeWindowLayout(_ window: NSWindow) {
window.makeKeyAndOrderFront(nil)
window.displayIfNeeded()
window.contentView?.layoutSubtreeIfNeeded()
RunLoop.current.run(until: Date().addingTimeInterval(0.05))
window.contentView?.layoutSubtreeIfNeeded()
}
func testPortalHostInstallsAboveContentViewForVisibility() {
let window = NSWindow(
contentRect: NSRect(x: 0, y: 0, width: 320, height: 240),
@ -4553,6 +4561,50 @@ final class TerminalWindowPortalLifecycleTests: XCTestCase {
"Promoting z-priority should bring an already-visible terminal to front"
)
}
func testHiddenPortalDefersRevealUntilFrameHasUsableSize() {
let window = NSWindow(
contentRect: NSRect(x: 0, y: 0, width: 700, height: 420),
styleMask: [.titled, .closable],
backing: .buffered,
defer: false
)
defer { window.orderOut(nil) }
let portal = WindowTerminalPortal(window: window)
realizeWindowLayout(window)
guard let contentView = window.contentView else {
XCTFail("Expected content view")
return
}
let anchor = NSView(frame: NSRect(x: 40, y: 40, width: 280, height: 220))
contentView.addSubview(anchor)
let hosted = GhosttySurfaceScrollView(
surfaceView: GhosttyNSView(frame: NSRect(x: 0, y: 0, width: 120, height: 80))
)
portal.bind(hostedView: hosted, to: anchor, visibleInUI: true)
XCTAssertFalse(hosted.isHidden, "Healthy geometry should be visible")
// Collapse to a tiny frame first.
anchor.frame = NSRect(x: 160.5, y: 1037.0, width: 79.0, height: 0.0)
portal.synchronizeHostedViewForAnchor(anchor)
XCTAssertTrue(hosted.isHidden, "Tiny geometry should hide the portal-hosted terminal")
// Then restore to a non-zero but still too-small frame. It should remain hidden.
anchor.frame = NSRect(x: 160.9, y: 1026.5, width: 93.6, height: 10.3)
portal.synchronizeHostedViewForAnchor(anchor)
XCTAssertTrue(
hosted.isHidden,
"Portal should defer reveal until geometry reaches a usable size"
)
// Once the frame is large enough again, reveal should resume.
anchor.frame = NSRect(x: 40, y: 40, width: 180, height: 40)
portal.synchronizeHostedViewForAnchor(anchor)
XCTAssertFalse(hosted.isHidden, "Portal should unhide after geometry is usable")
}
}
@MainActor

View file

@ -0,0 +1,211 @@
#!/usr/bin/env python3
"""
Fuzz regression: rapid Cmd+D / Ctrl+D churn must not shift the outer bonsplit container frame.
This targets the user-reported visual shift/flash while spamming split + close.
We treat any drift in x/y/width/height of the outer container frame as a failure.
"""
from collections import deque
import os
import random
import sys
import time
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from cmux import cmux, cmuxError
SOCKET_PATH = os.environ.get("CMUX_SOCKET", "/tmp/cmux-debug.sock")
FUZZ_SEED = int(os.environ.get("CMUX_SPLIT_FUZZ_SEED", "424242"))
FUZZ_STEPS = int(os.environ.get("CMUX_SPLIT_FUZZ_STEPS", "1400"))
SAMPLES_PER_STEP = int(os.environ.get("CMUX_SPLIT_FUZZ_SAMPLES", "4"))
SAMPLE_INTERVAL_S = float(os.environ.get("CMUX_SPLIT_FUZZ_SAMPLE_INTERVAL_S", "0.0015"))
ACTION_JITTER_MAX_S = float(os.environ.get("CMUX_SPLIT_FUZZ_ACTION_JITTER_MAX_S", "0.0035"))
BURST_MAX = int(os.environ.get("CMUX_SPLIT_FUZZ_BURST_MAX", "3"))
MAX_PANES = int(os.environ.get("CMUX_SPLIT_FUZZ_MAX_PANES", "10"))
EPSILON = float(os.environ.get("CMUX_SPLIT_FUZZ_EPSILON", "0.0"))
TRACE_TAIL = int(os.environ.get("CMUX_SPLIT_FUZZ_TRACE_TAIL", "40"))
ASSERT_NO_UNDERFLOW = os.environ.get("CMUX_SPLIT_FUZZ_ASSERT_NO_UNDERFLOW", "0") == "1"
ASSERT_NO_EMPTY_PANEL = os.environ.get("CMUX_SPLIT_FUZZ_ASSERT_NO_EMPTY_PANEL", "0") == "1"
def _pane_count(layout_payload: dict) -> int:
layout = layout_payload.get("layout") or {}
panes = layout.get("panes") or []
return len(panes)
def _largest_split_frame(layout_payload: dict) -> dict:
selected = layout_payload.get("selectedPanels") or []
best = None
best_area = -1.0
for row in selected:
for split in row.get("splitViews") or []:
frame = split.get("frame")
if not frame:
continue
try:
x = float(frame.get("x", 0.0))
y = float(frame.get("y", 0.0))
width = float(frame.get("width", 0.0))
height = float(frame.get("height", 0.0))
except (TypeError, ValueError):
continue
if width <= 0.0 or height <= 0.0:
continue
area = width * height
if area > best_area:
best_area = area
best = {"x": x, "y": y, "width": width, "height": height}
if best is None:
raise cmuxError(f"layout_debug contains no usable split-view frame: {layout_payload}")
return best
def _container_frame(layout_payload: dict) -> dict:
container = (layout_payload.get("layout") or {}).get("containerFrame")
if container:
try:
return {
"x": float(container.get("x", 0.0)),
"y": float(container.get("y", 0.0)),
"width": float(container.get("width", 0.0)),
"height": float(container.get("height", 0.0)),
}
except (TypeError, ValueError):
pass
# Back-compat fallback for older payloads that don't expose containerFrame.
return _largest_split_frame(layout_payload)
def _assert_same_frame(
current: dict,
baseline: dict,
*,
step: int,
sample: int,
action: str,
seed: int,
action_index: int,
trace: list[str],
) -> None:
deltas = {
key: abs(float(current[key]) - float(baseline[key]))
for key in ("x", "y", "width", "height")
}
shifted = {k: v for k, v in deltas.items() if v > EPSILON}
if shifted:
raise cmuxError(
"Outer split container shifted during fuzz churn "
f"(step={step}, sample={sample}, action={action}, action_index={action_index}, seed={seed}, "
f"baseline={baseline}, current={current}, deltas={deltas}, epsilon={EPSILON})"
f"; recent_actions={trace}"
)
def _warm_start_split(c: cmux) -> dict:
# Ensure we have at least one split so the container frame exists in layout_debug.
c.simulate_shortcut("cmd+d")
deadline = time.time() + 2.0
last = None
while time.time() < deadline:
payload = c.layout_debug()
last = payload
if _pane_count(payload) >= 2:
return payload
time.sleep(0.02)
raise cmuxError(f"Timed out waiting for first split to appear: {last}")
def main() -> int:
rng = random.Random(FUZZ_SEED)
recent_actions: deque[str] = deque(maxlen=max(8, TRACE_TAIL))
total_actions = 0
with cmux(SOCKET_PATH) as c:
ws = c.new_workspace()
c.select_workspace(ws)
c.activate_app()
time.sleep(0.2)
c.reset_bonsplit_underflow_count()
c.reset_empty_panel_count()
initial = _warm_start_split(c)
baseline = _container_frame(initial)
if _pane_count(initial) < 2:
raise cmuxError("Expected at least 2 panes after warm start split")
for step in range(1, FUZZ_STEPS + 1):
burst = rng.randint(1, max(1, BURST_MAX))
for burst_index in range(1, burst + 1):
before = c.layout_debug()
pane_count = _pane_count(before)
if pane_count <= 2:
action = "cmd+d"
elif pane_count >= MAX_PANES:
action = "ctrl+d"
else:
# Bias toward split to keep churn dense while still frequently collapsing via ctrl+d.
action = "cmd+d" if rng.random() < 0.60 else "ctrl+d"
if action == "cmd+d":
c.simulate_shortcut("cmd+d")
else:
# Ctrl+D equivalent sent directly to the focused terminal surface.
c.send_ctrl_d()
total_actions += 1
recent_actions.append(
f"step={step}/burst={burst_index}/{burst} panes_before={pane_count} action={action}"
)
# Random micro-jitter to emulate uneven key-repeat timing while keeping churn fast.
if ACTION_JITTER_MAX_S > 0:
time.sleep(rng.uniform(0.0, ACTION_JITTER_MAX_S))
# Sample repeatedly after each burst to catch transient shifts.
for sample in range(0, SAMPLES_PER_STEP + 1):
payload = c.layout_debug()
current = _container_frame(payload)
_assert_same_frame(
current,
baseline,
step=step,
sample=sample,
action="burst",
seed=FUZZ_SEED,
action_index=total_actions,
trace=list(recent_actions),
)
if SAMPLE_INTERVAL_S > 0:
time.sleep(rng.uniform(0.0, SAMPLE_INTERVAL_S))
underflows = c.bonsplit_underflow_count()
if ASSERT_NO_UNDERFLOW and underflows != 0:
raise cmuxError(f"bonsplit arranged-subview underflow observed during fuzz run: {underflows}")
flashes = c.empty_panel_count()
if ASSERT_NO_EMPTY_PANEL and flashes != 0:
raise cmuxError(f"EmptyPanelView appeared during fuzz run (count={flashes})")
print(
"PASS: cmd+d/ctrl+d fuzz geometry invariant "
f"(seed={FUZZ_SEED}, steps={FUZZ_STEPS}, samples={SAMPLES_PER_STEP}, burst_max={BURST_MAX}, "
f"actions={total_actions}, epsilon={EPSILON}, underflows={underflows}, empty_panel={flashes})"
)
return 0
if __name__ == "__main__":
raise SystemExit(main())

View file

@ -0,0 +1,487 @@
#!/usr/bin/env python3
"""
Focused fuzz regression for rapid Cmd+D / Ctrl+D churn in a strict 1<->2 pane loop.
Intent:
- Keep topology limited to one pane or two left/right panes only.
- Run across multiple fresh workspaces.
- Sample layout as fast as the debug socket allows during transitions/holds.
- Fail immediately if outer container x/y/width/height drifts at any sampled frame.
"""
from collections import deque
import os
import random
import sys
import time
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from cmux import cmux, cmuxError
SOCKET_PATH = os.environ.get("CMUX_SOCKET", "/tmp/cmux-debug.sock")
FUZZ_SEED = int(os.environ.get("CMUX_SPLIT_2PANE_SEED", "20260223"))
WORKSPACES = int(os.environ.get("CMUX_SPLIT_2PANE_WORKSPACES", "3"))
CYCLES_PER_WORKSPACE = int(os.environ.get("CMUX_SPLIT_2PANE_CYCLES", "220"))
TRANSITION_TIMEOUT_S = float(os.environ.get("CMUX_SPLIT_2PANE_TIMEOUT_S", "2.0"))
HOLD_MIN_S = float(os.environ.get("CMUX_SPLIT_2PANE_HOLD_MIN_S", "0.003"))
HOLD_MAX_S = float(os.environ.get("CMUX_SPLIT_2PANE_HOLD_MAX_S", "0.018"))
PRE_ACTION_JITTER_MAX_S = float(os.environ.get("CMUX_SPLIT_2PANE_ACTION_JITTER_MAX_S", "0.002"))
EPSILON = float(os.environ.get("CMUX_SPLIT_2PANE_EPSILON", "0.0"))
TRACE_TAIL = int(os.environ.get("CMUX_SPLIT_2PANE_TRACE_TAIL", "64"))
LAYOUT_POLL_SLEEP_S = float(os.environ.get("CMUX_SPLIT_2PANE_POLL_SLEEP_S", "0.0008"))
LAYOUT_TIMEOUT_RETRIES = int(os.environ.get("CMUX_SPLIT_2PANE_LAYOUT_TIMEOUT_RETRIES", "4"))
LAYOUT_TIMEOUT_RETRY_SLEEP_S = float(os.environ.get("CMUX_SPLIT_2PANE_LAYOUT_TIMEOUT_RETRY_SLEEP_S", "0.0015"))
MAX_LAYOUT_TIMEOUTS = int(os.environ.get("CMUX_SPLIT_2PANE_MAX_LAYOUT_TIMEOUTS", "80"))
CTRL_D_RETRY_INTERVAL_S = float(os.environ.get("CMUX_SPLIT_2PANE_CTRL_D_RETRY_INTERVAL_S", "0.18"))
CTRL_D_MAX_EXTRA = int(os.environ.get("CMUX_SPLIT_2PANE_CTRL_D_MAX_EXTRA", "6"))
def _pane_count(layout_payload: dict) -> int:
layout = layout_payload.get("layout") or {}
return len(layout.get("panes") or [])
def _largest_split_frame(layout_payload: dict) -> dict:
selected = layout_payload.get("selectedPanels") or []
best = None
best_area = -1.0
for row in selected:
for split in row.get("splitViews") or []:
frame = split.get("frame")
if not frame:
continue
try:
x = float(frame.get("x", 0.0))
y = float(frame.get("y", 0.0))
width = float(frame.get("width", 0.0))
height = float(frame.get("height", 0.0))
except (TypeError, ValueError):
continue
if width <= 0.0 or height <= 0.0:
continue
area = width * height
if area > best_area:
best_area = area
best = {"x": x, "y": y, "width": width, "height": height}
if best is None:
raise cmuxError(f"layout_debug contains no usable split-view frame: {layout_payload}")
return best
def _container_frame(layout_payload: dict) -> dict:
container = (layout_payload.get("layout") or {}).get("containerFrame")
if container:
try:
return {
"x": float(container.get("x", 0.0)),
"y": float(container.get("y", 0.0)),
"width": float(container.get("width", 0.0)),
"height": float(container.get("height", 0.0)),
}
except (TypeError, ValueError):
pass
return _largest_split_frame(layout_payload)
def _pane_frames_sorted_x(layout_payload: dict) -> list[dict]:
layout = layout_payload.get("layout") or {}
panes = layout.get("panes") or []
frames: list[dict] = []
for pane in panes:
frame = pane.get("frame") or {}
try:
frames.append(
{
"pane_id": str(pane.get("paneId") or ""),
"x": float(frame.get("x", 0.0)),
"y": float(frame.get("y", 0.0)),
"width": float(frame.get("width", 0.0)),
"height": float(frame.get("height", 0.0)),
}
)
except (TypeError, ValueError):
continue
return sorted(frames, key=lambda p: (p["x"], p["y"]))
def _assert_same_frame(
*,
current: dict,
baseline: dict,
workspace_index: int,
cycle: int,
phase: str,
sample: int,
trace: list[str],
) -> None:
deltas = {
key: abs(float(current[key]) - float(baseline[key]))
for key in ("x", "y", "width", "height")
}
shifted = {k: v for k, v in deltas.items() if v > EPSILON}
if shifted:
raise cmuxError(
"Container frame shifted "
f"(workspace={workspace_index}, cycle={cycle}, phase={phase}, sample={sample}, "
f"baseline={baseline}, current={current}, deltas={deltas}, epsilon={EPSILON}); "
f"recent_actions={trace}"
)
def _assert_two_panes_left_right(layout_payload: dict, *, workspace_index: int, cycle: int, trace: list[str]) -> None:
panes = _pane_frames_sorted_x(layout_payload)
if len(panes) != 2:
raise cmuxError(
f"Expected exactly 2 panes in two-pane phase, got {len(panes)} "
f"(workspace={workspace_index}, cycle={cycle}); panes={panes}; recent_actions={trace}"
)
left, right = panes[0], panes[1]
if left["width"] <= 0.0 or left["height"] <= 0.0 or right["width"] <= 0.0 or right["height"] <= 0.0:
raise cmuxError(
f"Collapsed pane in two-pane phase (workspace={workspace_index}, cycle={cycle}): "
f"left={left} right={right}; recent_actions={trace}"
)
if left["x"] >= right["x"]:
raise cmuxError(
f"Two-pane geometry is not left/right (workspace={workspace_index}, cycle={cycle}): "
f"left={left} right={right}; recent_actions={trace}"
)
def _selected_panel_by_pane(layout_payload: dict) -> dict[str, str]:
out: dict[str, str] = {}
for row in layout_payload.get("selectedPanels") or []:
pane_id = str(row.get("paneId") or "")
panel_id = str(row.get("panelId") or "")
if pane_id and panel_id:
out[pane_id] = panel_id
return out
def _rightmost_pane_id(layout_payload: dict) -> str:
panes = _pane_frames_sorted_x(layout_payload)
if len(panes) < 2:
raise cmuxError(f"Expected at least 2 panes to resolve rightmost pane: {panes}")
pane_id = str(panes[-1].get("pane_id") or "")
if not pane_id:
raise cmuxError(f"Rightmost pane is missing pane_id: {panes[-1]}")
return pane_id
def _rightmost_panel_id(layout_payload: dict) -> str:
pane_id = _rightmost_pane_id(layout_payload)
selected = _selected_panel_by_pane(layout_payload)
panel_id = str(selected.get(pane_id) or "")
if not panel_id:
raise cmuxError(f"Missing selected panel for rightmost pane: pane_id={pane_id}, selected={selected}")
return panel_id
def _safe_layout_debug(c: cmux, *, timeout_state: dict[str, int], context: str) -> dict:
for attempt in range(0, max(0, LAYOUT_TIMEOUT_RETRIES) + 1):
try:
return c.layout_debug()
except cmuxError as exc:
if "timed out waiting for response" not in str(exc).lower():
raise
timeout_state["count"] = timeout_state.get("count", 0) + 1
count = timeout_state["count"]
if count > max(0, MAX_LAYOUT_TIMEOUTS):
raise cmuxError(
f"Exceeded layout_debug timeout budget (count={count}, max={MAX_LAYOUT_TIMEOUTS}, context={context})"
) from exc
if attempt >= max(0, LAYOUT_TIMEOUT_RETRIES):
raise cmuxError(
f"layout_debug timed out after retries (attempts={attempt + 1}, count={count}, context={context})"
) from exc
if LAYOUT_TIMEOUT_RETRY_SLEEP_S > 0:
time.sleep(LAYOUT_TIMEOUT_RETRY_SLEEP_S)
raise cmuxError(f"layout_debug retry loop exhausted unexpectedly (context={context})")
def _sample_while(
c: cmux,
*,
baseline: dict,
deadline: float,
workspace_index: int,
cycle: int,
phase: str,
trace: list[str],
timeout_state: dict[str, int],
) -> int:
sampled = 0
while time.time() < deadline:
payload = _safe_layout_debug(
c,
timeout_state=timeout_state,
context=f"sample workspace={workspace_index} cycle={cycle} phase={phase} sample={sampled}",
)
current = _container_frame(payload)
_assert_same_frame(
current=current,
baseline=baseline,
workspace_index=workspace_index,
cycle=cycle,
phase=phase,
sample=sampled,
trace=trace,
)
panes_now = _pane_count(payload)
if panes_now > 2:
raise cmuxError(
f"Observed >2 panes in strict two-pane fuzz "
f"(workspace={workspace_index}, cycle={cycle}, phase={phase}, panes={panes_now}); "
f"recent_actions={trace}"
)
sampled += 1
if LAYOUT_POLL_SLEEP_S > 0:
time.sleep(LAYOUT_POLL_SLEEP_S)
return sampled
def _wait_for_panes(
c: cmux,
*,
target_panes: int,
baseline: dict,
workspace_index: int,
cycle: int,
phase: str,
timeout_s: float,
trace: list[str],
timeout_state: dict[str, int],
) -> tuple[dict, int]:
deadline = time.time() + timeout_s
sampled = 0
last = None
while time.time() < deadline:
payload = _safe_layout_debug(
c,
timeout_state=timeout_state,
context=f"wait workspace={workspace_index} cycle={cycle} phase={phase} sample={sampled}",
)
last = payload
current = _container_frame(payload)
_assert_same_frame(
current=current,
baseline=baseline,
workspace_index=workspace_index,
cycle=cycle,
phase=phase,
sample=sampled,
trace=trace,
)
panes_now = _pane_count(payload)
if panes_now > 2:
raise cmuxError(
f"Observed >2 panes in strict two-pane fuzz while waiting "
f"(workspace={workspace_index}, cycle={cycle}, phase={phase}, panes={panes_now}); "
f"recent_actions={trace}"
)
if panes_now == target_panes:
return payload, sampled + 1
sampled += 1
if LAYOUT_POLL_SLEEP_S > 0:
time.sleep(LAYOUT_POLL_SLEEP_S)
raise cmuxError(
f"Timed out waiting for {target_panes} panes "
f"(workspace={workspace_index}, cycle={cycle}, phase={phase}, sampled={sampled}, "
f"last_panes={_pane_count(last or {})}, timeout_s={timeout_s}); recent_actions={trace}"
)
def _wait_for_single_pane_after_ctrl_d(
c: cmux,
*,
baseline: dict,
workspace_index: int,
cycle: int,
phase: str,
timeout_s: float,
recent_actions: deque[str],
timeout_state: dict[str, int],
) -> tuple[dict, int, int]:
deadline = time.time() + timeout_s
sampled = 0
extra_ctrl_d = 0
last = None
next_retry_at = time.time() + max(0.0, CTRL_D_RETRY_INTERVAL_S)
while time.time() < deadline:
payload = _safe_layout_debug(
c,
timeout_state=timeout_state,
context=f"wait workspace={workspace_index} cycle={cycle} phase={phase} sample={sampled}",
)
last = payload
current = _container_frame(payload)
trace = list(recent_actions)
_assert_same_frame(
current=current,
baseline=baseline,
workspace_index=workspace_index,
cycle=cycle,
phase=phase,
sample=sampled,
trace=trace,
)
panes_now = _pane_count(payload)
if panes_now > 2:
raise cmuxError(
f"Observed >2 panes in strict two-pane fuzz while waiting "
f"(workspace={workspace_index}, cycle={cycle}, phase={phase}, panes={panes_now}); "
f"recent_actions={trace}"
)
if panes_now == 1:
return payload, sampled + 1, extra_ctrl_d
now = time.time()
if panes_now == 2 and extra_ctrl_d < max(0, CTRL_D_MAX_EXTRA) and now >= next_retry_at:
retry_right_panel_id = _rightmost_panel_id(payload)
try:
c.send_key_surface(retry_right_panel_id, "ctrl-d")
except cmuxError as exc:
# Pane/surface can disappear between layout sample and send call under heavy churn.
# Skip this retry tick and re-sample.
if "not_found" in str(exc).lower():
next_retry_at = now + max(0.0, CTRL_D_RETRY_INTERVAL_S)
sampled += 1
if LAYOUT_POLL_SLEEP_S > 0:
time.sleep(LAYOUT_POLL_SLEEP_S)
continue
raise
extra_ctrl_d += 1
recent_actions.append(
f"ws={workspace_index} cycle={cycle} action=ctrl+d(extra:{extra_ctrl_d}/{CTRL_D_MAX_EXTRA},surface={retry_right_panel_id})"
)
next_retry_at = now + max(0.0, CTRL_D_RETRY_INTERVAL_S)
sampled += 1
if LAYOUT_POLL_SLEEP_S > 0:
time.sleep(LAYOUT_POLL_SLEEP_S)
raise cmuxError(
f"Timed out waiting for 1 pane after ctrl+d "
f"(workspace={workspace_index}, cycle={cycle}, phase={phase}, sampled={sampled}, "
f"extra_ctrl_d={extra_ctrl_d}, last_panes={_pane_count(last or {})}, timeout_s={timeout_s}); "
f"recent_actions={list(recent_actions)}"
)
def main() -> int:
rng = random.Random(FUZZ_SEED)
recent_actions: deque[str] = deque(maxlen=max(8, TRACE_TAIL))
total_samples = 0
total_cycles = 0
total_extra_ctrl_d = 0
timeout_state: dict[str, int] = {"count": 0}
with cmux(SOCKET_PATH) as c:
c.activate_app()
for workspace_index in range(1, WORKSPACES + 1):
ws = c.new_workspace()
c.select_workspace(ws)
c.activate_app()
time.sleep(0.08)
start = _safe_layout_debug(c, timeout_state=timeout_state, context=f"workspace={workspace_index} start")
baseline = _container_frame(start)
start_panes = _pane_count(start)
if start_panes != 1:
raise cmuxError(f"New workspace did not start as single pane (workspace={workspace_index}, panes={start_panes})")
for cycle in range(1, CYCLES_PER_WORKSPACE + 1):
total_cycles += 1
if PRE_ACTION_JITTER_MAX_S > 0:
time.sleep(rng.uniform(0.0, PRE_ACTION_JITTER_MAX_S))
recent_actions.append(f"ws={workspace_index} cycle={cycle} action=cmd+d")
c.simulate_shortcut("cmd+d")
after_split, sampled = _wait_for_panes(
c,
target_panes=2,
baseline=baseline,
workspace_index=workspace_index,
cycle=cycle,
phase="after_cmd+d",
timeout_s=TRANSITION_TIMEOUT_S,
trace=list(recent_actions),
timeout_state=timeout_state,
)
total_samples += sampled
_assert_two_panes_left_right(after_split, workspace_index=workspace_index, cycle=cycle, trace=list(recent_actions))
hold_split = rng.uniform(HOLD_MIN_S, HOLD_MAX_S)
total_samples += _sample_while(
c,
baseline=baseline,
deadline=time.time() + hold_split,
workspace_index=workspace_index,
cycle=cycle,
phase="hold_2pane",
trace=list(recent_actions),
timeout_state=timeout_state,
)
if PRE_ACTION_JITTER_MAX_S > 0:
time.sleep(rng.uniform(0.0, PRE_ACTION_JITTER_MAX_S))
right_panel_id = _rightmost_panel_id(after_split)
recent_actions.append(f"ws={workspace_index} cycle={cycle} action=ctrl+d(surface={right_panel_id})")
c.send_key_surface(right_panel_id, "ctrl-d")
_, sampled, extra_ctrl_d = _wait_for_single_pane_after_ctrl_d(
c,
baseline=baseline,
workspace_index=workspace_index,
cycle=cycle,
phase="after_ctrl+d",
timeout_s=TRANSITION_TIMEOUT_S,
recent_actions=recent_actions,
timeout_state=timeout_state,
)
total_samples += sampled
total_extra_ctrl_d += extra_ctrl_d
hold_single = rng.uniform(HOLD_MIN_S, HOLD_MAX_S)
total_samples += _sample_while(
c,
baseline=baseline,
deadline=time.time() + hold_single,
workspace_index=workspace_index,
cycle=cycle,
phase="hold_1pane",
trace=list(recent_actions),
timeout_state=timeout_state,
)
c.close_workspace(ws)
time.sleep(0.05)
print(
"PASS: strict two-pane cmd+d/ctrl+d frame guard "
f"(seed={FUZZ_SEED}, workspaces={WORKSPACES}, cycles={total_cycles}, samples={total_samples}, "
f"extra_ctrl_d={total_extra_ctrl_d}, epsilon={EPSILON}, layout_timeouts={timeout_state.get('count', 0)})"
)
return 0
if __name__ == "__main__":
raise SystemExit(main())