add Claude hook session mapping test

This commit is contained in:
Lawrence Chen 2026-02-15 21:50:06 -08:00
parent 4312f917f0
commit 8f399d1a82

View file

@ -0,0 +1,228 @@
#!/usr/bin/env python3
"""
E2E regression test for Claude hook session mapping.
Validates:
1) session-start records session_id -> workspace/surface mapping on disk
2) notification updates mapped session state
3) stop consumes the mapping and emits a richer completion notification
"""
from __future__ import annotations
import glob
import json
import os
import shutil
import subprocess
import sys
import tempfile
import time
import uuid
from pathlib import Path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from cmux import cmux, cmuxError
def resolve_cmux_cli() -> str:
explicit = os.environ.get("CMUX_CLI_BIN") or os.environ.get("CMUX_CLI")
if explicit and os.path.exists(explicit) and os.access(explicit, os.X_OK):
return explicit
candidates: list[str] = []
candidates.extend(glob.glob(os.path.expanduser("~/Library/Developer/Xcode/DerivedData/*/Build/Products/Debug/cmux")))
candidates.extend(glob.glob("/tmp/cmux-*/Build/Products/Debug/cmux"))
candidates = [p for p in candidates if os.path.exists(p) and os.access(p, os.X_OK)]
if candidates:
candidates.sort(key=os.path.getmtime, reverse=True)
return candidates[0]
in_path = shutil.which("cmux")
if in_path:
return in_path
raise RuntimeError("Unable to find cmux CLI binary. Set CMUX_CLI_BIN.")
def run_claude_hook(
cli_path: str,
socket_path: str,
subcommand: str,
payload: dict,
env: dict[str, str],
) -> str:
proc = subprocess.run(
[cli_path, "--socket", socket_path, "claude-hook", subcommand],
input=json.dumps(payload),
text=True,
capture_output=True,
env=env,
check=False,
)
if proc.returncode != 0:
raise RuntimeError(
f"cmux claude-hook {subcommand} failed:\n"
f"exit={proc.returncode}\nstdout={proc.stdout}\nstderr={proc.stderr}"
)
return proc.stdout.strip()
def wait_for_notification_count(client: cmux, minimum: int, timeout: float = 4.0) -> list[dict]:
start = time.time()
items: list[dict] = []
while time.time() - start < timeout:
items = client.list_notifications()
if len(items) >= minimum:
return items
time.sleep(0.05)
return items
def latest_notification_with_subtitle(items: list[dict], subtitle: str) -> dict | None:
for item in items:
if item.get("subtitle") == subtitle:
return item
return None
def fail(message: str) -> int:
print(f"FAIL: {message}")
return 1
def main() -> int:
try:
cli_path = resolve_cmux_cli()
except Exception as exc:
return fail(str(exc))
state_path = Path(tempfile.gettempdir()) / f"cmux_claude_hook_state_{os.getpid()}.json"
lock_path = Path(str(state_path) + ".lock")
try:
if state_path.exists():
state_path.unlink()
if lock_path.exists():
lock_path.unlink()
except OSError:
pass
project_dir = Path(tempfile.gettempdir()) / f"cmux_claude_map_project_{os.getpid()}"
project_dir.mkdir(parents=True, exist_ok=True)
session_id = f"sess-{uuid.uuid4().hex}"
last_message = "Please approve deploy migration"
try:
with cmux() as client:
client.set_app_focus(False)
client.clear_notifications()
workspace_id = client.new_workspace()
surfaces = client.list_surfaces()
if not surfaces:
return fail("Expected at least one surface in new workspace")
focused = next((s for s in surfaces if s[2]), surfaces[0])
surface_id = focused[1]
hook_env = os.environ.copy()
hook_env["CMUX_SOCKET_PATH"] = client.socket_path
hook_env["CMUX_WORKSPACE_ID"] = workspace_id
hook_env["CMUX_SURFACE_ID"] = surface_id
hook_env["CMUX_CLAUDE_HOOK_STATE_PATH"] = str(state_path)
run_claude_hook(
cli_path,
client.socket_path,
"session-start",
{
"session_id": session_id,
"cwd": str(project_dir),
},
hook_env,
)
if not state_path.exists():
return fail(f"Expected state file at {state_path}")
with state_path.open("r", encoding="utf-8") as f:
state_data = json.load(f)
session_row = (state_data.get("sessions") or {}).get(session_id)
if not session_row:
return fail("Expected mapped session row after session-start")
if session_row.get("workspaceId") != workspace_id:
return fail("Mapped workspaceId did not match active workspace")
if session_row.get("surfaceId") != surface_id:
return fail("Mapped surfaceId did not match active surface")
run_claude_hook(
cli_path,
client.socket_path,
"notification",
{
"session_id": session_id,
"message": last_message,
"type": "permission",
},
hook_env,
)
items = wait_for_notification_count(client, minimum=1)
if not items:
return fail("Expected at least one notification after claude-hook notification")
permission_notification = latest_notification_with_subtitle(items, "Permission")
if permission_notification is None:
return fail("Expected a Permission subtitle notification")
if permission_notification.get("surface_id") != surface_id:
return fail("Expected notification to route to mapped surface")
if last_message not in permission_notification.get("body", ""):
return fail("Expected notification body to include mapped last message")
run_claude_hook(
cli_path,
client.socket_path,
"stop",
{
"session_id": session_id,
},
hook_env,
)
items = wait_for_notification_count(client, minimum=2)
completed_notification = latest_notification_with_subtitle(items, "Completed")
if completed_notification is None:
return fail("Expected a Completed subtitle notification on stop")
body = completed_notification.get("body", "")
if project_dir.name not in body:
return fail("Expected stop notification body to include project directory name")
if "Last:" not in body:
return fail("Expected stop notification body to include last activity summary")
if "approve deploy migration" not in body.lower():
return fail("Expected stop notification body to include last Claude message context")
if completed_notification.get("surface_id") != surface_id:
return fail("Expected stop notification to target mapped surface")
with state_path.open("r", encoding="utf-8") as f:
post_stop_state = json.load(f)
if session_id in (post_stop_state.get("sessions") or {}):
return fail("Expected session mapping to be consumed on stop")
print("PASS: Claude hook session mapping + stop summary notification")
return 0
except (cmuxError, RuntimeError) as exc:
return fail(str(exc))
finally:
try:
if state_path.exists():
state_path.unlink()
if lock_path.exists():
lock_path.unlink()
except OSError:
pass
if __name__ == "__main__":
raise SystemExit(main())