cmux/tests/test_issue_1138_sidebar_pr_polling.py
2026-03-10 00:40:57 -07:00

397 lines
13 KiB
Python

#!/usr/bin/env python3
"""
Regression coverage for issue #1138.
Validates that shell integration:
1) keeps polling PR state while idle and recovers after a transient gh failure
2) resolves the current branch PR via `gh pr view` instead of repository-wide
branch-name matching
3) clears stale PR state when the branch changes and the new probe fails
4) recovers when a gh probe wedges longer than the async timeout
5) keeps polling in bash after prompt-render helper commands run
6) tears down the timed-out gh probe instead of leaking it in the background
"""
from __future__ import annotations
import os
import shutil
import socket
import subprocess
import textwrap
from pathlib import Path
class BoundUnixSocket:
def __init__(self, path: Path) -> None:
self.path = path
self.sock: socket.socket | None = None
def __enter__(self) -> "BoundUnixSocket":
self.path.parent.mkdir(parents=True, exist_ok=True)
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.sock.bind(str(self.path))
self.sock.listen(1)
return self
def __exit__(self, exc_type, exc, tb) -> None:
if self.sock is not None:
self.sock.close()
try:
self.path.unlink()
except FileNotFoundError:
pass
def _write_executable(path: Path, contents: str) -> None:
path.write_text(contents, encoding="utf-8")
path.chmod(0o755)
def _git_stub() -> str:
return textwrap.dedent(
"""\
#!/bin/sh
repo_path="$PWD"
if [ "$1" = "-C" ]; then
repo_path="$2"
shift
shift
fi
head_file="$repo_path/.git/HEAD"
branch=""
if [ -f "$head_file" ]; then
head_line="$(cat "$head_file")"
case "$head_line" in
ref:\ refs/heads/*)
branch="${head_line#ref: refs/heads/}"
;;
esac
fi
if [ "$1" = "branch" ] && [ "$2" = "--show-current" ]; then
if [ -n "$branch" ]; then
printf '%s\\n' "$branch"
fi
exit 0
fi
if [ "$1" = "status" ] && [ "$2" = "--porcelain" ] && [ "$3" = "-uno" ]; then
exit 0
fi
printf 'unexpected git args: %s\\n' "$*" >&2
exit 1
"""
)
def _gh_stub() -> str:
return textwrap.dedent(
"""\
#!/bin/sh
args_log="${CMUX_TEST_GH_ARGS_LOG:?}"
count_file="${CMUX_TEST_GH_COUNT_FILE:?}"
pid_file="${CMUX_TEST_GH_PID_FILE:-}"
scenario="${CMUX_TEST_SCENARIO:?}"
head_file="${CMUX_TEST_HEAD_FILE:?}"
printf '%s\\n' "$*" >> "$args_log"
count=0
if [ -f "$count_file" ]; then
count="$(cat "$count_file")"
fi
count=$((count + 1))
printf '%s\\n' "$count" > "$count_file"
if [ "$1" != "pr" ] || [ "$2" != "view" ]; then
printf 'unexpected gh args: %s\\n' "$*" >&2
exit 9
fi
branch=""
if [ -f "$head_file" ]; then
head_line="$(cat "$head_file")"
case "$head_line" in
ref:\ refs/heads/*)
branch="${head_line#ref: refs/heads/}"
;;
esac
fi
case "$scenario" in
prompt_helper_idle)
printf '1138\\tOPEN\\thttps://github.com/manaflow-ai/cmux/pull/1138\\n'
;;
transient_same_context)
if [ "$count" -eq 1 ]; then
printf 'rate limit exceeded\\n' >&2
exit 1
fi
printf '1138\\tOPEN\\thttps://github.com/manaflow-ai/cmux/pull/1138\\n'
;;
branch_switch_clear)
if [ "$branch" = "feature/old" ]; then
printf '111\\tOPEN\\thttps://github.com/manaflow-ai/cmux/pull/111\\n'
exit 0
fi
if [ "$branch" = "feature/new" ]; then
printf 'network unavailable\\n' >&2
exit 1
fi
printf 'no pull requests found for branch "%s"\\n' "$branch" >&2
exit 1
;;
timeout_recovery)
if [ "$count" -eq 1 ]; then
if [ -n "$pid_file" ]; then
printf '%s\\n' "$$" > "$pid_file"
fi
sleep "${CMUX_TEST_HANG_SECONDS:-4}"
exit 0
fi
printf '1138\\tOPEN\\thttps://github.com/manaflow-ai/cmux/pull/1138\\n'
;;
*)
printf 'unknown scenario: %s\\n' "$scenario" >&2
exit 2
;;
esac
"""
)
def _shell_command(kind: str, scenario: str) -> str:
shared = {
"prompt_helper_idle": (
'cd "$CMUX_TEST_REPO"\n'
'_CMUX_PR_POLL_INTERVAL=1\n'
'_cmux_prompt_entry\n'
': "$(/bin/printf helper)"\n'
'sleep 3\n'
'_cmux_cleanup\n'
),
"transient_same_context": (
'cd "$CMUX_TEST_REPO"\n'
'_CMUX_PR_POLL_INTERVAL=1\n'
'_cmux_prompt_entry\n'
'sleep 3\n'
'_cmux_cleanup\n'
),
"branch_switch_clear": (
'cd "$CMUX_TEST_REPO"\n'
'_CMUX_PR_POLL_INTERVAL=10\n'
'_cmux_prompt_entry\n'
'sleep 1\n'
'printf \'ref: refs/heads/feature/new\\n\' > "$CMUX_TEST_HEAD_FILE"\n'
'_cmux_prompt_entry\n'
'sleep 2\n'
'_cmux_cleanup\n'
),
"timeout_recovery": (
'cd "$CMUX_TEST_REPO"\n'
'_CMUX_PR_POLL_INTERVAL=1\n'
'_CMUX_ASYNC_JOB_TIMEOUT=1\n'
'_cmux_prompt_entry\n'
'sleep 4\n'
'_cmux_cleanup\n'
),
}[scenario]
if kind == "zsh":
return textwrap.dedent(
f"""\
source "$CMUX_TEST_SCRIPT"
_cmux_send() {{ print -r -- "$1" >> "$CMUX_TEST_SEND_LOG"; }}
_cmux_prompt_entry() {{ _cmux_precmd; }}
_cmux_cleanup() {{ _cmux_zshexit; }}
{shared}"""
)
if kind == "bash":
return textwrap.dedent(
f"""\
source "$CMUX_TEST_SCRIPT"
_cmux_send() {{ printf '%s\\n' "$1" >> "$CMUX_TEST_SEND_LOG"; }}
_cmux_prompt_entry() {{ _cmux_prompt_command; }}
_cmux_cleanup() {{ type _cmux_bash_cleanup >/dev/null 2>&1 && _cmux_bash_cleanup; }}
{shared}"""
)
raise ValueError(f"Unsupported shell kind: {kind}")
def _read_lines(path: Path) -> list[str]:
if not path.exists():
return []
return [line.strip() for line in path.read_text(encoding="utf-8").splitlines() if line.strip()]
def _report_line(number: int) -> str:
return (
f"report_pr {number} https://github.com/manaflow-ai/cmux/pull/{number} "
"--state=open --tab=00000000-0000-0000-0000-000000000001 "
"--panel=00000000-0000-0000-0000-000000000002"
)
def _pid_exists(pid: int) -> bool:
try:
os.kill(pid, 0)
except ProcessLookupError:
return False
except PermissionError:
return True
return True
def _run_case(base: Path, *, shell: str, shell_args: list[str], script: Path, scenario: str) -> tuple[int, str]:
bindir = base / "bin"
repo = base / "repo"
repo_git = repo / ".git"
socket_path = base / "cmux.sock"
send_log = base / f"{shell}-{scenario}-send.log"
gh_count_file = base / f"{shell}-{scenario}-gh-count.txt"
gh_args_log = base / f"{shell}-{scenario}-gh-args.log"
gh_pid_file = base / f"{shell}-{scenario}-gh-pid.txt"
head_file = repo_git / "HEAD"
bindir.mkdir(parents=True, exist_ok=True)
repo_git.mkdir(parents=True, exist_ok=True)
initial_branch = "feature/old" if scenario == "branch_switch_clear" else "feature/issue-1138"
head_file.write_text(f"ref: refs/heads/{initial_branch}\n", encoding="utf-8")
_write_executable(bindir / "git", _git_stub())
_write_executable(bindir / "gh", _gh_stub())
env = dict(os.environ)
env["PATH"] = f"{bindir}:{env.get('PATH', '')}"
env["CMUX_SOCKET_PATH"] = str(socket_path)
env["CMUX_TAB_ID"] = "00000000-0000-0000-0000-000000000001"
env["CMUX_PANEL_ID"] = "00000000-0000-0000-0000-000000000002"
env["CMUX_TEST_SCRIPT"] = str(script)
env["CMUX_TEST_REPO"] = str(repo)
env["CMUX_TEST_SEND_LOG"] = str(send_log)
env["CMUX_TEST_GH_COUNT_FILE"] = str(gh_count_file)
env["CMUX_TEST_GH_ARGS_LOG"] = str(gh_args_log)
env["CMUX_TEST_GH_PID_FILE"] = str(gh_pid_file)
env["CMUX_TEST_SCENARIO"] = scenario
env["CMUX_TEST_HEAD_FILE"] = str(head_file)
env["CMUX_TEST_HANG_SECONDS"] = "4"
with BoundUnixSocket(socket_path):
result = subprocess.run(
[shell, *shell_args, _shell_command(shell, scenario)],
env=env,
capture_output=True,
text=True,
timeout=12,
)
combined_output = (result.stdout or "") + (result.stderr or "")
if result.returncode != 0:
return (result.returncode, combined_output)
send_lines = _read_lines(send_log)
gh_args_lines = _read_lines(gh_args_log)
gh_count = int((gh_count_file.read_text(encoding="utf-8").strip() or "0")) if gh_count_file.exists() else 0
if not gh_args_lines:
return (1, f"{shell}/{scenario}: expected at least one gh invocation")
if any(not line.startswith("pr view ") for line in gh_args_lines):
return (1, f"{shell}/{scenario}: expected gh pr view only\n" + "\n".join(gh_args_lines))
if scenario == "prompt_helper_idle":
if gh_count < 2:
return (1, f"{shell}/{scenario}: expected idle polling to survive prompt helpers, saw {gh_count}")
if _report_line(1138) not in send_lines:
return (1, f"{shell}/{scenario}: missing report_pr payload\n" + "\n".join(send_lines))
return (0, f"{shell}/{scenario}: ok")
if scenario == "transient_same_context":
if gh_count < 2:
return (1, f"{shell}/{scenario}: expected at least 2 gh probes while idle, saw {gh_count}")
if any(line.startswith("clear_pr ") for line in send_lines):
return (1, f"{shell}/{scenario}: transient failure should not clear PR state\n" + "\n".join(send_lines))
if _report_line(1138) not in send_lines:
return (1, f"{shell}/{scenario}: expected recovered report_pr payload\n" + "\n".join(send_lines))
return (0, f"{shell}/{scenario}: ok")
if scenario == "branch_switch_clear":
old_report = _report_line(111)
if old_report not in send_lines:
return (1, f"{shell}/{scenario}: missing old-branch report\n" + "\n".join(send_lines))
try:
old_index = send_lines.index(old_report)
except ValueError:
return (1, f"{shell}/{scenario}: missing old-branch report\n" + "\n".join(send_lines))
clear_indices = [idx for idx, line in enumerate(send_lines) if line.startswith("clear_pr ")]
if not clear_indices:
return (1, f"{shell}/{scenario}: expected clear_pr after branch change\n" + "\n".join(send_lines))
if clear_indices[0] <= old_index:
return (1, f"{shell}/{scenario}: clear_pr happened before old report\n" + "\n".join(send_lines))
return (0, f"{shell}/{scenario}: ok")
if scenario == "timeout_recovery":
if gh_count < 2:
return (1, f"{shell}/{scenario}: expected timed-out probe to be retried, saw {gh_count}")
if _report_line(1138) not in send_lines:
return (1, f"{shell}/{scenario}: missing report_pr after timeout recovery\n" + "\n".join(send_lines))
if gh_pid_file.exists():
gh_pid = int(gh_pid_file.read_text(encoding="utf-8").strip() or "0")
if gh_pid > 0 and _pid_exists(gh_pid):
return (1, f"{shell}/{scenario}: timed-out gh probe still running as pid {gh_pid}")
return (0, f"{shell}/{scenario}: ok")
return (1, f"{shell}/{scenario}: unhandled scenario")
def main() -> int:
root = Path(__file__).resolve().parents[1]
cases = [
("zsh", ["-f", "-c"], root / "Resources" / "shell-integration" / "cmux-zsh-integration.zsh"),
("bash", ["--noprofile", "--norc", "-c"], root / "Resources" / "shell-integration" / "cmux-bash-integration.bash"),
]
scenarios = [
"prompt_helper_idle",
"transient_same_context",
"branch_switch_clear",
"timeout_recovery",
]
base = Path("/tmp") / f"cmux_issue_1138_pr_poll_{os.getpid()}"
try:
shutil.rmtree(base, ignore_errors=True)
base.mkdir(parents=True, exist_ok=True)
failures: list[str] = []
for shell, shell_args, script in cases:
if not script.exists():
print(f"SKIP: missing integration script at {script}")
continue
for scenario in scenarios:
rc, detail = _run_case(
base / f"{shell}-{scenario}",
shell=shell,
shell_args=shell_args,
script=script,
scenario=scenario,
)
if rc != 0:
failures.append(detail)
if failures:
print("FAIL:")
for failure in failures:
print(failure)
return 1
print("PASS: shell integrations poll PR state robustly across transient failures, branch changes, and timeouts")
return 0
finally:
shutil.rmtree(base, ignore_errors=True)
if __name__ == "__main__":
raise SystemExit(main())