104 lines
3.3 KiB
Python
Executable file
104 lines
3.3 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""
|
|
E2E: focusing a panel clears its notification and triggers a flash.
|
|
|
|
Note: This uses the socket focus command (no assistive access needed).
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import time
|
|
|
|
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
|
|
|
from cmux import cmux, cmuxError
|
|
|
|
|
|
def wait_for_notification(client: cmux, surface_id: str, is_read: bool, timeout: float = 2.0) -> bool:
|
|
deadline = time.time() + timeout
|
|
while time.time() < deadline:
|
|
items = client.list_notifications()
|
|
for item in items:
|
|
if item["surface_id"] == surface_id and item["is_read"] == is_read:
|
|
return True
|
|
time.sleep(0.05)
|
|
return False
|
|
|
|
|
|
def surface_id_for_index(client: cmux, index: int) -> str:
|
|
surfaces = client.list_surfaces()
|
|
for entry in surfaces:
|
|
if entry[0] == index:
|
|
return entry[1]
|
|
raise RuntimeError(f"Surface index {index} not found")
|
|
|
|
|
|
def ensure_two_surfaces(client: cmux) -> None:
|
|
surfaces = client.list_surfaces()
|
|
if len(surfaces) < 2:
|
|
client.new_split("right")
|
|
time.sleep(0.2)
|
|
|
|
def first_two_terminal_indices(client: cmux) -> tuple[int, int]:
|
|
health = client.surface_health()
|
|
terms = [h["index"] for h in health if h.get("type") == "terminal"]
|
|
if len(terms) < 2:
|
|
raise RuntimeError(f"Expected >=2 terminal surfaces, got {health}")
|
|
return terms[0], terms[1]
|
|
|
|
|
|
def main() -> int:
|
|
try:
|
|
with cmux() as client:
|
|
# Socket-driven tests may run while the app isn't frontmost/key.
|
|
# Override app focus to make notification->focus behavior deterministic.
|
|
client.set_app_focus(True)
|
|
ws_id = client.new_workspace()
|
|
client.select_workspace(ws_id)
|
|
time.sleep(0.5)
|
|
ensure_two_surfaces(client)
|
|
term_a, term_b = first_two_terminal_indices(client)
|
|
client.focus_surface(term_a)
|
|
|
|
surface_id = surface_id_for_index(client, term_b)
|
|
client.clear_notifications()
|
|
client.reset_flash_counts()
|
|
initial_flash = client.flash_count(term_b)
|
|
|
|
client.notify_surface(term_b, "Focus Test", "panel", "body")
|
|
if not wait_for_notification(client, surface_id, is_read=False, timeout=2.0):
|
|
print("FAIL: Notification did not appear as unread")
|
|
return 1
|
|
|
|
client.focus_surface(term_b)
|
|
client.send("x")
|
|
time.sleep(0.2)
|
|
|
|
if not wait_for_notification(client, surface_id, is_read=True, timeout=2.0):
|
|
print("FAIL: Notification did not become read after focus")
|
|
return 1
|
|
|
|
final_flash = client.flash_count(term_b)
|
|
if final_flash <= initial_flash:
|
|
print(f"FAIL: Flash count did not increment (before={initial_flash}, after={final_flash})")
|
|
return 1
|
|
|
|
try:
|
|
client.close_workspace(ws_id)
|
|
except Exception:
|
|
pass
|
|
finally:
|
|
try:
|
|
client.set_app_focus(None)
|
|
except Exception:
|
|
pass
|
|
|
|
print("PASS: Focus clears notification and flashes panel")
|
|
return 0
|
|
except (cmuxError, RuntimeError) as exc:
|
|
print(f"FAIL: {exc}")
|
|
return 1
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|