Fix Ctrl+C/D handling and add Unix socket control API for testing

Key changes:
- Fix keyboard input handling for control characters in GhosttyTerminalView
  - Set consumed_mods correctly (exclude Ctrl/Cmd from consumed mods)
  - Send unmodified character text for Ctrl+key combinations
  - Add unshifted_codepoint support for proper key encoding

- Add TerminalController with Unix socket API (/tmp/ghosttytabs.sock)
  - Commands: send, send_key, list_tabs, new_tab, close_tab, select_tab
  - Supports ctrl-c, ctrl-d, ctrl-z, enter, tab, escape, etc.

- Add Python test client and automated test suite
  - tests/ghosttytabs.py - Python client library
  - tests/test_ctrl_socket.py - Main Ctrl+C/D test suite (4 tests)
  - tests/test_signals_auto.py - Standalone PTY signal tests

- Update CLAUDE.md with socket API documentation and testing guide
- Update .gitignore for Python cache files

This fixes Ctrl+C/D not working in apps like claude-code, btop, opencode
while continuing to work in simpler apps like htop.
This commit is contained in:
Lawrence Chen 2026-01-22 02:20:51 -08:00
parent f969298c6e
commit cd0f6200c0
12 changed files with 1519 additions and 21 deletions

6
.gitignore vendored
View file

@ -12,3 +12,9 @@ xcuserdata/
# GhosttyKit binary (rebuild from /tmp/ghostty with zig build)
GhosttyKit.xcframework/
# Python
__pycache__/
*.pyc
*.pyo
.pytest_cache/

111
CLAUDE.md
View file

@ -31,7 +31,11 @@ cp -R /tmp/ghostty/macos/GhosttyKit.xcframework /Users/lawrencechen/fun/cmux-ter
- `TabManager.swift` - Tab state management
- `GhosttyTerminalView.swift` - libghostty terminal integration
- `GhosttyConfig.swift` - Ghostty config parser
- `GhosttyKit.xcframework/` - libghostty static library
- `TerminalController.swift` - Unix socket server for programmatic control
- `tests/` - Test files and utilities
- `ghosttytabs.py` - Python client library for socket API
- `test_ctrl_socket.py` - Main automated test suite
- `GhosttyKit.xcframework/` - libghostty static library (gitignored, rebuild from /tmp/ghostty)
- `ghostty.h` - Ghostty C API header
- `GhosttyTabs-Bridging-Header.h` - Swift bridging header
@ -45,3 +49,108 @@ cp -R /tmp/ghostty/macos/GhosttyKit.xcframework /Users/lawrencechen/fun/cmux-ter
### Config
Reads user's Ghostty config from:
`~/Library/Application Support/com.mitchellh.ghostty/config`
## Testing
### Unix Socket Control API
GhosttyTabs exposes a Unix socket at `/tmp/ghosttytabs.sock` for programmatic control and automated testing. The socket is created when the app launches.
#### Socket Commands
Text-based protocol with newline-delimited commands:
| Command | Description | Response |
|---------|-------------|----------|
| `ping` | Check if server is running | `PONG` |
| `list_tabs` | List all tabs | `* 0: <UUID> <title>` (per line) |
| `new_tab` | Create a new tab | `OK <UUID>` |
| `close_tab <id>` | Close tab by UUID | `OK` or `ERROR: ...` |
| `select_tab <id\|index>` | Select tab by UUID or index | `OK` or `ERROR: ...` |
| `current_tab` | Get current tab UUID | `<UUID>` |
| `send <text>` | Send text to terminal | `OK` |
| `send_key <key>` | Send special key | `OK` |
| `help` | Show available commands | Help text |
#### Special Keys for `send_key`
- `ctrl-c`, `ctrl-d`, `ctrl-z`, `ctrl-\` - Control signals
- `enter`, `tab`, `escape`, `backspace` - Common keys
- `ctrl-<letter>` - Any control+letter combination
#### Text Escaping for `send`
Use `\n` for Enter (carriage return), `\t` for tab, `\r` for raw CR.
### Python Client Library
Located at `tests/ghosttytabs.py`:
```python
from ghosttytabs import GhosttyTabs
with GhosttyTabs() as client:
# Send text with Enter
client.send("echo hello\n")
# Send special keys
client.send_ctrl_c() # Interrupt
client.send_ctrl_d() # EOF
client.send_key("enter")
# Tab management
tabs = client.list_tabs()
client.new_tab()
client.select_tab(0)
```
### Running Tests
```bash
# Build and launch the app first
pkill -9 GhosttyTabs 2>/dev/null
xcodebuild -scheme GhosttyTabs -configuration Release build
open ~/Library/Developer/Xcode/DerivedData/GhosttyTabs-cbjivvtpirygxbbgqlpdpiiyjnwh/Build/Products/Release/GhosttyTabs.app
sleep 3
# Run the main test suite (tests Ctrl+C, Ctrl+D)
python3 tests/test_ctrl_socket.py
# Interactive CLI for manual testing
python3 tests/ghosttytabs.py
```
### Writing New Tests
1. **Use marker files for verification** - Create temp files to verify commands executed:
```python
marker = Path(tempfile.gettempdir()) / f"test_marker_{os.getpid()}"
client.send(f"touch {marker}\n")
time.sleep(0.5)
assert marker.exists()
```
2. **Allow settling time** - Terminal commands need time to execute:
```python
client.send("sleep 5\n")
time.sleep(0.3) # Wait for command to start
client.send_ctrl_c()
time.sleep(0.3) # Wait for interrupt
```
3. **Clean up marker files** - Always remove test artifacts:
```python
try:
# test code
finally:
marker.unlink(missing_ok=True)
```
### Test Files
- `tests/ghosttytabs.py` - Python client library for socket API
- `tests/test_ctrl_socket.py` - Automated Ctrl+C/D test suite (main tests)
- `tests/test_signals_auto.py` - PTY-based signal tests (standalone)
- `tests/test_ctrl_interactive.py` - Interactive manual tests
- `tests/test_ctrl_signals.sh` - Simple bash signal test
- `tests/test_app_keystrokes.sh` - AppleScript keystroke tests (deprecated)

View file

@ -13,6 +13,7 @@
A5001004 /* GhosttyConfig.swift in Sources */ = {isa = PBXBuildFile; fileRef = A5001014 /* GhosttyConfig.swift */; };
A5001005 /* GhosttyTerminalView.swift in Sources */ = {isa = PBXBuildFile; fileRef = A5001015 /* GhosttyTerminalView.swift */; };
A5001006 /* GhosttyKit.xcframework in Frameworks */ = {isa = PBXBuildFile; fileRef = A5001016 /* GhosttyKit.xcframework */; };
A5001007 /* TerminalController.swift in Sources */ = {isa = PBXBuildFile; fileRef = A5001019 /* TerminalController.swift */; };
/* End PBXBuildFile section */
/* Begin PBXCopyFilesBuildPhase section */
@ -38,6 +39,7 @@
A5001016 /* GhosttyKit.xcframework */ = {isa = PBXFileReference; lastKnownFileType = wrapper.xcframework; path = GhosttyKit.xcframework; sourceTree = "<group>"; };
A5001017 /* ghostty.h */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.h; path = ghostty.h; sourceTree = "<group>"; };
A5001018 /* GhosttyTabs-Bridging-Header.h */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.h; path = "GhosttyTabs-Bridging-Header.h"; sourceTree = "<group>"; };
A5001019 /* TerminalController.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = TerminalController.swift; sourceTree = "<group>"; };
/* End PBXFileReference section */
/* Begin PBXFrameworksBuildPhase section */
@ -71,6 +73,7 @@
A5001013 /* TabManager.swift */,
A5001014 /* GhosttyConfig.swift */,
A5001015 /* GhosttyTerminalView.swift */,
A5001019 /* TerminalController.swift */,
);
path = Sources;
sourceTree = "<group>";
@ -141,6 +144,7 @@
A5001003 /* TabManager.swift in Sources */,
A5001004 /* GhosttyConfig.swift in Sources */,
A5001005 /* GhosttyTerminalView.swift in Sources */,
A5001007 /* TerminalController.swift in Sources */,
);
runOnlyForDeploymentPostprocessing = 0;
};

View file

@ -4,10 +4,19 @@ import SwiftUI
struct GhosttyTabsApp: App {
@StateObject private var tabManager = TabManager()
init() {
// Start the terminal controller for programmatic control
// This runs after TabManager is created via @StateObject
}
var body: some Scene {
WindowGroup {
ContentView()
.environmentObject(tabManager)
.onAppear {
// Start the Unix socket controller for programmatic access
TerminalController.shared.start(tabManager: tabManager)
}
}
.windowStyle(.hiddenTitleBar)
.commands {

View file

@ -342,10 +342,12 @@ class GhosttyNSView: NSView {
keyEvent.action = action
keyEvent.keycode = UInt32(event.keyCode)
keyEvent.mods = modsFromEvent(event)
keyEvent.consumed_mods = GHOSTTY_MODS_NONE
// Control and Command never contribute to text translation
keyEvent.consumed_mods = consumedModsFromEvent(event)
keyEvent.composing = markedText.length > 0
keyEvent.unshifted_codepoint = unshiftedCodepointFromEvent(event)
// Use accumulated text from insertText, or fall back to event characters
// Use accumulated text from insertText (for IME), or compute text for key
if let accumulated = keyTextAccumulator, !accumulated.isEmpty {
for text in accumulated {
text.withCString { ptr in
@ -354,28 +356,18 @@ class GhosttyNSView: NSView {
}
}
} else {
// No accumulated text - send the key with event characters
if let chars = event.characters, !chars.isEmpty {
// Filter out control characters
var hasControlChars = false
for scalar in chars.unicodeScalars where scalar.value < 0x20 {
hasControlChars = true
break
}
if hasControlChars {
keyEvent.text = nil
} else {
chars.withCString { ptr in
keyEvent.text = ptr
_ = ghostty_surface_key(surface, keyEvent)
return
}
return
// Get the appropriate text for this key event
// For control characters, this returns the unmodified character
// so Ghostty's KeyEncoder can handle ctrl encoding
if let text = textForKeyEvent(event) {
text.withCString { ptr in
keyEvent.text = ptr
_ = ghostty_surface_key(surface, keyEvent)
}
} else {
keyEvent.text = nil
_ = ghostty_surface_key(surface, keyEvent)
}
_ = ghostty_surface_key(surface, keyEvent)
}
}
@ -420,6 +412,59 @@ class GhosttyNSView: NSView {
return ghostty_input_mods_e(rawValue: mods)
}
/// Consumed mods are modifiers that were used for text translation.
/// Control and Command never contribute to text translation, so they
/// should be excluded from consumed_mods.
private func consumedModsFromEvent(_ event: NSEvent) -> ghostty_input_mods_e {
var mods = GHOSTTY_MODS_NONE.rawValue
// Only include Shift and Option as potentially consumed
// Control and Command are never consumed for text translation
if event.modifierFlags.contains(.shift) { mods |= GHOSTTY_MODS_SHIFT.rawValue }
if event.modifierFlags.contains(.option) { mods |= GHOSTTY_MODS_ALT.rawValue }
return ghostty_input_mods_e(rawValue: mods)
}
/// Get the characters for a key event with control character handling.
/// When control is pressed, we get the character without the control modifier
/// so Ghostty's KeyEncoder can apply its own control character encoding.
private func textForKeyEvent(_ event: NSEvent) -> String? {
// First try charactersIgnoringModifiers to get the base character
// This is important for control keys - we want 'c' not '\x03' (ETX)
if event.modifierFlags.contains(.control) {
// For control+key, return the unmodified character
// Ghostty's KeyEncoder will handle the ctrl encoding internally
return event.charactersIgnoringModifiers
}
guard let chars = event.characters, !chars.isEmpty else {
return nil
}
// Check if the first character is a control character or PUA
if let scalar = chars.unicodeScalars.first {
// Control characters (< 0x20) should not be sent as text
// Ghostty handles these internally via keycode + mods
if scalar.value < 0x20 {
return nil
}
// Private Use Area characters (function keys) should not be sent
if scalar.value >= 0xF700 && scalar.value <= 0xF8FF {
return nil
}
}
return chars
}
/// Get the unshifted codepoint for the key event
private func unshiftedCodepointFromEvent(_ event: NSEvent) -> UInt32 {
guard let chars = event.charactersIgnoringModifiers,
let scalar = chars.unicodeScalars.first else {
return 0
}
return scalar.value
}
// MARK: - Mouse Handling
override func mouseDown(with event: NSEvent) {

View file

@ -0,0 +1,370 @@
import Foundation
/// Unix socket-based controller for programmatic terminal control
/// Allows automated testing and external control of terminal tabs
class TerminalController {
static let shared = TerminalController()
private let socketPath = "/tmp/ghosttytabs.sock"
private var serverSocket: Int32 = -1
private var isRunning = false
private var clientHandlers: [Int32: Thread] = [:]
private weak var tabManager: TabManager?
private init() {}
func start(tabManager: TabManager) {
self.tabManager = tabManager
// Remove existing socket file
unlink(socketPath)
// Create socket
serverSocket = socket(AF_UNIX, SOCK_STREAM, 0)
guard serverSocket >= 0 else {
print("TerminalController: Failed to create socket")
return
}
// Bind to path
var addr = sockaddr_un()
addr.sun_family = sa_family_t(AF_UNIX)
socketPath.withCString { ptr in
withUnsafeMutablePointer(to: &addr.sun_path) { pathPtr in
let pathBuf = UnsafeMutableRawPointer(pathPtr).assumingMemoryBound(to: CChar.self)
strcpy(pathBuf, ptr)
}
}
let bindResult = withUnsafePointer(to: &addr) { ptr in
ptr.withMemoryRebound(to: sockaddr.self, capacity: 1) { sockaddrPtr in
bind(serverSocket, sockaddrPtr, socklen_t(MemoryLayout<sockaddr_un>.size))
}
}
guard bindResult >= 0 else {
print("TerminalController: Failed to bind socket")
close(serverSocket)
return
}
// Listen
guard listen(serverSocket, 5) >= 0 else {
print("TerminalController: Failed to listen on socket")
close(serverSocket)
return
}
isRunning = true
print("TerminalController: Listening on \(socketPath)")
// Accept connections in background thread
Thread.detachNewThread { [weak self] in
self?.acceptLoop()
}
}
func stop() {
isRunning = false
if serverSocket >= 0 {
close(serverSocket)
serverSocket = -1
}
unlink(socketPath)
}
private func acceptLoop() {
while isRunning {
var clientAddr = sockaddr_un()
var clientAddrLen = socklen_t(MemoryLayout<sockaddr_un>.size)
let clientSocket = withUnsafeMutablePointer(to: &clientAddr) { ptr in
ptr.withMemoryRebound(to: sockaddr.self, capacity: 1) { sockaddrPtr in
accept(serverSocket, sockaddrPtr, &clientAddrLen)
}
}
guard clientSocket >= 0 else {
if isRunning {
print("TerminalController: Accept failed")
}
continue
}
// Handle client in new thread
Thread.detachNewThread { [weak self] in
self?.handleClient(clientSocket)
}
}
}
private func handleClient(_ socket: Int32) {
defer { close(socket) }
var buffer = [UInt8](repeating: 0, count: 4096)
while isRunning {
let bytesRead = read(socket, &buffer, buffer.count - 1)
guard bytesRead > 0 else { break }
buffer[bytesRead] = 0
let command = String(cString: buffer)
let response = processCommand(command.trimmingCharacters(in: .whitespacesAndNewlines))
response.withCString { ptr in
_ = write(socket, ptr, strlen(ptr))
}
"\n".withCString { ptr in
_ = write(socket, ptr, 1)
}
}
}
private func processCommand(_ command: String) -> String {
let parts = command.split(separator: " ", maxSplits: 1).map(String.init)
guard !parts.isEmpty else { return "ERROR: Empty command" }
let cmd = parts[0].lowercased()
let args = parts.count > 1 ? parts[1] : ""
switch cmd {
case "ping":
return "PONG"
case "list_tabs":
return listTabs()
case "new_tab":
return newTab()
case "close_tab":
return closeTab(args)
case "select_tab":
return selectTab(args)
case "current_tab":
return currentTab()
case "send":
return sendInput(args)
case "send_key":
return sendKey(args)
case "help":
return helpText()
default:
return "ERROR: Unknown command '\(cmd)'. Use 'help' for available commands."
}
}
private func helpText() -> String {
return """
Available commands:
ping - Check if server is running
list_tabs - List all tabs with IDs
new_tab - Create a new tab
close_tab <id> - Close tab by ID
select_tab <id|index> - Select tab by ID or index (0-based)
current_tab - Get current tab ID
send <text> - Send text to current tab
send_key <key> - Send special key (ctrl-c, ctrl-d, enter, tab, escape)
help - Show this help
"""
}
private func listTabs() -> String {
guard let tabManager = tabManager else { return "ERROR: TabManager not available" }
var result: String = ""
DispatchQueue.main.sync {
let tabs = tabManager.tabs.enumerated().map { (index, tab) in
let selected = tab.id == tabManager.selectedTabId ? "*" : " "
return "\(selected) \(index): \(tab.id.uuidString) \(tab.title)"
}
result = tabs.joined(separator: "\n")
}
return result.isEmpty ? "No tabs" : result
}
private func newTab() -> String {
guard let tabManager = tabManager else { return "ERROR: TabManager not available" }
var newTabId: UUID?
DispatchQueue.main.sync {
tabManager.addTab()
newTabId = tabManager.selectedTabId
}
return "OK \(newTabId?.uuidString ?? "unknown")"
}
private func closeTab(_ tabId: String) -> String {
guard let tabManager = tabManager else { return "ERROR: TabManager not available" }
guard let uuid = UUID(uuidString: tabId) else { return "ERROR: Invalid tab ID" }
var success = false
DispatchQueue.main.sync {
if let tab = tabManager.tabs.first(where: { $0.id == uuid }) {
tabManager.closeTab(tab)
success = true
}
}
return success ? "OK" : "ERROR: Tab not found"
}
private func selectTab(_ arg: String) -> String {
guard let tabManager = tabManager else { return "ERROR: TabManager not available" }
var success = false
DispatchQueue.main.sync {
// Try as UUID first
if let uuid = UUID(uuidString: arg) {
if let tab = tabManager.tabs.first(where: { $0.id == uuid }) {
tabManager.selectTab(tab)
success = true
}
}
// Try as index
else if let index = Int(arg), index >= 0, index < tabManager.tabs.count {
tabManager.selectTab(at: index)
success = true
}
}
return success ? "OK" : "ERROR: Tab not found"
}
private func currentTab() -> String {
guard let tabManager = tabManager else { return "ERROR: TabManager not available" }
var result: String = ""
DispatchQueue.main.sync {
if let id = tabManager.selectedTabId {
result = id.uuidString
}
}
return result.isEmpty ? "ERROR: No tab selected" : result
}
private func sendInput(_ text: String) -> String {
guard let tabManager = tabManager else { return "ERROR: TabManager not available" }
var success = false
DispatchQueue.main.sync {
guard let selectedId = tabManager.selectedTabId,
let tab = tabManager.tabs.first(where: { $0.id == selectedId }),
let surface = tab.terminalSurface.surface else {
return
}
// Unescape common escape sequences
// Note: \n is converted to \r for terminal (Enter key sends \r)
let unescaped = text
.replacingOccurrences(of: "\\n", with: "\r")
.replacingOccurrences(of: "\\r", with: "\r")
.replacingOccurrences(of: "\\t", with: "\t")
// Send each character as a key event (like typing)
for char in unescaped {
String(char).withCString { ptr in
var keyEvent = ghostty_input_key_s()
keyEvent.action = GHOSTTY_ACTION_PRESS
keyEvent.keycode = 0
keyEvent.mods = GHOSTTY_MODS_NONE
keyEvent.consumed_mods = GHOSTTY_MODS_NONE
keyEvent.text = ptr
keyEvent.composing = false
_ = ghostty_surface_key(surface, keyEvent)
}
}
success = true
}
return success ? "OK" : "ERROR: Failed to send input"
}
private func sendKey(_ keyName: String) -> String {
guard let tabManager = tabManager else { return "ERROR: TabManager not available" }
var success = false
DispatchQueue.main.sync {
guard let selectedId = tabManager.selectedTabId,
let tab = tabManager.tabs.first(where: { $0.id == selectedId }),
let surface = tab.terminalSurface.surface else {
return
}
// Helper to send a key event with text
func sendKeyEvent(text: String, mods: ghostty_input_mods_e = GHOSTTY_MODS_NONE) {
text.withCString { ptr in
var keyEvent = ghostty_input_key_s()
keyEvent.action = GHOSTTY_ACTION_PRESS
keyEvent.keycode = 0
keyEvent.mods = mods
keyEvent.consumed_mods = GHOSTTY_MODS_NONE
keyEvent.text = ptr
keyEvent.composing = false
_ = ghostty_surface_key(surface, keyEvent)
}
}
switch keyName.lowercased() {
case "ctrl-c", "ctrl+c", "sigint":
// Send Ctrl+C - the control character 0x03 (ETX)
// Note: We send the raw control character, which the terminal
// interprets as an interrupt signal
sendKeyEvent(text: "\u{03}")
success = true
case "ctrl-d", "ctrl+d", "eof":
// Send Ctrl+D - the control character 0x04 (EOT)
sendKeyEvent(text: "\u{04}")
success = true
case "ctrl-z", "ctrl+z", "sigtstp":
// Send Ctrl+Z - the control character 0x1A (SUB)
sendKeyEvent(text: "\u{1A}")
success = true
case "ctrl-\\", "ctrl+\\", "sigquit":
// Send Ctrl+\ - the control character 0x1C (FS)
sendKeyEvent(text: "\u{1C}")
success = true
case "enter", "return":
sendKeyEvent(text: "\r")
success = true
case "tab":
sendKeyEvent(text: "\t")
success = true
case "escape", "esc":
sendKeyEvent(text: "\u{1B}")
success = true
case "backspace":
sendKeyEvent(text: "\u{7F}")
success = true
default:
// Check for ctrl-<letter> pattern
if keyName.lowercased().hasPrefix("ctrl-") || keyName.lowercased().hasPrefix("ctrl+") {
let letter = keyName.dropFirst(5).lowercased()
if letter.count == 1, let char = letter.first, char.isLetter {
// Convert letter to control character (a=1, b=2, ..., z=26)
let ctrlCode = UInt8(char.asciiValue! - Character("a").asciiValue! + 1)
let ctrlChar = String(UnicodeScalar(ctrlCode))
sendKeyEvent(text: ctrlChar)
success = true
}
}
}
}
return success ? "OK" : "ERROR: Unknown key '\(keyName)'"
}
deinit {
stop()
}
}

237
tests/ghosttytabs.py Executable file
View file

@ -0,0 +1,237 @@
#!/usr/bin/env python3
"""
GhosttyTabs Python Client
A client library for programmatically controlling GhosttyTabs via Unix socket.
Usage:
from ghosttytabs import GhosttyTabs
client = GhosttyTabs()
client.connect()
# Send text to terminal
client.send("echo hello\\n")
# Send special keys
client.send_key("ctrl-c")
client.send_key("ctrl-d")
# Tab management
client.new_tab()
client.list_tabs()
client.select_tab(0)
client.close()
"""
import socket
import os
from typing import Optional, List, Tuple
class GhosttyTabsError(Exception):
"""Exception raised for GhosttyTabs errors"""
pass
class GhosttyTabs:
"""Client for controlling GhosttyTabs via Unix socket"""
DEFAULT_SOCKET_PATH = "/tmp/ghosttytabs.sock"
def __init__(self, socket_path: str = None):
self.socket_path = socket_path or self.DEFAULT_SOCKET_PATH
self._socket: Optional[socket.socket] = None
def connect(self) -> None:
"""Connect to the GhosttyTabs socket"""
if self._socket is not None:
return
if not os.path.exists(self.socket_path):
raise GhosttyTabsError(
f"Socket not found at {self.socket_path}. "
"Is GhosttyTabs running?"
)
self._socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try:
self._socket.connect(self.socket_path)
self._socket.settimeout(5.0)
except socket.error as e:
self._socket = None
raise GhosttyTabsError(f"Failed to connect: {e}")
def close(self) -> None:
"""Close the connection"""
if self._socket is not None:
self._socket.close()
self._socket = None
def __enter__(self):
self.connect()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
return False
def _send_command(self, command: str) -> str:
"""Send a command and receive response"""
if self._socket is None:
raise GhosttyTabsError("Not connected")
try:
self._socket.sendall((command + "\n").encode())
response = self._socket.recv(8192).decode().strip()
return response
except socket.timeout:
raise GhosttyTabsError("Command timed out")
except socket.error as e:
raise GhosttyTabsError(f"Socket error: {e}")
def ping(self) -> bool:
"""Check if the server is responding"""
response = self._send_command("ping")
return response == "PONG"
def list_tabs(self) -> List[Tuple[int, str, str, bool]]:
"""
List all tabs.
Returns list of (index, id, title, is_selected) tuples.
"""
response = self._send_command("list_tabs")
if response == "No tabs":
return []
tabs = []
for line in response.split("\n"):
if not line.strip():
continue
selected = line.startswith("*")
parts = line.lstrip("* ").split(" ", 2)
if len(parts) >= 3:
index = int(parts[0].rstrip(":"))
tab_id = parts[1]
title = parts[2] if len(parts) > 2 else ""
tabs.append((index, tab_id, title, selected))
return tabs
def new_tab(self) -> str:
"""Create a new tab. Returns the new tab's ID."""
response = self._send_command("new_tab")
if response.startswith("OK "):
return response[3:]
raise GhosttyTabsError(response)
def close_tab(self, tab_id: str) -> None:
"""Close a tab by ID"""
response = self._send_command(f"close_tab {tab_id}")
if not response.startswith("OK"):
raise GhosttyTabsError(response)
def select_tab(self, tab: str | int) -> None:
"""Select a tab by ID or index"""
response = self._send_command(f"select_tab {tab}")
if not response.startswith("OK"):
raise GhosttyTabsError(response)
def current_tab(self) -> str:
"""Get the current tab's ID"""
response = self._send_command("current_tab")
if response.startswith("ERROR"):
raise GhosttyTabsError(response)
return response
def send(self, text: str) -> None:
"""
Send text to the current terminal.
Use \\n for newline (Enter), \\t for tab, etc.
Note: The text is sent as-is. Use actual escape sequences:
client.send("echo hello\\n") # Sends: echo hello<Enter>
client.send("echo hello" + "\\n") # Same thing
"""
# Escape actual newlines/tabs to their backslash forms for protocol
# The server will unescape them
escaped = text.replace("\n", "\\n").replace("\r", "\\r").replace("\t", "\\t")
response = self._send_command(f"send {escaped}")
if not response.startswith("OK"):
raise GhosttyTabsError(response)
def send_key(self, key: str) -> None:
"""
Send a special key to the current terminal.
Supported keys:
ctrl-c, ctrl-d, ctrl-z, ctrl-\\
enter, tab, escape, backspace
ctrl-<letter> for any letter
"""
response = self._send_command(f"send_key {key}")
if not response.startswith("OK"):
raise GhosttyTabsError(response)
def send_line(self, text: str) -> None:
"""Send text followed by Enter"""
self.send(text + "\\n")
def send_ctrl_c(self) -> None:
"""Send Ctrl+C (SIGINT)"""
self.send_key("ctrl-c")
def send_ctrl_d(self) -> None:
"""Send Ctrl+D (EOF)"""
self.send_key("ctrl-d")
def help(self) -> str:
"""Get help text from server"""
return self._send_command("help")
def main():
"""CLI interface for ghosttytabs"""
import sys
import argparse
parser = argparse.ArgumentParser(description="GhosttyTabs CLI")
parser.add_argument("command", nargs="?", help="Command to send")
parser.add_argument("args", nargs="*", help="Command arguments")
parser.add_argument("-s", "--socket", default=GhosttyTabs.DEFAULT_SOCKET_PATH,
help="Socket path")
args = parser.parse_args()
try:
with GhosttyTabs(args.socket) as client:
if not args.command:
# Interactive mode
print("GhosttyTabs CLI (type 'help' for commands, 'quit' to exit)")
while True:
try:
line = input("> ").strip()
if line.lower() in ("quit", "exit"):
break
if line:
response = client._send_command(line)
print(response)
except EOFError:
break
except KeyboardInterrupt:
print()
break
else:
# Single command mode
command = args.command
if args.args:
command += " " + " ".join(args.args)
response = client._send_command(command)
print(response)
except GhosttyTabsError as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()

63
tests/test_app_keystrokes.sh Executable file
View file

@ -0,0 +1,63 @@
#!/bin/bash
# Test script that sends keystrokes to GhosttyTabs via AppleScript
# This tests the actual keyboard input path through the app
set -e
echo "=== GhosttyTabs Keystroke Test ==="
echo ""
# Check if GhosttyTabs is running
if ! pgrep -x "GhosttyTabs" > /dev/null; then
echo "Error: GhosttyTabs is not running"
echo "Please start GhosttyTabs first"
exit 1
fi
echo "GhosttyTabs is running"
echo ""
# Activate GhosttyTabs
osascript -e 'tell application "GhosttyTabs" to activate'
sleep 0.5
echo "Test 1: Testing Ctrl+C (SIGINT)"
echo " Typing 'sleep 30' and pressing Enter..."
# Type the command
osascript -e 'tell application "System Events" to keystroke "sleep 30"'
sleep 0.2
osascript -e 'tell application "System Events" to keystroke return'
sleep 0.5
echo " Sending Ctrl+C..."
# Send Ctrl+C
osascript -e 'tell application "System Events" to keystroke "c" using control down'
sleep 0.5
echo " If you see '^C' or the command was interrupted, Ctrl+C is working!"
echo ""
echo "Test 2: Testing Ctrl+D (EOF)"
echo " Starting cat command..."
# Type cat command
osascript -e 'tell application "System Events" to keystroke "cat"'
sleep 0.2
osascript -e 'tell application "System Events" to keystroke return'
sleep 0.5
echo " Sending Ctrl+D..."
# Send Ctrl+D
osascript -e 'tell application "System Events" to keystroke "d" using control down'
sleep 0.5
echo " If cat exited, Ctrl+D is working!"
echo ""
echo "=== Manual Verification Required ==="
echo "Please check the GhosttyTabs window to verify:"
echo " 1. The 'sleep 30' command was interrupted by Ctrl+C"
echo " 2. The 'cat' command exited after Ctrl+D"
echo ""
echo "If both worked, the fix is successful!"

121
tests/test_ctrl_interactive.py Executable file
View file

@ -0,0 +1,121 @@
#!/usr/bin/env python3
"""
Interactive test for Ctrl+C and Ctrl+D in GhosttyTabs terminal.
This script tests that control signals are properly handled.
Run this script inside the GhosttyTabs terminal.
Tests:
1. Ctrl+C (SIGINT) - Should interrupt a running process
2. Ctrl+D (EOF) - Should signal end-of-file on stdin
Usage:
python3 test_ctrl_interactive.py
"""
import signal
import sys
import os
def test_ctrl_c():
"""Test Ctrl+C signal handling"""
print("\n=== Test 1: Ctrl+C (SIGINT) ===")
print("This test will wait for you to press Ctrl+C.")
print("Press Ctrl+C now...")
received = [False]
def handler(signum, frame):
received[0] = True
print("\n✅ SUCCESS: SIGINT (Ctrl+C) received!")
old_handler = signal.signal(signal.SIGINT, handler)
try:
# Wait for up to 10 seconds for Ctrl+C
import time
for i in range(10):
if received[0]:
break
time.sleep(1)
if not received[0]:
print(f" Waiting... ({10-i-1}s remaining)")
if not received[0]:
print("\n❌ FAILED: No SIGINT received within 10 seconds")
print(" Ctrl+C may not be working correctly.")
return False
return True
finally:
signal.signal(signal.SIGINT, old_handler)
def test_ctrl_d():
"""Test Ctrl+D (EOF) handling"""
print("\n=== Test 2: Ctrl+D (EOF) ===")
print("This test will read from stdin.")
print("Press Ctrl+D (on empty line) to send EOF...")
print("Type something and press Enter, then Ctrl+D on empty line:")
try:
lines = []
while True:
try:
line = input("> ")
lines.append(line)
except EOFError:
print("\n✅ SUCCESS: EOF (Ctrl+D) received!")
print(f" Lines entered before EOF: {len(lines)}")
return True
except KeyboardInterrupt:
print("\n⚠️ Got Ctrl+C instead of Ctrl+D")
return False
def main():
print("=" * 50)
print("GhosttyTabs Control Signal Test")
print("=" * 50)
print("\nThis script tests if Ctrl+C and Ctrl+D work correctly.")
print("Run this inside the GhosttyTabs terminal to verify the fix.\n")
# Check if running in a terminal
if not os.isatty(sys.stdin.fileno()):
print("Warning: Not running in a terminal")
results = []
# Test Ctrl+C
try:
results.append(("Ctrl+C (SIGINT)", test_ctrl_c()))
except Exception as e:
print(f"Error in Ctrl+C test: {e}")
results.append(("Ctrl+C (SIGINT)", False))
# Test Ctrl+D
try:
results.append(("Ctrl+D (EOF)", test_ctrl_d()))
except Exception as e:
print(f"Error in Ctrl+D test: {e}")
results.append(("Ctrl+D (EOF)", False))
# Summary
print("\n" + "=" * 50)
print("Test Results Summary")
print("=" * 50)
all_passed = True
for name, passed in results:
status = "✅ PASS" if passed else "❌ FAIL"
print(f" {name}: {status}")
if not passed:
all_passed = False
print()
if all_passed:
print("All tests passed! Control signals are working correctly.")
else:
print("Some tests failed. Check the key input handling code.")
return 0 if all_passed else 1
if __name__ == "__main__":
sys.exit(main())

24
tests/test_ctrl_signals.sh Executable file
View file

@ -0,0 +1,24 @@
#!/bin/bash
# Test script to verify Ctrl+C and Ctrl+D work correctly in the terminal
# Run this script inside the GhosttyTabs terminal to test signal handling
set -e
echo "=== Control Signal Test Suite ==="
echo ""
# Test 1: Ctrl+C interrupt test
echo "Test 1: Ctrl+C (SIGINT) - Press Ctrl+C to interrupt the sleep"
echo " A long sleep will start. Press Ctrl+C to interrupt it."
echo " If Ctrl+C works, you should see 'SIGINT received!' within 2 seconds."
echo ""
echo "Starting sleep... (press Ctrl+C now)"
trap 'echo "SIGINT received! Ctrl+C is working correctly."; exit 0' INT
# Start a long sleep - user should interrupt this with Ctrl+C
sleep 30
# If we get here, Ctrl+C didn't work
echo "ERROR: Sleep completed without interruption. Ctrl+C may not be working!"
exit 1

248
tests/test_ctrl_socket.py Executable file
View file

@ -0,0 +1,248 @@
#!/usr/bin/env python3
"""
Automated tests for Ctrl+C and Ctrl+D using the GhosttyTabs socket interface.
Usage:
python3 test_ctrl_socket.py
Requirements:
- GhosttyTabs must be running with the socket controller enabled
"""
import os
import sys
import time
import tempfile
from pathlib import Path
# Add the directory containing ghosttytabs.py to the path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from ghosttytabs import GhosttyTabs, GhosttyTabsError
class TestResult:
def __init__(self, name: str):
self.name = name
self.passed = False
self.message = ""
def success(self, msg: str = ""):
self.passed = True
self.message = msg
def failure(self, msg: str):
self.passed = False
self.message = msg
def test_connection(client: GhosttyTabs) -> TestResult:
"""Test that we can connect and ping the server"""
result = TestResult("Connection")
try:
if client.ping():
result.success("Connected and received PONG")
else:
result.failure("Ping failed")
except Exception as e:
result.failure(str(e))
return result
def test_ctrl_c(client: GhosttyTabs) -> TestResult:
"""
Test Ctrl+C by:
1. Starting sleep command
2. Sending Ctrl+C
3. Verifying shell responds to next command
"""
result = TestResult("Ctrl+C (SIGINT)")
marker = Path(tempfile.gettempdir()) / f"ghostty_ctrlc_{os.getpid()}"
try:
marker.unlink(missing_ok=True)
# Start a long sleep
client.send("sleep 30\n")
time.sleep(0.3)
# Send Ctrl+C to interrupt
client.send_ctrl_c()
time.sleep(0.3)
# If Ctrl+C worked, shell should accept new command
client.send(f"touch {marker}\n")
time.sleep(0.5)
if marker.exists():
result.success("Ctrl+C interrupted sleep, shell responsive")
marker.unlink(missing_ok=True)
else:
result.failure("Shell not responsive after Ctrl+C")
except Exception as e:
result.failure(f"Exception: {e}")
marker.unlink(missing_ok=True)
return result
def test_ctrl_d(client: GhosttyTabs) -> TestResult:
"""
Test Ctrl+D by:
1. Running cat command
2. Sending Ctrl+D
3. Verifying cat exits and next command runs
"""
result = TestResult("Ctrl+D (EOF)")
marker = Path(tempfile.gettempdir()) / f"ghostty_ctrld_{os.getpid()}"
try:
marker.unlink(missing_ok=True)
# Run cat (waits for input)
client.send("cat\n")
time.sleep(0.3)
# Send Ctrl+D (EOF)
client.send_ctrl_d()
time.sleep(0.3)
# If Ctrl+D worked, cat should exit and we can run another command
client.send(f"touch {marker}\n")
time.sleep(0.5)
if marker.exists():
result.success("Ctrl+D sent EOF, cat exited")
marker.unlink(missing_ok=True)
else:
result.failure("cat did not exit after Ctrl+D")
except Exception as e:
result.failure(f"Exception: {e}")
marker.unlink(missing_ok=True)
return result
def test_ctrl_c_python(client: GhosttyTabs) -> TestResult:
"""
Test Ctrl+C with Python process
"""
result = TestResult("Ctrl+C in Python")
marker = Path(tempfile.gettempdir()) / f"ghostty_pyctrlc_{os.getpid()}"
try:
marker.unlink(missing_ok=True)
# Start Python that loops forever
client.send("python3 -c 'import time; [time.sleep(1) for _ in iter(int, 1)]'\n")
time.sleep(1.0) # Give Python time to start
# Send Ctrl+C
client.send_ctrl_c()
time.sleep(0.5)
# If Ctrl+C worked, shell should accept new command
client.send(f"touch {marker}\n")
time.sleep(0.5)
if marker.exists():
result.success("Ctrl+C interrupted Python process")
marker.unlink(missing_ok=True)
else:
result.failure("Python not interrupted by Ctrl+C")
except Exception as e:
result.failure(f"Exception: {type(e).__name__}: {e}")
marker.unlink(missing_ok=True)
return result
def run_tests():
"""Run all tests"""
print("=" * 60)
print("GhosttyTabs Ctrl+C/D Automated Tests")
print("=" * 60)
print()
socket_path = GhosttyTabs.DEFAULT_SOCKET_PATH
if not os.path.exists(socket_path):
print(f"Error: Socket not found at {socket_path}")
print("Please make sure GhosttyTabs is running.")
return 1
results = []
try:
with GhosttyTabs() as client:
# Test connection
print("Testing connection...")
results.append(test_connection(client))
status = "" if results[-1].passed else ""
print(f" {status} {results[-1].message}")
print()
if not results[-1].passed:
return 1
# Test Ctrl+C
print("Testing Ctrl+C (SIGINT)...")
results.append(test_ctrl_c(client))
status = "" if results[-1].passed else ""
print(f" {status} {results[-1].message}")
print()
time.sleep(0.5)
# Test Ctrl+D
print("Testing Ctrl+D (EOF)...")
results.append(test_ctrl_d(client))
status = "" if results[-1].passed else ""
print(f" {status} {results[-1].message}")
print()
time.sleep(0.5)
# Test Ctrl+C in Python
print("Testing Ctrl+C in Python process...")
results.append(test_ctrl_c_python(client))
status = "" if results[-1].passed else ""
print(f" {status} {results[-1].message}")
print()
except GhosttyTabsError as e:
print(f"Error: {e}")
return 1
# Summary
print("=" * 60)
print("Test Results Summary")
print("=" * 60)
passed = sum(1 for r in results if r.passed)
total = len(results)
for r in results:
status = "✅ PASS" if r.passed else "❌ FAIL"
print(f" {r.name}: {status}")
if not r.passed and r.message:
print(f" {r.message}")
print()
print(f"Passed: {passed}/{total}")
if passed == total:
print("\n🎉 All tests passed!")
return 0
else:
print(f"\n⚠️ {total - passed} test(s) failed")
return 1
if __name__ == "__main__":
sys.exit(run_tests())

262
tests/test_signals_auto.py Normal file
View file

@ -0,0 +1,262 @@
#!/usr/bin/env python3
"""
Automated test for signal handling - tests that SIGINT and EOF work correctly.
This test doesn't require manual interaction.
"""
import subprocess
import signal
import sys
import os
import time
import pty
import select
import termios
import tty
def test_sigint_in_pty():
"""Test that Ctrl+C (SIGINT) works in a PTY"""
print("Test 1: SIGINT via PTY (simulating Ctrl+C)")
# Create a PTY pair
master_fd, slave_fd = pty.openpty()
# Configure the PTY for proper signal handling
# This enables ISIG so Ctrl+C generates SIGINT
attrs = termios.tcgetattr(slave_fd)
attrs[3] |= termios.ISIG # Enable signals
attrs[3] |= termios.ICANON # Canonical mode
attrs[6][termios.VINTR] = 3 # Ctrl+C = SIGINT
termios.tcsetattr(slave_fd, termios.TCSANOW, attrs)
# Start a process that waits for SIGINT
# Use start_new_session=True to create new session with controlling terminal
proc = subprocess.Popen(
['python3', '-c', '''
import signal
import sys
import time
received = False
def handler(sig, frame):
global received
received = True
print("SIGINT_RECEIVED", flush=True)
sys.exit(0)
signal.signal(signal.SIGINT, handler)
print("WAITING", flush=True)
for i in range(10):
time.sleep(0.5)
if received:
break
if not received:
print("TIMEOUT", flush=True)
sys.exit(1)
'''],
stdin=slave_fd,
stdout=slave_fd,
stderr=slave_fd,
start_new_session=True
)
os.close(slave_fd)
try:
# Wait for "WAITING" message
output = b""
for _ in range(20):
if select.select([master_fd], [], [], 0.1)[0]:
output += os.read(master_fd, 1024)
if b"WAITING" in output:
break
if b"WAITING" not in output:
print(" ❌ FAILED: Process didn't start properly")
return False
# Send SIGINT directly to the process group
# This simulates what the terminal does when it receives Ctrl+C
os.kill(-proc.pid, signal.SIGINT)
# Wait for response
output = b""
for _ in range(20):
if select.select([master_fd], [], [], 0.1)[0]:
output += os.read(master_fd, 1024)
if b"SIGINT_RECEIVED" in output:
break
proc.wait(timeout=2)
if b"SIGINT_RECEIVED" in output:
print(" ✅ PASSED: SIGINT received via Ctrl+C in PTY")
return True
else:
print(f" ❌ FAILED: No SIGINT received. Output: {output}")
return False
except Exception as e:
print(f" ❌ FAILED: {e}")
return False
finally:
try:
proc.kill()
except:
pass
os.close(master_fd)
def test_eof_in_pty():
"""Test that Ctrl+D (EOF) works in a PTY"""
print("\nTest 2: EOF via PTY (simulating Ctrl+D)")
master_fd, slave_fd = pty.openpty()
proc = subprocess.Popen(
['python3', '-c', '''
import sys
print("WAITING", flush=True)
try:
line = input()
if line == "":
print("EMPTY_LINE", flush=True)
else:
print(f"GOT: {line}", flush=True)
except EOFError:
print("EOF_RECEIVED", flush=True)
sys.exit(0)
'''],
stdin=slave_fd,
stdout=slave_fd,
stderr=slave_fd,
preexec_fn=os.setsid
)
os.close(slave_fd)
try:
# Wait for "WAITING"
output = b""
for _ in range(20):
if select.select([master_fd], [], [], 0.1)[0]:
output += os.read(master_fd, 1024)
if b"WAITING" in output:
break
if b"WAITING" not in output:
print(" ❌ FAILED: Process didn't start properly")
return False
# Send Ctrl+D (ASCII 0x04) through the PTY
os.write(master_fd, b'\x04')
# Wait for response
output = b""
for _ in range(20):
if select.select([master_fd], [], [], 0.1)[0]:
output += os.read(master_fd, 1024)
if b"EOF_RECEIVED" in output or b"EMPTY_LINE" in output:
break
proc.wait(timeout=2)
if b"EOF_RECEIVED" in output:
print(" ✅ PASSED: EOF received via Ctrl+D in PTY")
return True
else:
print(f" ❌ FAILED: No EOF received. Output: {output}")
return False
except Exception as e:
print(f" ❌ FAILED: {e}")
return False
finally:
try:
proc.kill()
except:
pass
os.close(master_fd)
def test_direct_signal():
"""Test direct signal sending (not through keyboard)"""
print("\nTest 3: Direct SIGINT signal")
proc = subprocess.Popen(
['python3', '-c', '''
import signal
import time
import sys
def handler(sig, frame):
print("SIGINT_RECEIVED", flush=True)
sys.exit(0)
signal.signal(signal.SIGINT, handler)
print("WAITING", flush=True)
sys.stdout.flush()
time.sleep(10)
print("TIMEOUT", flush=True)
'''],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
try:
# Wait for process to start
time.sleep(0.2)
# Send SIGINT directly
proc.send_signal(signal.SIGINT)
stdout, stderr = proc.communicate(timeout=2)
if b"SIGINT_RECEIVED" in stdout:
print(" ✅ PASSED: Direct SIGINT works")
return True
else:
print(f" ❌ FAILED: Output: {stdout}")
return False
except Exception as e:
print(f" ❌ FAILED: {e}")
return False
finally:
try:
proc.kill()
except:
pass
def main():
print("=" * 50)
print("Automated Signal Handling Tests")
print("=" * 50)
print()
results = []
results.append(("SIGINT via PTY (Ctrl+C)", test_sigint_in_pty()))
results.append(("EOF via PTY (Ctrl+D)", test_eof_in_pty()))
results.append(("Direct SIGINT", test_direct_signal()))
print()
print("=" * 50)
print("Results Summary")
print("=" * 50)
all_passed = True
for name, passed in results:
status = "✅ PASS" if passed else "❌ FAIL"
print(f" {name}: {status}")
if not passed:
all_passed = False
print()
if all_passed:
print("All tests passed!")
return 0
else:
print("Some tests failed.")
return 1
if __name__ == "__main__":
sys.exit(main())