Sidebar metadata + tagged reload isolation (#16)

* Sidebar primitives + tagged dev isolation

* Allow wider sidebar resize

* Fix tagged socket selection + panel id errors

* Fix progress label quoting + bundle suffix sanitize

* Skip ctrl-enter keybind test when keystrokes blocked

* Fix shell nc hang + prune stale per-surface sidebar metadata
This commit is contained in:
Lawrence Chen 2026-02-06 18:09:56 -08:00 committed by GitHub
parent b3c2a8c7c3
commit 7e69751e1b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
22 changed files with 2538 additions and 72 deletions

View file

@ -33,6 +33,8 @@ import select
import os
import time
import errno
import glob
import re
from typing import Optional, List, Tuple, Union
@ -41,24 +43,136 @@ class cmuxError(Exception):
pass
def _default_socket_path() -> str:
override = os.environ.get("CMUX_SOCKET_PATH")
_LAST_SOCKET_PATH_FILE = "/tmp/cmuxterm-last-socket-path"
_DEFAULT_DEBUG_BUNDLE_ID = "com.cmuxterm.app.debug"
def _sanitize_tag_slug(raw: str) -> str:
cleaned = re.sub(r"[^a-z0-9]+", "-", (raw or "").strip().lower())
cleaned = re.sub(r"-+", "-", cleaned).strip("-")
return cleaned or "agent"
def _sanitize_bundle_suffix(raw: str) -> str:
# Must match scripts/reload.sh sanitize_bundle() so tagged tests can
# reliably target the correct app via AppleScript.
cleaned = re.sub(r"[^a-z0-9]+", ".", (raw or "").strip().lower())
cleaned = re.sub(r"\.+", ".", cleaned).strip(".")
return cleaned or "agent"
def _quote_option_value(value: str) -> str:
# Must match TerminalController.parseOptions() quoting rules.
escaped = (value or "").replace("\\", "\\\\").replace('"', '\\"')
return f"\"{escaped}\""
def _default_bundle_id() -> str:
override = os.environ.get("CMUX_BUNDLE_ID") or os.environ.get("CMUXTERM_BUNDLE_ID")
if override:
return override
tag = os.environ.get("CMUX_TAG") or os.environ.get("CMUXTERM_TAG")
if tag:
suffix = _sanitize_bundle_suffix(tag)
return f"{_DEFAULT_DEBUG_BUNDLE_ID}.{suffix}"
return _DEFAULT_DEBUG_BUNDLE_ID
def _read_last_socket_path() -> Optional[str]:
try:
with open(_LAST_SOCKET_PATH_FILE, "r", encoding="utf-8") as f:
path = f.read().strip()
if path:
return path
except OSError:
pass
return None
def _can_connect(path: str, timeout: float = 0.15, retries: int = 4) -> bool:
# Best-effort check to avoid getting stuck on stale socket files.
for _ in range(max(1, retries)):
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try:
s.settimeout(timeout)
s.connect(path)
return True
except OSError:
time.sleep(0.05)
finally:
try:
s.close()
except Exception:
pass
return False
def _default_socket_path() -> str:
tag = os.environ.get("CMUX_TAG") or os.environ.get("CMUXTERM_TAG")
if tag:
slug = _sanitize_tag_slug(tag)
tagged_candidates = [
f"/tmp/cmuxterm-debug-{slug}.sock",
f"/tmp/cmuxterm-{slug}.sock",
]
for path in tagged_candidates:
if os.path.exists(path) and _can_connect(path):
return path
# If nothing is connectable yet (e.g. the app is still starting),
# fall back to the first existing candidate.
for path in tagged_candidates:
if os.path.exists(path):
return path
# Prefer the debug naming convention when we have to guess.
return tagged_candidates[0]
override = os.environ.get("CMUX_SOCKET_PATH")
if override:
if os.path.exists(override) and _can_connect(override):
return override
# Fall back to other heuristics if the override points at a stale socket file.
if not os.path.exists(override):
return override
last_socket = _read_last_socket_path()
if last_socket:
if os.path.exists(last_socket) and _can_connect(last_socket):
return last_socket
# Prefer the non-tagged sockets when present.
candidates = ["/tmp/cmuxterm-debug.sock", "/tmp/cmuxterm.sock"]
for path in candidates:
if os.path.exists(path):
if os.path.exists(path) and _can_connect(path):
return path
# Otherwise, fall back to the newest tagged debug socket if there is one.
tagged = glob.glob("/tmp/cmuxterm-debug-*.sock")
tagged = [p for p in tagged if os.path.exists(p)]
if tagged:
tagged.sort(key=lambda p: os.path.getmtime(p), reverse=True)
for p in tagged:
if _can_connect(p, timeout=0.1, retries=2):
return p
return candidates[0]
class cmux:
"""Client for controlling cmux via Unix socket"""
DEFAULT_SOCKET_PATH = _default_socket_path()
@staticmethod
def default_socket_path() -> str:
return _default_socket_path()
@staticmethod
def default_bundle_id() -> str:
return _default_bundle_id()
def __init__(self, socket_path: str = None):
self.socket_path = socket_path or self.DEFAULT_SOCKET_PATH
# Resolve at init time so imports don't "lock in" a stale path.
self.socket_path = socket_path or _default_socket_path()
self._socket: Optional[socket.socket] = None
self._recv_buffer: str = ""
@ -359,6 +473,107 @@ class cmux:
if not response.startswith("OK"):
raise cmuxError(response)
def set_status(self, key: str, value: str, icon: str = None, color: str = None, tab: str = None) -> None:
"""Set a sidebar status entry."""
cmd = f"set_status {key} {value}"
if icon:
cmd += f" --icon={icon}"
if color:
cmd += f" --color={color}"
if tab:
cmd += f" --tab={tab}"
response = self._send_command(cmd)
if not response.startswith("OK"):
raise cmuxError(response)
def clear_status(self, key: str, tab: str = None) -> None:
"""Remove a sidebar status entry."""
cmd = f"clear_status {key}"
if tab:
cmd += f" --tab={tab}"
response = self._send_command(cmd)
if not response.startswith("OK"):
raise cmuxError(response)
def log(self, message: str, level: str = None, source: str = None, tab: str = None) -> None:
"""Append a sidebar log entry."""
cmd = f"log {message}"
if level:
cmd += f" --level={level}"
if source:
cmd += f" --source={source}"
if tab:
cmd += f" --tab={tab}"
response = self._send_command(cmd)
if not response.startswith("OK"):
raise cmuxError(response)
def set_progress(self, value: float, label: str = None, tab: str = None) -> None:
"""Set sidebar progress bar (0.0-1.0)."""
cmd = f"set_progress {value}"
if label:
cmd += f" --label={_quote_option_value(label)}"
if tab:
cmd += f" --tab={tab}"
response = self._send_command(cmd)
if not response.startswith("OK"):
raise cmuxError(response)
def clear_progress(self, tab: str = None) -> None:
"""Clear sidebar progress bar."""
cmd = "clear_progress"
if tab:
cmd += f" --tab={tab}"
response = self._send_command(cmd)
if not response.startswith("OK"):
raise cmuxError(response)
def report_git_branch(self, branch: str, status: str = None, tab: str = None) -> None:
"""Report git branch for sidebar display."""
cmd = f"report_git_branch {branch}"
if status:
cmd += f" --status={status}"
if tab:
cmd += f" --tab={tab}"
response = self._send_command(cmd)
if not response.startswith("OK"):
raise cmuxError(response)
def report_ports(self, *ports: int, tab: str = None) -> None:
"""Report listening ports for sidebar display."""
port_str = " ".join(str(p) for p in ports)
cmd = f"report_ports {port_str}"
if tab:
cmd += f" --tab={tab}"
response = self._send_command(cmd)
if not response.startswith("OK"):
raise cmuxError(response)
def clear_ports(self, tab: str = None) -> None:
"""Clear listening ports for sidebar display."""
cmd = "clear_ports"
if tab:
cmd += f" --tab={tab}"
response = self._send_command(cmd)
if not response.startswith("OK"):
raise cmuxError(response)
def sidebar_state(self, tab: str = None) -> str:
"""Dump all sidebar metadata for a tab."""
cmd = "sidebar_state"
if tab:
cmd += f" --tab={tab}"
return self._send_command(cmd)
def reset_sidebar(self, tab: str = None) -> None:
"""Clear all sidebar metadata for a tab."""
cmd = "reset_sidebar"
if tab:
cmd += f" --tab={tab}"
response = self._send_command(cmd)
if not response.startswith("OK"):
raise cmuxError(response)
def focus_notification(self, tab: Union[str, int], surface: Union[str, int, None] = None) -> None:
"""Focus tab/surface using the notification flow."""
if surface is None:
@ -391,8 +606,8 @@ def main():
parser = argparse.ArgumentParser(description="cmux CLI")
parser.add_argument("command", nargs="?", help="Command to send")
parser.add_argument("args", nargs="*", help="Command arguments")
parser.add_argument("-s", "--socket", default=cmux.DEFAULT_SOCKET_PATH,
help="Socket path")
parser.add_argument("-s", "--socket", default=None,
help="Socket path (default: auto-detect)")
args = parser.parse_args()

View file

@ -41,6 +41,32 @@ MONITOR_DURATION = 3.0
def get_cmuxterm_pid() -> Optional[int]:
"""Get the PID of the running cmuxterm process."""
socket_path = os.environ.get("CMUX_SOCKET_PATH")
if not socket_path:
# Ask cmux.py to resolve default socket path (supports CMUX_TAG and last-socket file).
try:
socket_path = cmux().socket_path
except Exception:
socket_path = None
if socket_path and os.path.exists(socket_path):
result = subprocess.run(
["lsof", "-t", socket_path],
capture_output=True,
text=True,
)
if result.returncode == 0:
for line in result.stdout.strip().split("\n"):
line = line.strip()
if not line:
continue
try:
pid = int(line)
except ValueError:
continue
if pid != os.getpid():
return pid
result = subprocess.run(
["pgrep", "-f", r"cmuxterm\.app/Contents/MacOS/cmuxterm$"],
capture_output=True,
@ -141,6 +167,14 @@ def test_cpu_after_popover_close(client: cmux, pid: int) -> tuple[bool, str]:
time.sleep(0.1)
time.sleep(0.5)
# Ensure the correct cmuxterm instance is frontmost (tag-safe).
bundle_id = cmux.default_bundle_id()
subprocess.run(
["osascript", "-e", f'tell application id "{bundle_id}" to activate'],
capture_output=True,
)
time.sleep(0.2)
# Simulate opening and closing the popover via keyboard shortcut
# We can't directly control the popover, but we can toggle it
subprocess.run([
@ -212,6 +246,8 @@ def main():
print("cmuxterm Notification CPU Tests")
print("=" * 60)
socket_path = cmux().socket_path
pid = get_cmuxterm_pid()
if pid is None:
print("\n❌ SKIP: cmuxterm is not running")
@ -220,20 +256,13 @@ def main():
print(f"\nFound cmuxterm process: PID {pid}")
# Try to connect to the socket
socket_paths = ["/tmp/cmuxterm.sock", "/tmp/cmuxterm-debug.sock"]
client = None
for socket_path in socket_paths:
if os.path.exists(socket_path):
try:
client = cmux(socket_path)
client.connect()
print(f"Connected to {socket_path}")
break
except cmuxError:
continue
if client is None:
print(f"\n❌ SKIP: Could not connect to cmuxterm socket")
client = cmux(socket_path)
try:
client.connect()
print(f"Connected to {socket_path}")
except cmuxError:
print("\n❌ SKIP: Could not connect to cmuxterm socket")
print("Tip: set CMUX_TAG=<tag> or CMUX_SOCKET_PATH=<path> to target a tagged instance.")
return 0
results = []

View file

@ -19,9 +19,15 @@ import subprocess
import sys
import time
import re
import os
from pathlib import Path
from typing import List, Optional
# Allow importing tests/cmux.py when running from repo root.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from cmux import cmux
# Maximum acceptable CPU usage during idle (percentage)
MAX_IDLE_CPU_PERCENT = 15.0
@ -45,6 +51,31 @@ SUSPICIOUS_PATTERNS = [
def get_cmuxterm_pid() -> Optional[int]:
"""Get the PID of the running cmuxterm process."""
socket_path = os.environ.get("CMUX_SOCKET_PATH")
if not socket_path:
try:
socket_path = cmux().socket_path
except Exception:
socket_path = None
if socket_path and os.path.exists(socket_path):
result = subprocess.run(
["lsof", "-t", socket_path],
capture_output=True,
text=True,
)
if result.returncode == 0:
for line in result.stdout.strip().split("\n"):
line = line.strip()
if not line:
continue
try:
pid = int(line)
except ValueError:
continue
if pid != os.getpid():
return pid
result = subprocess.run(
["pgrep", "-f", r"cmuxterm\.app/Contents/MacOS/cmuxterm$"],
capture_output=True,
@ -156,7 +187,7 @@ def main():
print(f" - {issue}")
# Save sample for debugging
sample_file = Path("/tmp/cmuxterm_cpu_test_sample.txt")
sample_file = Path(f"/tmp/cmuxterm_cpu_test_sample_{pid}.txt")
sample_file.write_text(sample_output)
print(f"\nFull sample saved to: {sample_file}")

View file

@ -21,8 +21,26 @@ sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from cmux import cmux, cmuxError
def run_osascript(script: str) -> None:
subprocess.run(["osascript", "-e", script], check=True)
def run_osascript(script: str) -> subprocess.CompletedProcess[str]:
# Use capture_output so we can detect common permission failures and skip.
result = subprocess.run(
["osascript", "-e", script],
capture_output=True,
text=True,
)
if result.returncode != 0:
raise subprocess.CalledProcessError(
result.returncode,
result.args,
output=result.stdout,
stderr=result.stderr,
)
return result
def is_keystroke_permission_error(err: subprocess.CalledProcessError) -> bool:
text = f"{getattr(err, 'stderr', '') or ''}\n{getattr(err, 'output', '') or ''}"
return "not allowed to send keystrokes" in text or "(1002)" in text
def has_ctrl_enter_keybind(config_text: str) -> bool:
@ -63,34 +81,36 @@ def test_ctrl_enter_keybind(client: cmux) -> tuple[bool, str]:
new_tab_id = client.new_tab()
client.select_tab(new_tab_id)
time.sleep(0.3)
# Make sure the app is focused for keystrokes
run_osascript('tell application "cmuxterm" to activate')
time.sleep(0.2)
# Clear any running command
try:
client.send_key("ctrl-c")
# Make sure the app is focused for keystrokes
bundle_id = cmux.default_bundle_id()
run_osascript(f'tell application id "{bundle_id}" to activate')
time.sleep(0.2)
except Exception:
pass
# Type the command (without pressing Enter)
run_osascript(f'tell application "System Events" to keystroke "touch {marker}"')
time.sleep(0.1)
# Clear any running command
try:
client.send_key("ctrl-c")
time.sleep(0.2)
except Exception:
pass
# Send Ctrl+Enter (key code 36 = Return)
run_osascript('tell application "System Events" to key code 36 using control down')
time.sleep(0.5)
# Type the command (without pressing Enter)
run_osascript(f'tell application "System Events" to keystroke "touch {marker}"')
time.sleep(0.1)
ok = marker.exists()
if ok:
marker.unlink(missing_ok=True)
try:
client.close_tab(new_tab_id)
except Exception:
pass
return ok, ("Ctrl+Enter keybind executed command" if ok else "Marker not created by Ctrl+Enter")
# Send Ctrl+Enter (key code 36 = Return)
run_osascript('tell application "System Events" to key code 36 using control down')
time.sleep(0.5)
ok = marker.exists()
return ok, ("Ctrl+Enter keybind executed command" if ok else "Marker not created by Ctrl+Enter")
finally:
if marker.exists():
marker.unlink(missing_ok=True)
try:
client.close_tab(new_tab_id)
except Exception:
pass
def run_tests() -> int:
@ -99,19 +119,17 @@ def run_tests() -> int:
print("=" * 60)
print()
socket_path = cmux.DEFAULT_SOCKET_PATH
socket_path = cmux.default_socket_path()
if not os.path.exists(socket_path):
print(f"Error: Socket not found at {socket_path}")
print("Please make sure cmuxterm is running.")
return 1
print(f"SKIP: Socket not found at {socket_path}")
print("Tip: start cmuxterm first (or set CMUX_TAG / CMUX_SOCKET_PATH).")
return 0
config_path = find_config_with_keybind()
if not config_path:
print("Error: Required keybind not found in Ghostty config.")
print("Add a line like:")
print(" keybind = ctrl+enter=text:\\r")
print("Then restart cmuxterm and re-run this test.")
return 1
print("SKIP: Required keybind not found in Ghostty config.")
print("Expected a line like: keybind = ctrl+enter=text:\\r")
return 0
print(f"Using keybind from: {config_path}")
print()
@ -123,10 +141,17 @@ def run_tests() -> int:
print(f"{status} {message}")
return 0 if ok else 1
except cmuxError as e:
print(f"Error: {e}")
return 1
print(f"SKIP: {e}")
return 0
except subprocess.CalledProcessError as e:
if is_keystroke_permission_error(e):
print("SKIP: osascript/System Events not allowed to send keystrokes (Accessibility permission missing)")
return 0
print(f"Error: osascript failed: {e}")
if getattr(e, "stderr", None):
print(e.stderr.strip())
if getattr(e, "output", None):
print(e.output.strip())
return 1

View file

@ -271,10 +271,11 @@ def run_tests():
print("=" * 60)
print()
socket_path = cmux.DEFAULT_SOCKET_PATH
socket_path = cmux().socket_path
if not os.path.exists(socket_path):
print(f"Error: Socket not found at {socket_path}")
print("Please make sure cmux is running.")
print("Tip: set CMUX_TAG=<tag> or CMUX_SOCKET_PATH=<path> to target a tagged instance.")
return 1
results = []

View file

@ -0,0 +1,170 @@
#!/usr/bin/env python3
"""
End-to-end test for sidebar CWD + git branch updates.
This specifically covers the regression where the sidebar directory can get
stuck (e.g. showing "~" even after multiple `cd`s).
Run with a tagged instance to avoid unix socket conflicts:
CMUX_TAG=<tag> python3 tests/test_sidebar_cwd_git.py
"""
from __future__ import annotations
import os
import shutil
import subprocess
import sys
import time
from pathlib import Path
# Add the directory containing cmux.py to the path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from cmux import cmux, cmuxError # noqa: E402
def _parse_sidebar_state(text: str) -> dict[str, str]:
data: dict[str, str] = {}
for raw in (text or "").splitlines():
line = raw.rstrip("\n")
if not line or line.startswith(" "):
continue
if "=" not in line:
continue
k, v = line.split("=", 1)
data[k.strip()] = v.strip()
return data
def _wait_for(predicate, timeout: float, interval: float, label: str):
start = time.time()
last_error: Exception | None = None
while time.time() - start < timeout:
try:
value = predicate()
if value:
return value
except Exception as e:
last_error = e
time.sleep(interval)
if last_error is not None:
raise AssertionError(f"Timed out waiting for {label}. Last error: {last_error}")
raise AssertionError(f"Timed out waiting for {label}.")
def _wait_for_state_field(
client: cmux,
key: str,
expected: str,
timeout: float = 6.0,
interval: float = 0.1,
) -> dict[str, str]:
def pred():
state = _parse_sidebar_state(client.sidebar_state())
return state if state.get(key) == expected else None
return _wait_for(pred, timeout=timeout, interval=interval, label=f"{key}={expected!r}")
def _wait_for_git_branch(
client: cmux,
expected: str,
timeout: float = 8.0,
interval: float = 0.15,
) -> dict[str, str]:
def pred():
state = _parse_sidebar_state(client.sidebar_state())
raw = state.get("git_branch", "")
branch = raw.split(" ", 1)[0] # "main dirty" -> "main", "none" -> "none"
return state if branch == expected else None
return _wait_for(pred, timeout=timeout, interval=interval, label=f"git_branch={expected!r}")
def _git(cwd: Path, *args: str) -> None:
subprocess.run(["git", *args], cwd=str(cwd), check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
def _init_git_repo(repo: Path) -> None:
repo.mkdir(parents=True, exist_ok=True)
_git(repo, "init")
_git(repo, "config", "user.email", "cmuxterm-test@example.com")
_git(repo, "config", "user.name", "cmuxterm-test")
(repo / "README.md").write_text("hello\n", encoding="utf-8")
_git(repo, "add", "README.md")
_git(repo, "commit", "-m", "init")
# Normalize the initial branch to "main" so the test is deterministic.
branch = subprocess.check_output(
["git", "rev-parse", "--abbrev-ref", "HEAD"], cwd=str(repo)
).decode("utf-8", errors="replace").strip()
if branch and branch != "main":
_git(repo, "branch", "-m", "main")
def main() -> int:
tag = os.environ.get("CMUX_TAG") or os.environ.get("CMUXTERM_TAG") or ""
if not tag:
print("Tip: set CMUX_TAG=<tag> when running this test to avoid socket conflicts.")
base = Path("/tmp") / f"cmux_sidebar_test_{os.getpid()}"
repo = base / "repo"
other = base / "other"
try:
if base.exists():
shutil.rmtree(base)
other.mkdir(parents=True, exist_ok=True)
_init_git_repo(repo)
with cmux() as client:
new_tab_id = client.new_tab()
client.select_tab(new_tab_id)
time.sleep(0.6)
# Initial: sync via `pwd` to a file, then wait for sidebar_state cwd.
marker = base / "pwd.txt"
client.send(f"pwd > {marker}\n")
_wait_for(lambda: marker.exists(), timeout=4.0, interval=0.1, label="pwd marker file")
expected_pwd = marker.read_text(encoding="utf-8").strip()
_wait_for_state_field(client, "cwd", expected_pwd)
# Multiple cd's: ensure cwd tracks changes.
client.send(f"cd {other}\n")
_wait_for_state_field(client, "cwd", str(other))
_wait_for_git_branch(client, "none")
client.send(f"cd {repo}\n")
_wait_for_state_field(client, "cwd", str(repo))
_wait_for_git_branch(client, "main")
# Branch change should update.
client.send("git checkout -b feature/sidebar\n")
_wait_for_git_branch(client, "feature/sidebar")
# Leaving the repo should clear the branch.
client.send(f"cd {other}\n")
_wait_for_state_field(client, "cwd", str(other))
_wait_for_git_branch(client, "none")
try:
client.close_tab(new_tab_id)
except Exception:
pass
print("Sidebar CWD + git branch test passed.")
return 0
except (cmuxError, subprocess.CalledProcessError, AssertionError) as e:
print(f"Sidebar CWD + git branch test failed: {e}")
return 1
finally:
try:
shutil.rmtree(base)
except Exception:
pass
if __name__ == "__main__":
raise SystemExit(main())

View file

@ -0,0 +1,127 @@
#!/usr/bin/env python3
"""
Regression: report_ports/report_pwd must validate panel IDs.
If shell-integration hooks fire late (after a split is closed) they can report
ports/cwd for a stale surface UUID. These updates should not pollute the sidebar
state (stale ports/cwd).
Run with a tagged instance to avoid unix socket conflicts:
CMUX_TAG=<tag> python3 tests/test_sidebar_invalid_panel.py
"""
from __future__ import annotations
import os
import random
import subprocess
import sys
import time
import uuid
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from cmux import cmux, cmuxError # noqa: E402
def _parse_sidebar_state(text: str) -> dict[str, str]:
data: dict[str, str] = {}
for raw in (text or "").splitlines():
line = raw.rstrip("\n")
if not line or line.startswith(" "):
continue
if "=" not in line:
continue
k, v = line.split("=", 1)
data[k.strip()] = v.strip()
return data
def _parse_ports(raw: str) -> set[int]:
raw = (raw or "").strip()
if not raw or raw == "none":
return set()
ports: set[int] = set()
for item in raw.split(","):
item = item.strip()
if not item:
continue
try:
ports.add(int(item))
except ValueError:
continue
return ports
def _pick_absent_port(exclude: set[int]) -> int:
# Pick a random port that isn't already showing and also isn't currently
# listening machine-wide (avoid false failures if something is bound).
for _ in range(200):
port = random.randint(20000, 65000)
if port in exclude:
continue
result = subprocess.run(
["lsof", "-nP", f"-iTCP:{port}", "-sTCP:LISTEN", "-t"],
capture_output=True,
text=True,
)
if result.returncode != 0 and not (result.stdout or "").strip():
return port
# Fall back to a fixed high port; still validate against exclude.
for port in (54321, 54322, 54323, 61999):
if port not in exclude:
return port
return 65000
def main() -> int:
try:
with cmux() as client:
tab_id = client.new_tab()
client.select_tab(tab_id)
time.sleep(0.8)
initial_state = client.sidebar_state(tab_id)
initial_ports = _parse_ports(_parse_sidebar_state(initial_state).get("ports", ""))
test_port = _pick_absent_port(initial_ports)
surface_ids = {surface_id for _, surface_id, _ in client.list_surfaces(tab_id)}
fake_panel = uuid.uuid4()
while str(fake_panel) in surface_ids:
fake_panel = uuid.uuid4()
# Ports: reporting against a bogus panel must not update the union.
client._send_command(f"report_ports {test_port} --tab={tab_id} --panel={fake_panel}")
time.sleep(0.3)
state = client.sidebar_state(tab_id)
ports = _parse_ports(_parse_sidebar_state(state).get("ports", ""))
if test_port in ports:
print(f"FAIL: invalid panel report_ports leaked into sidebar ports: {ports}")
return 1
# CWD: reporting against a bogus panel must not set cwd to that value.
unique_dir = f"/tmp/cmux_invalid_pwd_{os.getpid()}"
client._send_command(f"report_pwd {unique_dir} --tab={tab_id} --panel={fake_panel}")
time.sleep(0.3)
state = client.sidebar_state(tab_id)
if unique_dir in state:
print("FAIL: invalid panel report_pwd leaked into sidebar_state")
print(state)
return 1
try:
client.close_tab(tab_id)
except cmuxError:
pass
print("PASS: invalid panel reports do not pollute sidebar metadata")
return 0
except (cmuxError, RuntimeError, ValueError) as e:
print(f"FAIL: {e}")
return 1
if __name__ == "__main__":
raise SystemExit(main())

306
tests/test_sidebar_ports.py Normal file
View file

@ -0,0 +1,306 @@
#!/usr/bin/env python3
"""
End-to-end test for sidebar listening ports auto-detection.
This covers regressions where a listening server (e.g. `python3 -m http.server`)
doesn't show up in the sidebar ports row.
Run with a tagged instance to avoid unix socket conflicts:
CMUX_TAG=<tag> python3 tests/test_sidebar_ports.py
"""
from __future__ import annotations
import os
import shutil
import socket
import subprocess
import sys
import time
from pathlib import Path
# Add the directory containing cmux.py to the path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from cmux import cmux, cmuxError # noqa: E402
# Historically, ports detection only checked a small allowlist. This test
# intentionally uses a port outside that set to avoid regressions where ports
# "work" only for the allowlist.
_HISTORICAL_ALLOWLIST = {8000, 8080, 8888, 5173, 3000, 3001, 5000, 5432}
_PREFERRED_BIND_HOST = "127.0.0.1"
def _parse_sidebar_state(text: str) -> dict[str, str]:
data: dict[str, str] = {}
for raw in (text or "").splitlines():
line = raw.rstrip("\n")
if not line or line.startswith(" "):
continue
if "=" not in line:
continue
k, v = line.split("=", 1)
data[k.strip()] = v.strip()
return data
def _wait_for(predicate, timeout: float, interval: float, label: str):
start = time.time()
last_error: Exception | None = None
while time.time() - start < timeout:
try:
value = predicate()
if value:
return value
except Exception as e:
last_error = e
time.sleep(interval)
if last_error is not None:
raise AssertionError(f"Timed out waiting for {label}. Last error: {last_error}")
raise AssertionError(f"Timed out waiting for {label}.")
def _find_free_allowed_port() -> int:
# Prefer a random ephemeral port to avoid flakiness from well-known ports
# being grabbed by background services.
for _ in range(50):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind((_PREFERRED_BIND_HOST, 0))
port = int(s.getsockname()[1])
if port not in _HISTORICAL_ALLOWLIST:
return port
finally:
try:
s.close()
except Exception:
pass
raise RuntimeError("Failed to find a free test port (outside historical allowlist).")
def _start_external_server(base: Path, port: int) -> subprocess.Popen:
"""
Start an http.server outside cmuxterm and ensure it is actually listening.
Retries are handled by the caller by picking a different port.
"""
proc = subprocess.Popen(
[sys.executable, "-m", "http.server", str(port), "--bind", _PREFERRED_BIND_HOST],
cwd=str(base),
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
_wait_for_lsof_listen_pid(port, expected_pid=proc.pid, timeout=6.0)
return proc
def _wait_for_port(client: cmux, port: int, timeout: float = 18.0) -> dict[str, str]:
def pred():
state = _parse_sidebar_state(client.sidebar_state())
raw = state.get("ports", "")
if raw == "none" or not raw:
return None
ports = []
for item in raw.split(","):
item = item.strip()
if not item:
continue
try:
ports.append(int(item))
except ValueError:
continue
return state if port in ports else None
return _wait_for(pred, timeout=timeout, interval=0.15, label=f"ports include {port}")
def _wait_for_port_absent(client: cmux, port: int, timeout: float = 18.0) -> dict[str, str]:
def pred():
state = _parse_sidebar_state(client.sidebar_state())
raw = state.get("ports", "")
if raw == "none" or not raw:
return state
ports = []
for item in raw.split(","):
item = item.strip()
if not item:
continue
try:
ports.append(int(item))
except ValueError:
continue
return state if port not in ports else None
return _wait_for(pred, timeout=timeout, interval=0.15, label=f"ports do not include {port}")
def _assert_port_absent_for_duration(client: cmux, port: int, duration: float = 6.0, interval: float = 0.15) -> None:
"""
Assert the port does not appear in sidebar_state during the full duration.
This is important to catch "machine-wide ports" leaking into a fresh tab.
"""
start = time.time()
while time.time() - start < duration:
state = _parse_sidebar_state(client.sidebar_state())
raw = state.get("ports", "")
if raw and raw != "none":
try:
ports = {int(p.strip()) for p in raw.split(",") if p.strip()}
except ValueError:
ports = set()
if port in ports:
raise AssertionError(f"Port {port} unexpectedly appeared in sidebar ports: {raw}")
time.sleep(interval)
def _wait_for_lsof_listen_pid(port: int, expected_pid: int | None, timeout: float = 8.0) -> int:
"""
Wait until `lsof -iTCP:<port> -sTCP:LISTEN` returns a pid.
If expected_pid is provided, require that pid to be present.
"""
def pred():
result = subprocess.run(
["lsof", "-nP", f"-iTCP:{port}", "-sTCP:LISTEN", "-t"],
capture_output=True,
text=True,
)
if result.returncode != 0:
return None
pids = []
for line in (result.stdout or "").splitlines():
line = line.strip()
if not line:
continue
try:
pids.append(int(line))
except ValueError:
continue
if not pids:
return None
if expected_pid is not None and expected_pid not in pids:
return None
return expected_pid if expected_pid is not None else pids[0]
value = _wait_for(pred, timeout=timeout, interval=0.15, label=f"lsof LISTEN pid for {port}")
return int(value)
def _wait_for_lsof_listen_gone(port: int, timeout: float = 8.0) -> None:
def pred():
result = subprocess.run(
["lsof", "-nP", f"-iTCP:{port}", "-sTCP:LISTEN", "-t"],
capture_output=True,
text=True,
)
return result.returncode != 0 or not (result.stdout or "").strip()
_wait_for(pred, timeout=timeout, interval=0.15, label=f"lsof no LISTEN for {port}")
def main() -> int:
tag = os.environ.get("CMUX_TAG") or os.environ.get("CMUXTERM_TAG") or ""
if not tag:
print("Tip: set CMUX_TAG=<tag> when running this test to avoid socket conflicts.")
base = Path("/tmp") / f"cmux_ports_test_{os.getpid()}"
pid_file = base / "server.pid"
log_file = base / "server.log"
external_proc: subprocess.Popen | None = None
try:
if base.exists():
shutil.rmtree(base)
base.mkdir(parents=True, exist_ok=True)
# Start a listening server outside cmuxterm. A fresh tab should NOT show this port,
# since ports should be attributed to the shell session in the tab.
port = None
last_start_err: Exception | None = None
for _ in range(8):
try:
port = _find_free_allowed_port()
external_proc = _start_external_server(base, port)
break
except Exception as e:
last_start_err = e
if external_proc is not None:
try:
external_proc.kill()
except Exception:
pass
external_proc = None
continue
if port is None or external_proc is None:
raise RuntimeError(f"Failed to start external http.server. Last error: {last_start_err}")
with cmux() as client:
new_tab_id = client.new_tab()
client.select_tab(new_tab_id)
time.sleep(0.8)
# Trigger a prompt cycle (and thus a ports scan burst) before checking absence.
client.send("echo cmux_ports_test\n")
_assert_port_absent_for_duration(client, port, duration=6.0)
# Stop the external server, then reuse the port inside the tab.
external_proc.terminate()
try:
external_proc.wait(timeout=3.0)
except subprocess.TimeoutExpired:
external_proc.kill()
external_proc = None
_wait_for_lsof_listen_gone(port, timeout=8.0)
# Start a server in the background and capture its PID so we can clean up.
client.send(f"rm -f {pid_file} {log_file}\n")
client.send(
f"python3 -m http.server {port} --bind {_PREFERRED_BIND_HOST} > {log_file} 2>&1 & echo $! > {pid_file}\n"
)
_wait_for(lambda: pid_file.exists(), timeout=4.0, interval=0.1, label="pid file")
pid = int(pid_file.read_text(encoding="utf-8").strip())
# Ensure the server is actually listening (sanity check + reduces flakiness).
_wait_for_lsof_listen_pid(port, expected_pid=pid, timeout=8.0)
# Wait for the sidebar to report the port.
_wait_for_port(client, port, timeout=18.0)
# Cleanup server.
client.send(f"kill {pid} >/dev/null 2>&1 || true\n")
_wait_for_lsof_listen_gone(port, timeout=8.0)
_wait_for_port_absent(client, port, timeout=18.0)
try:
client.close_tab(new_tab_id)
except Exception:
pass
print("Sidebar ports test passed.")
return 0
except (cmuxError, AssertionError, RuntimeError, ValueError) as e:
print(f"Sidebar ports test failed: {e}")
return 1
finally:
if external_proc is not None:
try:
external_proc.terminate()
external_proc.wait(timeout=2.0)
except Exception:
try:
external_proc.kill()
except Exception:
pass
try:
shutil.rmtree(base)
except Exception:
pass
if __name__ == "__main__":
raise SystemExit(main())