Fix notification focus handling
This commit is contained in:
parent
8db5ccbb58
commit
6cf9dbe2a1
5 changed files with 571 additions and 23 deletions
|
|
@ -33,12 +33,26 @@ class Tab: Identifiable, ObservableObject {
|
|||
return nil
|
||||
}
|
||||
|
||||
func focusSurface(_ id: UUID) {
|
||||
guard focusedSurfaceId != id else { return }
|
||||
func focusSurface(_ id: UUID, shouldFlash: Bool = true) {
|
||||
let wasFocused = focusedSurfaceId == id
|
||||
focusedSurfaceId = id
|
||||
if let selectedId = AppDelegate.shared?.tabManager?.selectedTabId, selectedId == self.id {
|
||||
let isSelectedTab = AppDelegate.shared?.tabManager?.selectedTabId == self.id
|
||||
if isSelectedTab {
|
||||
focusedSurface?.applyWindowBackgroundIfActive()
|
||||
}
|
||||
let isAppFocused = AppFocusState.isAppFocused()
|
||||
guard isSelectedTab && isAppFocused else { return }
|
||||
guard let notificationStore = AppDelegate.shared?.notificationStore else { return }
|
||||
if notificationStore.hasUnreadNotification(forTabId: self.id, surfaceId: id) {
|
||||
if shouldFlash {
|
||||
triggerNotificationFocusFlash(surfaceId: id, requiresSplit: false)
|
||||
}
|
||||
notificationStore.markRead(forTabId: self.id, surfaceId: id)
|
||||
return
|
||||
}
|
||||
if !wasFocused {
|
||||
notificationStore.markRead(forTabId: self.id, surfaceId: id)
|
||||
}
|
||||
}
|
||||
|
||||
func updateSurfaceDirectory(surfaceId: UUID, directory: String) {
|
||||
|
|
@ -50,8 +64,8 @@ class Tab: Identifiable, ObservableObject {
|
|||
currentDirectory = trimmed
|
||||
}
|
||||
|
||||
func triggerNotificationFocusFlash(surfaceId: UUID) {
|
||||
triggerPanelFlash(surfaceId: surfaceId, requiresSplit: true)
|
||||
func triggerNotificationFocusFlash(surfaceId: UUID, requiresSplit: Bool = false) {
|
||||
triggerPanelFlash(surfaceId: surfaceId, requiresSplit: requiresSplit)
|
||||
}
|
||||
|
||||
func triggerDebugFlash(surfaceId: UUID) {
|
||||
|
|
@ -64,7 +78,24 @@ class Tab: Identifiable, ObservableObject {
|
|||
if requiresSplit && !splitTree.isSplit {
|
||||
return
|
||||
}
|
||||
surface.hostedView.triggerFlash()
|
||||
triggerFlashWhenReady(surface: surface)
|
||||
}
|
||||
|
||||
private func triggerFlashWhenReady(surface: TerminalSurface, attempts: Int = 0) {
|
||||
let maxAttempts = 6
|
||||
let view = surface.hostedView
|
||||
if view.window != nil {
|
||||
view.layoutSubtreeIfNeeded()
|
||||
}
|
||||
let hasBounds = view.bounds.width > 0 && view.bounds.height > 0
|
||||
if view.window != nil && hasBounds {
|
||||
view.triggerFlash()
|
||||
return
|
||||
}
|
||||
guard attempts < maxAttempts else { return }
|
||||
DispatchQueue.main.asyncAfter(deadline: .now() + 0.05) { [weak self] in
|
||||
self?.triggerFlashWhenReady(surface: surface, attempts: attempts + 1)
|
||||
}
|
||||
}
|
||||
|
||||
func updateSplitViewSize(_ size: CGSize) {
|
||||
|
|
@ -221,10 +252,14 @@ class TabManager: ObservableObject {
|
|||
DispatchQueue.main.async { [weak self] in
|
||||
self?.focusSelectedTabSurface(previousTabId: previousTabId)
|
||||
self?.updateWindowTitleForSelectedTab()
|
||||
if let selectedTabId = self?.selectedTabId {
|
||||
self?.markFocusedPanelReadIfActive(tabId: selectedTabId)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
private var observers: [NSObjectProtocol] = []
|
||||
private var suppressFocusFlash = false
|
||||
|
||||
init() {
|
||||
addTab()
|
||||
|
|
@ -398,6 +433,20 @@ class TabManager: ObservableObject {
|
|||
surface.hostedView.ensureFocus(for: selectedTabId, surfaceId: surface.id)
|
||||
}
|
||||
|
||||
private func markFocusedPanelReadIfActive(tabId: UUID) {
|
||||
let shouldSuppressFlash = suppressFocusFlash
|
||||
suppressFocusFlash = false
|
||||
guard AppFocusState.isAppFocused() else { return }
|
||||
guard let surfaceId = focusedSurfaceId(for: tabId) else { return }
|
||||
guard let notificationStore = AppDelegate.shared?.notificationStore else { return }
|
||||
guard notificationStore.hasUnreadNotification(forTabId: tabId, surfaceId: surfaceId) else { return }
|
||||
if !shouldSuppressFlash,
|
||||
let tab = tabs.first(where: { $0.id == tabId }) {
|
||||
tab.triggerNotificationFocusFlash(surfaceId: surfaceId, requiresSplit: false)
|
||||
}
|
||||
notificationStore.markRead(forTabId: tabId, surfaceId: surfaceId)
|
||||
}
|
||||
|
||||
private func updateTabTitle(tabId: UUID, title: String) {
|
||||
guard !title.isEmpty else { return }
|
||||
guard let index = tabs.firstIndex(where: { $0.id == tabId }) else { return }
|
||||
|
|
@ -434,7 +483,7 @@ class TabManager: ObservableObject {
|
|||
return trimmedDirectory.isEmpty ? "cmuxterm" : trimmedDirectory
|
||||
}
|
||||
|
||||
func focusTab(_ tabId: UUID, surfaceId: UUID? = nil) {
|
||||
func focusTab(_ tabId: UUID, surfaceId: UUID? = nil, suppressFlash: Bool = false) {
|
||||
guard tabs.contains(where: { $0.id == tabId }) else { return }
|
||||
selectedTabId = tabId
|
||||
NotificationCenter.default.post(
|
||||
|
|
@ -452,27 +501,33 @@ class TabManager: ObservableObject {
|
|||
}
|
||||
|
||||
if let surfaceId {
|
||||
focusSurface(tabId: tabId, surfaceId: surfaceId)
|
||||
focusSurface(tabId: tabId, surfaceId: surfaceId, shouldFlash: !suppressFlash)
|
||||
}
|
||||
}
|
||||
|
||||
func focusTabFromNotification(_ tabId: UUID, surfaceId: UUID? = nil) {
|
||||
let wasSelected = selectedTabId == tabId
|
||||
suppressFocusFlash = true
|
||||
focusTab(tabId, surfaceId: surfaceId)
|
||||
if wasSelected {
|
||||
suppressFocusFlash = false
|
||||
}
|
||||
|
||||
DispatchQueue.main.asyncAfter(deadline: .now() + 0.05) { [weak self] in
|
||||
guard let self,
|
||||
let tab = self.tabs.first(where: { $0.id == tabId }),
|
||||
tab.splitTree.isSplit else { return }
|
||||
let tab = self.tabs.first(where: { $0.id == tabId }) else { return }
|
||||
let targetSurfaceId = surfaceId ?? tab.focusedSurfaceId
|
||||
guard let targetSurfaceId,
|
||||
tab.surface(for: targetSurfaceId) != nil else { return }
|
||||
tab.triggerNotificationFocusFlash(surfaceId: targetSurfaceId)
|
||||
guard let notificationStore = AppDelegate.shared?.notificationStore else { return }
|
||||
guard notificationStore.hasUnreadNotification(forTabId: tabId, surfaceId: targetSurfaceId) else { return }
|
||||
tab.triggerNotificationFocusFlash(surfaceId: targetSurfaceId, requiresSplit: false)
|
||||
}
|
||||
}
|
||||
|
||||
func focusSurface(tabId: UUID, surfaceId: UUID) {
|
||||
func focusSurface(tabId: UUID, surfaceId: UUID, shouldFlash: Bool = true) {
|
||||
guard let tab = tabs.first(where: { $0.id == tabId }) else { return }
|
||||
tab.focusSurface(surfaceId)
|
||||
tab.focusSurface(surfaceId, shouldFlash: shouldFlash)
|
||||
}
|
||||
|
||||
func selectNextTab() {
|
||||
|
|
|
|||
|
|
@ -1,3 +1,4 @@
|
|||
import AppKit
|
||||
import Foundation
|
||||
|
||||
/// Unix socket-based controller for programmatic terminal control
|
||||
|
|
@ -102,20 +103,28 @@ class TerminalController {
|
|||
defer { close(socket) }
|
||||
|
||||
var buffer = [UInt8](repeating: 0, count: 4096)
|
||||
var pending = ""
|
||||
|
||||
while isRunning {
|
||||
let bytesRead = read(socket, &buffer, buffer.count - 1)
|
||||
guard bytesRead > 0 else { break }
|
||||
|
||||
buffer[bytesRead] = 0
|
||||
let command = String(cString: buffer)
|
||||
let response = processCommand(command.trimmingCharacters(in: .whitespacesAndNewlines))
|
||||
let chunk = String(bytes: buffer[0..<bytesRead], encoding: .utf8) ?? ""
|
||||
pending.append(chunk)
|
||||
|
||||
response.withCString { ptr in
|
||||
_ = write(socket, ptr, strlen(ptr))
|
||||
}
|
||||
"\n".withCString { ptr in
|
||||
_ = write(socket, ptr, 1)
|
||||
while let newlineIndex = pending.firstIndex(of: "\n") {
|
||||
let line = String(pending[..<newlineIndex])
|
||||
pending = String(pending[pending.index(after: newlineIndex)...])
|
||||
let trimmed = line.trimmingCharacters(in: .whitespacesAndNewlines)
|
||||
guard !trimmed.isEmpty else { continue }
|
||||
|
||||
let response = processCommand(trimmed)
|
||||
response.withCString { ptr in
|
||||
_ = write(socket, ptr, strlen(ptr))
|
||||
}
|
||||
"\n".withCString { ptr in
|
||||
_ = write(socket, ptr, 1)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -167,6 +176,24 @@ class TerminalController {
|
|||
case "send_key_surface":
|
||||
return sendKeyToSurface(args)
|
||||
|
||||
case "notify":
|
||||
return notifyCurrent(args)
|
||||
|
||||
case "notify_surface":
|
||||
return notifySurface(args)
|
||||
|
||||
case "list_notifications":
|
||||
return listNotifications()
|
||||
|
||||
case "clear_notifications":
|
||||
return clearNotifications()
|
||||
|
||||
case "set_app_focus":
|
||||
return setAppFocusOverride(args)
|
||||
|
||||
case "simulate_app_active":
|
||||
return simulateAppDidBecomeActive()
|
||||
|
||||
case "help":
|
||||
return helpText()
|
||||
|
||||
|
|
@ -191,6 +218,12 @@ class TerminalController {
|
|||
send_key <key> - Send special key (ctrl-c, ctrl-d, enter, tab, escape)
|
||||
send_surface <id|idx> <text> - Send text to a surface in current tab
|
||||
send_key_surface <id|idx> <key> - Send special key to a surface in current tab
|
||||
notify <title>|<body> - Create a notification for the focused surface
|
||||
notify_surface <id|idx> <title>|<body> - Create a notification for a surface
|
||||
list_notifications - List all notifications
|
||||
clear_notifications - Clear all notifications
|
||||
set_app_focus <active|inactive|clear> - Override app focus state
|
||||
simulate_app_active - Trigger app active handler
|
||||
help - Show this help
|
||||
"""
|
||||
}
|
||||
|
|
@ -289,6 +322,104 @@ class TerminalController {
|
|||
return success ? "OK" : "ERROR: Surface not found"
|
||||
}
|
||||
|
||||
private func notifyCurrent(_ args: String) -> String {
|
||||
guard let tabManager = tabManager else { return "ERROR: TabManager not available" }
|
||||
|
||||
var result = "OK"
|
||||
DispatchQueue.main.sync {
|
||||
guard let tabId = tabManager.selectedTabId else {
|
||||
result = "ERROR: No tab selected"
|
||||
return
|
||||
}
|
||||
let surfaceId = tabManager.focusedSurfaceId(for: tabId)
|
||||
let (title, body) = parseNotificationPayload(args)
|
||||
TerminalNotificationStore.shared.addNotification(
|
||||
tabId: tabId,
|
||||
surfaceId: surfaceId,
|
||||
title: title,
|
||||
body: body
|
||||
)
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
private func notifySurface(_ args: String) -> String {
|
||||
guard let tabManager = tabManager else { return "ERROR: TabManager not available" }
|
||||
let trimmed = args.trimmingCharacters(in: .whitespacesAndNewlines)
|
||||
guard !trimmed.isEmpty else { return "ERROR: Missing surface id or index" }
|
||||
|
||||
let parts = trimmed.split(separator: " ", maxSplits: 1).map(String.init)
|
||||
let surfaceArg = parts[0]
|
||||
let payload = parts.count > 1 ? parts[1] : ""
|
||||
|
||||
var result = "OK"
|
||||
DispatchQueue.main.sync {
|
||||
guard let tabId = tabManager.selectedTabId,
|
||||
let tab = tabManager.tabs.first(where: { $0.id == tabId }) else {
|
||||
result = "ERROR: No tab selected"
|
||||
return
|
||||
}
|
||||
guard let surfaceId = resolveSurfaceId(from: surfaceArg, tab: tab) else {
|
||||
result = "ERROR: Surface not found"
|
||||
return
|
||||
}
|
||||
let (title, body) = parseNotificationPayload(payload)
|
||||
TerminalNotificationStore.shared.addNotification(
|
||||
tabId: tabId,
|
||||
surfaceId: surfaceId,
|
||||
title: title,
|
||||
body: body
|
||||
)
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
private func listNotifications() -> String {
|
||||
var result = ""
|
||||
DispatchQueue.main.sync {
|
||||
let lines = TerminalNotificationStore.shared.notifications.enumerated().map { index, notification in
|
||||
let surfaceText = notification.surfaceId?.uuidString ?? "none"
|
||||
let readText = notification.isRead ? "read" : "unread"
|
||||
return "\(index):\(notification.id.uuidString)|\(notification.tabId.uuidString)|\(surfaceText)|\(readText)|\(notification.title)|\(notification.body)"
|
||||
}
|
||||
result = lines.joined(separator: "\n")
|
||||
}
|
||||
return result.isEmpty ? "No notifications" : result
|
||||
}
|
||||
|
||||
private func clearNotifications() -> String {
|
||||
DispatchQueue.main.sync {
|
||||
TerminalNotificationStore.shared.clearAll()
|
||||
}
|
||||
return "OK"
|
||||
}
|
||||
|
||||
private func setAppFocusOverride(_ arg: String) -> String {
|
||||
let trimmed = arg.trimmingCharacters(in: .whitespacesAndNewlines).lowercased()
|
||||
switch trimmed {
|
||||
case "active", "1", "true":
|
||||
AppFocusState.overrideIsFocused = true
|
||||
return "OK"
|
||||
case "inactive", "0", "false":
|
||||
AppFocusState.overrideIsFocused = false
|
||||
return "OK"
|
||||
case "clear", "none", "":
|
||||
AppFocusState.overrideIsFocused = nil
|
||||
return "OK"
|
||||
default:
|
||||
return "ERROR: Expected active, inactive, or clear"
|
||||
}
|
||||
}
|
||||
|
||||
private func simulateAppDidBecomeActive() -> String {
|
||||
DispatchQueue.main.sync {
|
||||
AppDelegate.shared?.applicationDidBecomeActive(
|
||||
Notification(name: NSApplication.didBecomeActiveNotification)
|
||||
)
|
||||
}
|
||||
return "OK"
|
||||
}
|
||||
|
||||
private func parseSplitDirection(_ value: String) -> SplitTree<TerminalSurface>.NewDirection? {
|
||||
switch value.lowercased() {
|
||||
case "left", "l":
|
||||
|
|
@ -342,6 +473,29 @@ class TerminalController {
|
|||
return nil
|
||||
}
|
||||
|
||||
private func resolveSurfaceId(from arg: String, tab: Tab) -> UUID? {
|
||||
if let uuid = UUID(uuidString: arg), tab.surface(for: uuid) != nil {
|
||||
return uuid
|
||||
}
|
||||
|
||||
if let index = Int(arg), index >= 0 {
|
||||
let surfaces = tab.splitTree.root?.leaves() ?? []
|
||||
guard index < surfaces.count else { return nil }
|
||||
return surfaces[index].id
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
private func parseNotificationPayload(_ args: String) -> (String, String) {
|
||||
let trimmed = args.trimmingCharacters(in: .whitespacesAndNewlines)
|
||||
guard !trimmed.isEmpty else { return ("Notification", "") }
|
||||
let parts = trimmed.split(separator: "|", maxSplits: 1).map(String.init)
|
||||
let title = parts[0].trimmingCharacters(in: .whitespacesAndNewlines)
|
||||
let body = parts.count > 1 ? parts[1].trimmingCharacters(in: .whitespacesAndNewlines) : ""
|
||||
return (title.isEmpty ? "Notification" : title, body)
|
||||
}
|
||||
|
||||
private func closeTab(_ tabId: String) -> String {
|
||||
guard let tabManager = tabManager else { return "ERROR: TabManager not available" }
|
||||
guard let uuid = UUID(uuidString: tabId) else { return "ERROR: Invalid tab ID" }
|
||||
|
|
|
|||
|
|
@ -2,6 +2,17 @@ import AppKit
|
|||
import Foundation
|
||||
import UserNotifications
|
||||
|
||||
enum AppFocusState {
|
||||
static var overrideIsFocused: Bool?
|
||||
|
||||
static func isAppFocused() -> Bool {
|
||||
if let overrideIsFocused {
|
||||
return overrideIsFocused
|
||||
}
|
||||
return NSApp.isActive && (NSApp.keyWindow?.isKeyWindow ?? false)
|
||||
}
|
||||
}
|
||||
|
||||
struct TerminalNotification: Identifiable, Hashable {
|
||||
let id: UUID
|
||||
let tabId: UUID
|
||||
|
|
@ -34,6 +45,10 @@ final class TerminalNotificationStore: ObservableObject {
|
|||
notifications.filter { $0.tabId == tabId && !$0.isRead }.count
|
||||
}
|
||||
|
||||
func hasUnreadNotification(forTabId tabId: UUID, surfaceId: UUID?) -> Bool {
|
||||
notifications.contains { $0.tabId == tabId && $0.surfaceId == surfaceId && !$0.isRead }
|
||||
}
|
||||
|
||||
func latestNotification(forTabId tabId: UUID) -> TerminalNotification? {
|
||||
if let unread = notifications.first(where: { $0.tabId == tabId && !$0.isRead }) {
|
||||
return unread
|
||||
|
|
@ -42,10 +57,17 @@ final class TerminalNotificationStore: ObservableObject {
|
|||
}
|
||||
|
||||
func addNotification(tabId: UUID, surfaceId: UUID?, title: String, body: String) {
|
||||
clearNotifications(forTabId: tabId, surfaceId: surfaceId)
|
||||
|
||||
let isActiveTab = AppDelegate.shared?.tabManager?.selectedTabId == tabId
|
||||
let focusedSurfaceId = AppDelegate.shared?.tabManager?.focusedSurfaceId(for: tabId)
|
||||
let isFocusedSurface = surfaceId == nil || focusedSurfaceId == surfaceId
|
||||
let shouldMarkRead = NSApp.isActive && (NSApp.keyWindow?.isKeyWindow ?? false) && isActiveTab && isFocusedSurface
|
||||
let isFocusedPanel = isActiveTab && isFocusedSurface
|
||||
let isAppFocused = AppFocusState.isAppFocused()
|
||||
if isAppFocused && isFocusedPanel {
|
||||
return
|
||||
}
|
||||
|
||||
let notification = TerminalNotification(
|
||||
id: UUID(),
|
||||
tabId: tabId,
|
||||
|
|
@ -53,7 +75,7 @@ final class TerminalNotificationStore: ObservableObject {
|
|||
title: title,
|
||||
body: body,
|
||||
createdAt: Date(),
|
||||
isRead: shouldMarkRead
|
||||
isRead: false
|
||||
)
|
||||
notifications.insert(notification, at: 0)
|
||||
scheduleUserNotification(notification)
|
||||
|
|
@ -79,6 +101,22 @@ final class TerminalNotificationStore: ObservableObject {
|
|||
}
|
||||
}
|
||||
|
||||
func markRead(forTabId tabId: UUID, surfaceId: UUID?) {
|
||||
var idsToClear: [String] = []
|
||||
for index in notifications.indices {
|
||||
if notifications[index].tabId == tabId,
|
||||
notifications[index].surfaceId == surfaceId,
|
||||
!notifications[index].isRead {
|
||||
notifications[index].isRead = true
|
||||
idsToClear.append(notifications[index].id.uuidString)
|
||||
}
|
||||
}
|
||||
if !idsToClear.isEmpty {
|
||||
center.removeDeliveredNotifications(withIdentifiers: idsToClear)
|
||||
center.removePendingNotificationRequests(withIdentifiers: idsToClear)
|
||||
}
|
||||
}
|
||||
|
||||
func markUnread(forTabId tabId: UUID) {
|
||||
for index in notifications.indices {
|
||||
if notifications[index].tabId == tabId {
|
||||
|
|
@ -100,6 +138,17 @@ final class TerminalNotificationStore: ObservableObject {
|
|||
}
|
||||
}
|
||||
|
||||
func clearNotifications(forTabId tabId: UUID, surfaceId: UUID?) {
|
||||
let ids = notifications
|
||||
.filter { $0.tabId == tabId && $0.surfaceId == surfaceId }
|
||||
.map { $0.id.uuidString }
|
||||
notifications.removeAll { $0.tabId == tabId && $0.surfaceId == surfaceId }
|
||||
if !ids.isEmpty {
|
||||
center.removeDeliveredNotifications(withIdentifiers: ids)
|
||||
center.removePendingNotificationRequests(withIdentifiers: ids)
|
||||
}
|
||||
}
|
||||
|
||||
private func scheduleUserNotification(_ notification: TerminalNotification) {
|
||||
ensureAuthorization { [weak self] authorized in
|
||||
guard let self, authorized else { return }
|
||||
|
|
|
|||
|
|
@ -239,6 +239,70 @@ class cmux:
|
|||
"""Get help text from server"""
|
||||
return self._send_command("help")
|
||||
|
||||
def notify(self, title: str, body: str = "") -> None:
|
||||
"""Create a notification for the focused surface."""
|
||||
payload = f"{title}|{body}" if body else title
|
||||
response = self._send_command(f"notify {payload}")
|
||||
if not response.startswith("OK"):
|
||||
raise cmuxError(response)
|
||||
|
||||
def notify_surface(self, surface: str | int, title: str, body: str = "") -> None:
|
||||
"""Create a notification for a specific surface by ID or index."""
|
||||
payload = f"{title}|{body}" if body else title
|
||||
response = self._send_command(f"notify_surface {surface} {payload}")
|
||||
if not response.startswith("OK"):
|
||||
raise cmuxError(response)
|
||||
|
||||
def list_notifications(self) -> list[dict]:
|
||||
"""
|
||||
List notifications.
|
||||
Returns list of dicts with keys: id, tab_id, surface_id, is_read, title, body.
|
||||
"""
|
||||
response = self._send_command("list_notifications")
|
||||
if response == "No notifications":
|
||||
return []
|
||||
|
||||
items = []
|
||||
for line in response.split("\n"):
|
||||
if not line.strip():
|
||||
continue
|
||||
_, payload = line.split(":", 1)
|
||||
parts = payload.split("|", 5)
|
||||
if len(parts) < 6:
|
||||
continue
|
||||
notif_id, tab_id, surface_id, read_text, title, body = parts
|
||||
items.append({
|
||||
"id": notif_id,
|
||||
"tab_id": tab_id,
|
||||
"surface_id": None if surface_id == "none" else surface_id,
|
||||
"is_read": read_text == "read",
|
||||
"title": title,
|
||||
"body": body,
|
||||
})
|
||||
return items
|
||||
|
||||
def clear_notifications(self) -> None:
|
||||
"""Clear all notifications."""
|
||||
response = self._send_command("clear_notifications")
|
||||
if not response.startswith("OK"):
|
||||
raise cmuxError(response)
|
||||
|
||||
def set_app_focus(self, active: bool | None) -> None:
|
||||
"""Override app focus state. Use None to clear override."""
|
||||
if active is None:
|
||||
value = "clear"
|
||||
else:
|
||||
value = "active" if active else "inactive"
|
||||
response = self._send_command(f"set_app_focus {value}")
|
||||
if not response.startswith("OK"):
|
||||
raise cmuxError(response)
|
||||
|
||||
def simulate_app_active(self) -> None:
|
||||
"""Trigger the app active handler."""
|
||||
response = self._send_command("simulate_app_active")
|
||||
if not response.startswith("OK"):
|
||||
raise cmuxError(response)
|
||||
|
||||
|
||||
def main():
|
||||
"""CLI interface for cmux"""
|
||||
|
|
|
|||
226
tests/test_notifications.py
Normal file
226
tests/test_notifications.py
Normal file
|
|
@ -0,0 +1,226 @@
|
|||
#!/usr/bin/env python3
|
||||
"""
|
||||
Automated tests for notification focus/suppression behavior.
|
||||
|
||||
Usage:
|
||||
python3 test_notifications.py
|
||||
|
||||
Requirements:
|
||||
- cmux must be running with the socket controller enabled
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
|
||||
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
||||
|
||||
from cmux import cmux, cmuxError
|
||||
|
||||
|
||||
class TestResult:
|
||||
def __init__(self, name: str):
|
||||
self.name = name
|
||||
self.passed = False
|
||||
self.message = ""
|
||||
|
||||
def success(self, msg: str = ""):
|
||||
self.passed = True
|
||||
self.message = msg
|
||||
|
||||
def failure(self, msg: str):
|
||||
self.passed = False
|
||||
self.message = msg
|
||||
|
||||
|
||||
def wait_for_notifications(client: cmux, expected: int, timeout: float = 2.0) -> list[dict]:
|
||||
start = time.time()
|
||||
while time.time() - start < timeout:
|
||||
items = client.list_notifications()
|
||||
if len(items) == expected:
|
||||
return items
|
||||
time.sleep(0.05)
|
||||
return client.list_notifications()
|
||||
|
||||
|
||||
def ensure_two_surfaces(client: cmux) -> list[tuple[int, str, bool]]:
|
||||
surfaces = client.list_surfaces()
|
||||
if len(surfaces) < 2:
|
||||
client.new_split("right")
|
||||
time.sleep(0.1)
|
||||
surfaces = client.list_surfaces()
|
||||
return surfaces
|
||||
|
||||
|
||||
def test_clear_prior_notifications(client: cmux) -> TestResult:
|
||||
result = TestResult("Clear Prior Panel Notifications")
|
||||
try:
|
||||
client.clear_notifications()
|
||||
client.set_app_focus(False)
|
||||
client.notify("first")
|
||||
time.sleep(0.1)
|
||||
client.notify("second")
|
||||
items = wait_for_notifications(client, 1)
|
||||
if len(items) != 1:
|
||||
result.failure(f"Expected 1 notification, got {len(items)}")
|
||||
elif items[0]["title"] != "second":
|
||||
result.failure(f"Expected latest title 'second', got '{items[0]['title']}'")
|
||||
else:
|
||||
result.success("Prior panel notifications cleared")
|
||||
except Exception as e:
|
||||
result.failure(f"Exception: {e}")
|
||||
return result
|
||||
|
||||
|
||||
def test_suppress_when_focused(client: cmux) -> TestResult:
|
||||
result = TestResult("Suppress When App+Panel Focused")
|
||||
try:
|
||||
client.clear_notifications()
|
||||
client.set_app_focus(True)
|
||||
client.notify("focused")
|
||||
items = wait_for_notifications(client, 0)
|
||||
if len(items) == 0:
|
||||
result.success("Suppressed notification when focused")
|
||||
else:
|
||||
result.failure(f"Expected 0 notifications, got {len(items)}")
|
||||
except Exception as e:
|
||||
result.failure(f"Exception: {e}")
|
||||
return result
|
||||
|
||||
|
||||
def test_not_suppressed_when_inactive(client: cmux) -> TestResult:
|
||||
result = TestResult("Allow When App Inactive")
|
||||
try:
|
||||
client.clear_notifications()
|
||||
client.set_app_focus(False)
|
||||
client.notify("inactive")
|
||||
items = wait_for_notifications(client, 1)
|
||||
if len(items) != 1:
|
||||
result.failure(f"Expected 1 notification, got {len(items)}")
|
||||
elif items[0]["is_read"]:
|
||||
result.failure("Expected notification to be unread")
|
||||
else:
|
||||
result.success("Notification stored when app inactive")
|
||||
except Exception as e:
|
||||
result.failure(f"Exception: {e}")
|
||||
return result
|
||||
|
||||
|
||||
def test_mark_read_on_focus_change(client: cmux) -> TestResult:
|
||||
result = TestResult("Mark Read On Panel Focus")
|
||||
try:
|
||||
client.clear_notifications()
|
||||
surfaces = ensure_two_surfaces(client)
|
||||
focused = next((s for s in surfaces if s[2]), None)
|
||||
other = next((s for s in surfaces if not s[2]), None)
|
||||
if focused is None or other is None:
|
||||
result.failure("Unable to identify focused and unfocused surfaces")
|
||||
return result
|
||||
|
||||
client.set_app_focus(False)
|
||||
client.notify_surface(other[0], "focusread")
|
||||
time.sleep(0.1)
|
||||
|
||||
client.set_app_focus(True)
|
||||
client.focus_surface(other[0])
|
||||
time.sleep(0.1)
|
||||
|
||||
items = client.list_notifications()
|
||||
target = next((n for n in items if n["surface_id"] == other[1]), None)
|
||||
if target is None:
|
||||
result.failure("Expected notification for target surface")
|
||||
elif not target["is_read"]:
|
||||
result.failure("Expected notification to be marked read on focus")
|
||||
else:
|
||||
result.success("Notification marked read on focus")
|
||||
except Exception as e:
|
||||
result.failure(f"Exception: {e}")
|
||||
return result
|
||||
|
||||
|
||||
def test_mark_read_on_app_active(client: cmux) -> TestResult:
|
||||
result = TestResult("Mark Read On App Active")
|
||||
try:
|
||||
client.clear_notifications()
|
||||
client.set_app_focus(False)
|
||||
client.notify("activate")
|
||||
time.sleep(0.1)
|
||||
|
||||
items = client.list_notifications()
|
||||
if not items or items[0]["is_read"]:
|
||||
result.failure("Expected unread notification before activation")
|
||||
return result
|
||||
|
||||
client.simulate_app_active()
|
||||
time.sleep(0.1)
|
||||
|
||||
items = client.list_notifications()
|
||||
if not items:
|
||||
result.failure("Expected notification to remain after activation")
|
||||
elif not items[0]["is_read"]:
|
||||
result.failure("Expected notification to be marked read on app active")
|
||||
else:
|
||||
result.success("Notification marked read on app active")
|
||||
except Exception as e:
|
||||
result.failure(f"Exception: {e}")
|
||||
return result
|
||||
|
||||
|
||||
def test_mark_read_on_tab_switch(client: cmux) -> TestResult:
|
||||
result = TestResult("Mark Read On Tab Switch")
|
||||
try:
|
||||
client.clear_notifications()
|
||||
client.set_app_focus(False)
|
||||
tab1 = client.current_tab()
|
||||
client.notify("tabswitch")
|
||||
time.sleep(0.1)
|
||||
|
||||
tab2 = client.new_tab()
|
||||
time.sleep(0.1)
|
||||
|
||||
client.set_app_focus(True)
|
||||
client.select_tab(tab1)
|
||||
time.sleep(0.1)
|
||||
|
||||
items = client.list_notifications()
|
||||
target = next((n for n in items if n["tab_id"] == tab1), None)
|
||||
if target is None:
|
||||
result.failure("Expected notification for original tab")
|
||||
elif not target["is_read"]:
|
||||
result.failure("Expected notification to be marked read on tab switch")
|
||||
else:
|
||||
result.success("Notification marked read on tab switch")
|
||||
except Exception as e:
|
||||
result.failure(f"Exception: {e}")
|
||||
return result
|
||||
|
||||
|
||||
def run_tests() -> int:
|
||||
results = []
|
||||
with cmux() as client:
|
||||
results.append(test_clear_prior_notifications(client))
|
||||
results.append(test_suppress_when_focused(client))
|
||||
results.append(test_not_suppressed_when_inactive(client))
|
||||
results.append(test_mark_read_on_focus_change(client))
|
||||
results.append(test_mark_read_on_app_active(client))
|
||||
results.append(test_mark_read_on_tab_switch(client))
|
||||
client.set_app_focus(None)
|
||||
client.clear_notifications()
|
||||
|
||||
print("\nNotification Tests:")
|
||||
for r in results:
|
||||
status = "PASS" if r.passed else "FAIL"
|
||||
msg = f" - {r.message}" if r.message else ""
|
||||
print(f"{status}: {r.name}{msg}")
|
||||
|
||||
passed = sum(1 for r in results if r.passed)
|
||||
total = len(results)
|
||||
if passed == total:
|
||||
print("\n🎉 All notification tests passed!")
|
||||
return 0
|
||||
print(f"\n⚠️ {total - passed} test(s) failed")
|
||||
return 1
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(run_tests())
|
||||
Loading…
Add table
Add a link
Reference in a new issue