diff --git a/CLAUDE.md b/CLAUDE.md index 1bdd00ae..633454cb 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -60,3 +60,4 @@ Create three commits: - Never combine unrelated changes in one commit - Keep commit messages concise but descriptive - If all changes are related to one logical unit, a single commit is fine +- `git commit --amend` can be used for immediate small fixes to the last commit, but not for unrelated changes diff --git a/src/agent/cli.ts b/src/agent/cli.ts index 32de4bc3..ab017ace 100644 --- a/src/agent/cli.ts +++ b/src/agent/cli.ts @@ -9,6 +9,7 @@ type CliOptions = { thinking?: string | undefined; cwd?: string | undefined; session?: string | undefined; + debug?: boolean | undefined; help?: boolean | undefined; }; @@ -24,6 +25,7 @@ function printUsage() { console.log(" --thinking LEVEL Thinking level"); console.log(" --cwd DIR Working directory for commands"); console.log(" --session ID Session ID for conversation persistence"); + console.log(" --debug Enable debug logging"); console.log(" --help, -h Show this help"); } @@ -67,6 +69,10 @@ function parseArgs(argv: string[]) { opts.session = args.shift(); continue; } + if (arg === "--debug") { + opts.debug = true; + continue; + } if (arg === "--") { promptParts.push(...args); break; @@ -110,6 +116,7 @@ async function main() { thinkingLevel: opts.thinking as any, cwd: opts.cwd, sessionId: opts.session, + debug: opts.debug, }); // If it's a newly created session, notify user of sessionId diff --git a/src/agent/runner.ts b/src/agent/runner.ts index 95427a0d..53c558a8 100644 --- a/src/agent/runner.ts +++ b/src/agent/runner.ts @@ -44,6 +44,7 @@ export class Agent { private readonly session: SessionManager; private readonly profile?: ProfileManager; private readonly contextWindowGuard: ContextWindowGuardResult; + private readonly debug: boolean; /** Current session ID */ readonly sessionId: string; @@ -52,6 +53,7 @@ export class Agent { const stdout = options.logger?.stdout ?? process.stdout; const stderr = options.logger?.stderr ?? process.stderr; this.output = createAgentOutput({ stdout, stderr }); + this.debug = options.debug ?? false; this.agent = new PiAgentCore(); @@ -142,6 +144,26 @@ export class Agent { const restoredMessages = this.session.loadMessages(); if (restoredMessages.length > 0) { + if (this.debug) { + console.error(`[debug] Restoring ${restoredMessages.length} messages from session`); + for (const msg of restoredMessages) { + const msgAny = msg as any; + const content = Array.isArray(msgAny.content) + ? msgAny.content.map((c: any) => c.type || "text").join(", ") + : typeof msgAny.content; + console.error(`[debug] ${msg.role}: ${content}`); + if (Array.isArray(msgAny.content)) { + for (const block of msgAny.content) { + if (block.type === "tool_use") { + console.error(`[debug] tool_use id: ${block.id}, name: ${block.name}`); + } + if (block.type === "tool_result") { + console.error(`[debug] tool_result tool_use_id: ${block.tool_use_id}`); + } + } + } + } + } this.agent.replaceMessages(restoredMessages); } diff --git a/src/agent/session/compaction.ts b/src/agent/session/compaction.ts index 935abe1b..1ce60ae1 100644 --- a/src/agent/session/compaction.ts +++ b/src/agent/session/compaction.ts @@ -22,6 +22,58 @@ export type CompactionResult = { reason: "count" | "tokens" | "summary"; }; +/** + * Find a safe compaction point that doesn't break tool_use/tool_result pairs. + * Returns the index to start keeping messages from. + */ +function findSafeCompactionPoint(messages: AgentMessage[], targetStart: number): number { + let start = targetStart; + + // Move forward until we find a safe starting point + while (start < messages.length) { + const msg = messages[start]; + if (!msg) { + start++; + continue; + } + + // Safe to start from a user message + if (msg.role === "user") { + // But make sure it's not a toolResult without corresponding tool_use + const msgAny = msg as any; + if (Array.isArray(msgAny.content)) { + const hasToolResult = msgAny.content.some((b: any) => b.type === "tool_result"); + if (!hasToolResult) { + break; // Safe: user message without tool_result + } + } else { + break; // Safe: simple user message + } + } + + // toolResult messages need their corresponding tool_use, skip them + // assistant messages are ok to start from if they don't reference missing tool calls + if (msg.role === "assistant") { + // Check if previous messages have the required tool_use for any following tool_result + const nextMsg = messages[start + 1]; + if (nextMsg && nextMsg.role === "user") { + const nextAny = nextMsg as any; + if (Array.isArray(nextAny.content)) { + const hasToolResult = nextAny.content.some((b: any) => b.type === "tool_result"); + if (hasToolResult) { + // This assistant message has tool_use that's needed by next message + break; + } + } + } + } + + start++; + } + + return start; +} + /** * Simple compression based on message count (legacy logic, maintains backward compatibility) */ @@ -31,7 +83,22 @@ export function compactMessagesByCount( keepLast: number, ): CompactionResult | null { if (messages.length <= maxMessages) return null; - const kept = messages.slice(-keepLast); + + const targetStart = messages.length - keepLast; + const safeStart = findSafeCompactionPoint(messages, targetStart); + + // If we can't find a safe point, don't compact + if (safeStart >= messages.length) { + return null; + } + + const kept = messages.slice(safeStart); + + // Don't compact if we'd keep almost everything anyway + if (kept.length >= messages.length - 2) { + return null; + } + return { kept, removedCount: messages.length - kept.length, diff --git a/src/agent/session/session-manager.ts b/src/agent/session/session-manager.ts index c635c58e..1f309925 100644 --- a/src/agent/session/session-manager.ts +++ b/src/agent/session/session-manager.ts @@ -1,9 +1,19 @@ import type { AgentMessage } from "@mariozechner/pi-agent-core"; -import type { Model } from "@mariozechner/pi-ai"; +import { getModel, type Model } from "@mariozechner/pi-ai"; import type { SessionEntry, SessionMeta } from "./types.js"; import { appendEntry, readEntries, writeEntries } from "./storage.js"; import { compactMessages, compactMessagesAsync } from "./compaction.js"; +/** Get Kimi model for summarization (use a cheaper model than k2-thinking) */ +function getSummaryModel(): Model { + return (getModel as (p: string, m: string) => Model)("kimi", "moonshot-v1-128k"); +} + +/** Get Kimi API key */ +function getSummaryApiKey(): string | undefined { + return process.env.KIMI_API_KEY ?? process.env.MOONSHOT_API_KEY; +} + export type SessionManagerOptions = { sessionId: string; baseDir?: string | undefined; @@ -171,9 +181,12 @@ export class SessionManager { let result; if (this.compactionMode === "summary") { - // Summary mode requires model and apiKey - if (!this.model || !this.apiKey) { - // Downgrade to tokens mode + // Use provided model/apiKey or fall back to Kimi + const model = this.model ?? getSummaryModel(); + const apiKey = this.apiKey ?? getSummaryApiKey(); + + if (!apiKey) { + // No API key available, downgrade to tokens mode result = compactMessages(messages, { mode: "tokens", contextWindowTokens: this.contextWindowTokens, @@ -185,8 +198,8 @@ export class SessionManager { } else { result = await compactMessagesAsync(messages, { mode: "summary", - model: this.model, - apiKey: this.apiKey, + model, + apiKey, contextWindowTokens: this.contextWindowTokens, systemPrompt: this.systemPrompt, reserveTokens: this.reserveTokens, diff --git a/src/agent/tools/exec.ts b/src/agent/tools/exec.ts index 222141f1..169678ee 100644 --- a/src/agent/tools/exec.ts +++ b/src/agent/tools/exec.ts @@ -1,6 +1,7 @@ import { spawn } from "child_process"; import { Type } from "@sinclair/typebox"; import type { AgentTool } from "@mariozechner/pi-agent-core"; +import { registerProcess } from "./process-registry.js"; const ExecSchema = Type.Object({ command: Type.String({ description: "Shell command to execute." }), @@ -8,39 +9,57 @@ const ExecSchema = Type.Object({ timeoutMs: Type.Optional( Type.Number({ description: "Timeout in milliseconds.", minimum: 0 }), ), + yieldMs: Type.Optional( + Type.Number({ + description: + "Auto-background threshold in milliseconds. If command doesn't complete within this time, it runs in background. Default 5000ms. Set to 0 to disable auto-backgrounding.", + minimum: 0, + }), + ), }); type ExecArgs = { command: string; cwd?: string; timeoutMs?: number; + yieldMs?: number; }; export type ExecResult = { output: string; exitCode: number | null; truncated: boolean; + backgrounded?: boolean; + processId?: string; }; const MAX_OUTPUT_BYTES = 64 * 1024; +const DEFAULT_YIELD_MS = 5000; export function createExecTool(defaultCwd?: string): AgentTool { return { name: "exec", label: "Exec", - description: "Execute a shell command and wait for it to complete. Returns stdout/stderr output. Use this for short-lived commands only (e.g., ls, cat, pip install). Do NOT use for long-running processes like servers - use the 'process' tool instead.", + description: + "Execute a shell command. If the command doesn't complete within yieldMs (default 5s), it automatically runs in background and returns a process ID. Use 'process output ' to check output, 'process status ' to check status, 'process stop ' to terminate.", parameters: ExecSchema, execute: async (_toolCallId, args, signal) => { - const { command, cwd, timeoutMs } = args as ExecArgs; - return new Promise((resolve, reject) => { + const { command, cwd, timeoutMs, yieldMs = DEFAULT_YIELD_MS } = args as ExecArgs; + const effectiveCwd = cwd || defaultCwd; + + return new Promise((resolve) => { const child = spawn(command, { shell: true, - cwd: cwd || defaultCwd, + cwd: effectiveCwd, stdio: ["ignore", "pipe", "pipe"], }); let timedOut = false; + let yielded = false; let timeout: NodeJS.Timeout | undefined; + let yieldTimer: NodeJS.Timeout | undefined; + + // Timeout handling (hard kill) if (timeoutMs && timeoutMs > 0) { timeout = setTimeout(() => { timedOut = true; @@ -48,6 +67,36 @@ export function createExecTool(defaultCwd?: string): AgentTool 0) { + yieldTimer = setTimeout(() => { + if (yielded) return; + yielded = true; + + // Clear timeout since we're backgrounding + if (timeout) clearTimeout(timeout); + + // Register to shared process registry + const processId = registerProcess(child, command, effectiveCwd, "exec"); + + resolve({ + content: [ + { + type: "text", + text: `Command running in background. Process ID: ${processId}\nUse 'process output ${processId}' to check output.`, + }, + ], + details: { + output: "", + exitCode: null, + truncated: false, + backgrounded: true, + processId, + }, + }); + }, yieldMs); + } + const chunks: Buffer[] = []; let size = 0; let truncated = false; @@ -70,11 +119,17 @@ export function createExecTool(defaultCwd?: string): AgentTool { if (timeout) clearTimeout(timeout); + if (yieldTimer) clearTimeout(yieldTimer); spawnError = err; // Don't reject, let close event handle }); + child.on("close", (code) => { if (timeout) clearTimeout(timeout); + if (yieldTimer) clearTimeout(yieldTimer); + + // If already backgrounded, don't resolve again + if (yielded) return; // If there's a spawn error, return error message if (spawnError) { @@ -100,8 +155,12 @@ export function createExecTool(defaultCwd?: string): AgentTool { + if (yielded) return; // Already backgrounded, ignore abort + if (timeout) clearTimeout(timeout); + if (yieldTimer) clearTimeout(yieldTimer); child.kill("SIGTERM"); }); } diff --git a/src/agent/tools/process-registry.ts b/src/agent/tools/process-registry.ts new file mode 100644 index 00000000..aba9409a --- /dev/null +++ b/src/agent/tools/process-registry.ts @@ -0,0 +1,96 @@ +import { type ChildProcess } from "child_process"; +import { v7 as uuidv7 } from "uuid"; + +export const MAX_OUTPUT_BUFFER = 64 * 1024; // 64KB per process +export const TERMINATED_PROCESS_TTL = 60 * 60 * 1000; // 1 hour TTL for terminated processes + +export type ProcessEntry = { + id: string; + command: string; + cwd?: string | undefined; + child: ChildProcess; + exitCode: number | null; + startedAt: number; + terminatedAt?: number | undefined; + outputBuffer: string[]; + outputSize: number; + source: "exec" | "process"; +}; + +export const PROCESS_REGISTRY = new Map(); + +/** + * Register a process in the shared registry. + * Sets up output collection and exit handling. + */ +export function registerProcess( + child: ChildProcess, + command: string, + cwd: string | undefined, + source: "exec" | "process", + id?: string, +): string { + const processId = id ?? uuidv7(); + + const entry: ProcessEntry = { + id: processId, + command, + cwd, + child, + exitCode: null, + startedAt: Date.now(), + outputBuffer: [], + outputSize: 0, + source, + }; + + PROCESS_REGISTRY.set(processId, entry); + + // Collect output to buffer with size limit + const collectOutput = (data: Buffer) => { + let text = data.toString("utf8"); + // Truncate if single chunk exceeds max buffer + if (text.length > MAX_OUTPUT_BUFFER) { + text = text.slice(-MAX_OUTPUT_BUFFER); + entry.outputBuffer = []; + entry.outputSize = 0; + } else if (entry.outputSize + text.length > MAX_OUTPUT_BUFFER) { + // Remove old entries to make room + while ( + entry.outputBuffer.length > 0 && + entry.outputSize + text.length > MAX_OUTPUT_BUFFER + ) { + const removed = entry.outputBuffer.shift(); + if (removed) entry.outputSize -= removed.length; + } + } + entry.outputBuffer.push(text); + entry.outputSize += text.length; + }; + + child.stdout?.on("data", collectOutput); + child.stderr?.on("data", collectOutput); + + child.on("close", (code) => { + entry.exitCode = code; + entry.terminatedAt = Date.now(); + }); + + return processId; +} + +/** + * Remove terminated processes older than TTL. + * Returns the number of processes removed. + */ +export function cleanupTerminatedProcesses(): number { + const now = Date.now(); + let removed = 0; + for (const [id, entry] of PROCESS_REGISTRY) { + if (entry.terminatedAt && now - entry.terminatedAt > TERMINATED_PROCESS_TTL) { + PROCESS_REGISTRY.delete(id); + removed++; + } + } + return removed; +} diff --git a/src/agent/tools/process.ts b/src/agent/tools/process.ts index e2809155..2f391f49 100644 --- a/src/agent/tools/process.ts +++ b/src/agent/tools/process.ts @@ -1,7 +1,12 @@ -import { spawn, type ChildProcess } from "child_process"; +import { spawn } from "child_process"; import { Type } from "@sinclair/typebox"; import type { AgentTool } from "@mariozechner/pi-agent-core"; import { v7 as uuidv7 } from "uuid"; +import { + PROCESS_REGISTRY, + registerProcess, + cleanupTerminatedProcesses, +} from "./process-registry.js"; const ProcessSchema = Type.Object({ action: Type.String({ description: "Action: start | status | stop | output | cleanup." }), @@ -10,36 +15,6 @@ const ProcessSchema = Type.Object({ cwd: Type.Optional(Type.String({ description: "Working directory." })), }); -const MAX_OUTPUT_BUFFER = 64 * 1024; // 64KB per process -const TERMINATED_PROCESS_TTL = 60 * 60 * 1000; // 1 hour TTL for terminated processes - -type ProcessEntry = { - id: string; - command: string; - cwd?: string | undefined; - child: ChildProcess; - exitCode: number | null; - startedAt: number; - terminatedAt?: number | undefined; - outputBuffer: string[]; - outputSize: number; -}; - -const PROCESS_REGISTRY = new Map(); - -/** Remove terminated processes older than TTL */ -function cleanupTerminatedProcesses(): number { - const now = Date.now(); - let removed = 0; - for (const [id, entry] of PROCESS_REGISTRY) { - if (entry.terminatedAt && now - entry.terminatedAt > TERMINATED_PROCESS_TTL) { - PROCESS_REGISTRY.delete(id); - removed++; - } - } - return removed; -} - export type ProcessResult = { id?: string | undefined; running?: boolean | undefined; @@ -71,11 +46,13 @@ export function createProcessTool(defaultCwd?: string): AgentTool((resolve) => { const child = spawn(command, { shell: true, - cwd: params.cwd || defaultCwd, + cwd, stdio: ["ignore", "pipe", "pipe"], detached: true, }); @@ -94,44 +71,7 @@ export function createProcessTool(defaultCwd?: string): AgentTool { if (!resolved) { resolved = true; - const entry: ProcessEntry = { - id, - command, - cwd: params.cwd || defaultCwd, - child, - exitCode: null, - startedAt: Date.now(), - outputBuffer: [], - outputSize: 0, - }; - PROCESS_REGISTRY.set(id, entry); - - // Collect output to buffer with size limit - const collectOutput = (data: Buffer) => { - let text = data.toString("utf8"); - // Truncate if single chunk exceeds max buffer - if (text.length > MAX_OUTPUT_BUFFER) { - text = text.slice(-MAX_OUTPUT_BUFFER); - entry.outputBuffer = []; - entry.outputSize = 0; - } else if (entry.outputSize + text.length > MAX_OUTPUT_BUFFER) { - // Remove old entries to make room - while (entry.outputBuffer.length > 0 && entry.outputSize + text.length > MAX_OUTPUT_BUFFER) { - const removed = entry.outputBuffer.shift(); - if (removed) entry.outputSize -= removed.length; - } - } - entry.outputBuffer.push(text); - entry.outputSize += text.length; - }; - - child.stdout?.on("data", collectOutput); - child.stderr?.on("data", collectOutput); - - child.on("close", (code) => { - entry.exitCode = code; - entry.terminatedAt = Date.now(); - }); + registerProcess(child, command, cwd, "process", id); if (signal) { signal.addEventListener("abort", () => { diff --git a/src/agent/types.ts b/src/agent/types.ts index fb99426b..a31dfe6e 100644 --- a/src/agent/types.ts +++ b/src/agent/types.ts @@ -45,4 +45,7 @@ export type AgentOptions = { // === Summary Compaction Configuration === /** Custom summary generation instructions */ summaryInstructions?: string | undefined; + + /** Enable debug logging */ + debug?: boolean | undefined; };