Add regression test for sidebar PR polling
This commit is contained in:
parent
b824147dcb
commit
06cb63cc8d
1 changed files with 214 additions and 0 deletions
214
tests/test_issue_1138_sidebar_pr_polling.py
Normal file
214
tests/test_issue_1138_sidebar_pr_polling.py
Normal file
|
|
@ -0,0 +1,214 @@
|
|||
#!/usr/bin/env python3
|
||||
"""
|
||||
Regression for issue #1138:
|
||||
sidebar PR badges should recover from a transient gh failure without waiting
|
||||
for another prompt, and a failed gh probe should not eagerly clear PR state.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import shutil
|
||||
import socket
|
||||
import subprocess
|
||||
import textwrap
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
class BoundUnixSocket:
|
||||
def __init__(self, path: Path) -> None:
|
||||
self.path = path
|
||||
self.sock: socket.socket | None = None
|
||||
|
||||
def __enter__(self) -> "BoundUnixSocket":
|
||||
self.path.parent.mkdir(parents=True, exist_ok=True)
|
||||
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||
self.sock.bind(str(self.path))
|
||||
self.sock.listen(1)
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc, tb) -> None:
|
||||
if self.sock is not None:
|
||||
self.sock.close()
|
||||
try:
|
||||
self.path.unlink()
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
|
||||
|
||||
def _write_executable(path: Path, contents: str) -> None:
|
||||
path.write_text(contents, encoding="utf-8")
|
||||
path.chmod(0o755)
|
||||
|
||||
|
||||
def _git_stub() -> str:
|
||||
return textwrap.dedent(
|
||||
"""\
|
||||
#!/bin/sh
|
||||
if [ "$1" = "-C" ]; then
|
||||
shift
|
||||
shift
|
||||
fi
|
||||
|
||||
if [ "$1" = "branch" ] && [ "$2" = "--show-current" ]; then
|
||||
printf '%s\\n' 'feature/issue-1138'
|
||||
exit 0
|
||||
fi
|
||||
|
||||
if [ "$1" = "status" ] && [ "$2" = "--porcelain" ] && [ "$3" = "-uno" ]; then
|
||||
exit 0
|
||||
fi
|
||||
|
||||
printf 'unexpected git args: %s\\n' "$*" >&2
|
||||
exit 1
|
||||
"""
|
||||
)
|
||||
|
||||
|
||||
def _gh_stub() -> str:
|
||||
return textwrap.dedent(
|
||||
"""\
|
||||
#!/bin/sh
|
||||
count_file="${CMUX_TEST_GH_COUNT_FILE:?}"
|
||||
count=0
|
||||
if [ -f "$count_file" ]; then
|
||||
count="$(cat "$count_file")"
|
||||
fi
|
||||
count=$((count + 1))
|
||||
printf '%s\\n' "$count" > "$count_file"
|
||||
|
||||
if [ "$count" -eq 1 ]; then
|
||||
printf 'rate limit exceeded\\n' >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
printf '1138\\tOPEN\\thttps://github.com/manaflow-ai/cmux/pull/1138\\n'
|
||||
"""
|
||||
)
|
||||
|
||||
|
||||
def _shell_command(kind: str) -> str:
|
||||
if kind == "zsh":
|
||||
return textwrap.dedent(
|
||||
"""\
|
||||
source "$CMUX_TEST_SCRIPT"
|
||||
_cmux_send() { print -r -- "$1" >> "$CMUX_TEST_SEND_LOG"; }
|
||||
cd "$CMUX_TEST_REPO"
|
||||
_CMUX_PR_POLL_INTERVAL=1
|
||||
_cmux_precmd
|
||||
sleep 3
|
||||
_cmux_zshexit
|
||||
"""
|
||||
)
|
||||
|
||||
if kind == "bash":
|
||||
return textwrap.dedent(
|
||||
"""\
|
||||
source "$CMUX_TEST_SCRIPT"
|
||||
_cmux_send() { printf '%s\\n' "$1" >> "$CMUX_TEST_SEND_LOG"; }
|
||||
cd "$CMUX_TEST_REPO"
|
||||
_CMUX_PR_POLL_INTERVAL=1
|
||||
_cmux_prompt_command
|
||||
sleep 3
|
||||
type _cmux_bash_cleanup >/dev/null 2>&1 && _cmux_bash_cleanup
|
||||
"""
|
||||
)
|
||||
|
||||
raise ValueError(f"Unsupported shell kind: {kind}")
|
||||
|
||||
|
||||
def _run_case(base: Path, *, shell: str, shell_args: list[str], script: Path) -> tuple[int, str]:
|
||||
bindir = base / "bin"
|
||||
repo = base / "repo"
|
||||
repo_git = repo / ".git"
|
||||
socket_path = base / "cmux.sock"
|
||||
send_log = base / f"{shell}-send.log"
|
||||
gh_count_file = base / f"{shell}-gh-count.txt"
|
||||
|
||||
bindir.mkdir(parents=True, exist_ok=True)
|
||||
repo_git.mkdir(parents=True, exist_ok=True)
|
||||
(repo_git / "HEAD").write_text("ref: refs/heads/feature/issue-1138\n", encoding="utf-8")
|
||||
_write_executable(bindir / "git", _git_stub())
|
||||
_write_executable(bindir / "gh", _gh_stub())
|
||||
|
||||
env = dict(os.environ)
|
||||
env["PATH"] = f"{bindir}:{env.get('PATH', '')}"
|
||||
env["CMUX_SOCKET_PATH"] = str(socket_path)
|
||||
env["CMUX_TAB_ID"] = "00000000-0000-0000-0000-000000000001"
|
||||
env["CMUX_PANEL_ID"] = "00000000-0000-0000-0000-000000000002"
|
||||
env["CMUX_TEST_SCRIPT"] = str(script)
|
||||
env["CMUX_TEST_REPO"] = str(repo)
|
||||
env["CMUX_TEST_SEND_LOG"] = str(send_log)
|
||||
env["CMUX_TEST_GH_COUNT_FILE"] = str(gh_count_file)
|
||||
|
||||
with BoundUnixSocket(socket_path):
|
||||
result = subprocess.run(
|
||||
[shell, *shell_args, _shell_command(shell)],
|
||||
env=env,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=10,
|
||||
)
|
||||
|
||||
output = (result.stdout or "") + (result.stderr or "")
|
||||
if result.returncode != 0:
|
||||
return (result.returncode, output)
|
||||
|
||||
send_lines = []
|
||||
if send_log.exists():
|
||||
send_lines = [line.strip() for line in send_log.read_text(encoding="utf-8").splitlines() if line.strip()]
|
||||
|
||||
gh_count = 0
|
||||
if gh_count_file.exists():
|
||||
gh_count = int(gh_count_file.read_text(encoding="utf-8").strip() or "0")
|
||||
|
||||
report_line = (
|
||||
"report_pr 1138 https://github.com/manaflow-ai/cmux/pull/1138 "
|
||||
"--state=open --tab=00000000-0000-0000-0000-000000000001 "
|
||||
"--panel=00000000-0000-0000-0000-000000000002"
|
||||
)
|
||||
if gh_count < 2:
|
||||
return (1, f"{shell}: expected at least 2 gh probes while idle, saw {gh_count}")
|
||||
if any(line.startswith("clear_pr ") for line in send_lines):
|
||||
return (1, f"{shell}: transient gh failure should not send clear_pr\n" + "\n".join(send_lines))
|
||||
if report_line not in send_lines:
|
||||
return (1, f"{shell}: expected recovered report_pr payload\n" + "\n".join(send_lines))
|
||||
|
||||
return (0, f"{shell}: ok")
|
||||
|
||||
|
||||
def main() -> int:
|
||||
root = Path(__file__).resolve().parents[1]
|
||||
cases = [
|
||||
("zsh", ["-f", "-c"], root / "Resources" / "shell-integration" / "cmux-zsh-integration.zsh"),
|
||||
("bash", ["--noprofile", "--norc", "-c"], root / "Resources" / "shell-integration" / "cmux-bash-integration.bash"),
|
||||
]
|
||||
|
||||
base = Path("/tmp") / f"cmux_issue_1138_pr_poll_{os.getpid()}"
|
||||
try:
|
||||
shutil.rmtree(base, ignore_errors=True)
|
||||
base.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
failures: list[str] = []
|
||||
for shell, shell_args, script in cases:
|
||||
if not script.exists():
|
||||
print(f"SKIP: missing integration script at {script}")
|
||||
continue
|
||||
rc, detail = _run_case(base / shell, shell=shell, shell_args=shell_args, script=script)
|
||||
if rc != 0:
|
||||
failures.append(detail)
|
||||
|
||||
if failures:
|
||||
print("FAIL:")
|
||||
for failure in failures:
|
||||
print(failure)
|
||||
return 1
|
||||
|
||||
print("PASS: shell integrations keep polling PR state and recover after transient gh failures")
|
||||
return 0
|
||||
finally:
|
||||
shutil.rmtree(base, ignore_errors=True)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
Loading…
Add table
Add a link
Reference in a new issue