From cbb13b26d15e02ec0da3559337f0a90ef89c8e1a Mon Sep 17 00:00:00 2001 From: Jiayuan Date: Sat, 31 Jan 2026 18:12:00 +0800 Subject: [PATCH 1/4] fix(agent): return collected output when exec auto-backgrounds Previously, when a command exceeded yieldMs (default 5s) and was auto-backgrounded, exec returned an empty output string. This caused agents to misinterpret slow commands (like curl) as failed, leading to infinite retry loops. Changes: - Implement three-layer buffer system (pending 30KB + aggregated 200KB + tail 1KB) - Return collected output snapshot when backgrounding instead of empty string - Increase default yieldMs from 5s to 10s for better coverage - Add auto sweeper for terminated process cleanup (30min TTL) - Register process immediately on spawn to capture all output Co-Authored-By: Claude Opus 4.5 --- src/agent/tools/exec.ts | 64 ++++++----- src/agent/tools/process-registry.ts | 159 +++++++++++++++++++++++----- src/agent/tools/process.ts | 5 +- 3 files changed, 172 insertions(+), 56 deletions(-) diff --git a/src/agent/tools/exec.ts b/src/agent/tools/exec.ts index 169678ee..11338593 100644 --- a/src/agent/tools/exec.ts +++ b/src/agent/tools/exec.ts @@ -1,7 +1,12 @@ import { spawn } from "child_process"; import { Type } from "@sinclair/typebox"; import type { AgentTool } from "@mariozechner/pi-agent-core"; -import { registerProcess } from "./process-registry.js"; +import { + registerProcess, + getOutputSnapshot, + getFullOutput, + PROCESS_REGISTRY, +} from "./process-registry.js"; const ExecSchema = Type.Object({ command: Type.String({ description: "Shell command to execute." }), @@ -12,7 +17,7 @@ const ExecSchema = Type.Object({ yieldMs: Type.Optional( Type.Number({ description: - "Auto-background threshold in milliseconds. If command doesn't complete within this time, it runs in background. Default 5000ms. Set to 0 to disable auto-backgrounding.", + "Auto-background threshold in milliseconds. If command doesn't complete within this time, it runs in background. Default 10000ms. Set to 0 to disable auto-backgrounding.", minimum: 0, }), ), @@ -33,15 +38,14 @@ export type ExecResult = { processId?: string; }; -const MAX_OUTPUT_BYTES = 64 * 1024; -const DEFAULT_YIELD_MS = 5000; +const DEFAULT_YIELD_MS = 10000; // Changed from 5000 to 10000 export function createExecTool(defaultCwd?: string): AgentTool { return { name: "exec", label: "Exec", description: - "Execute a shell command. If the command doesn't complete within yieldMs (default 5s), it automatically runs in background and returns a process ID. Use 'process output ' to check output, 'process status ' to check status, 'process stop ' to terminate.", + "Execute a shell command. If the command doesn't complete within yieldMs (default 10s), it automatically runs in background and returns a process ID with any output collected so far. Use 'process output ' to check output, 'process status ' to check status, 'process stop ' to terminate.", parameters: ExecSchema, execute: async (_toolCallId, args, signal) => { const { command, cwd, timeoutMs, yieldMs = DEFAULT_YIELD_MS } = args as ExecArgs; @@ -59,6 +63,10 @@ export function createExecTool(defaultCwd?: string): AgentTool 0) { timeout = setTimeout(() => { @@ -76,20 +84,27 @@ export function createExecTool(defaultCwd?: string): AgentTool { - if (truncated) return; - size += data.length; - if (size > MAX_OUTPUT_BYTES) { - truncated = true; - const remaining = MAX_OUTPUT_BYTES - (size - data.length); - if (remaining > 0) chunks.push(data.subarray(0, remaining)); - return; - } - chunks.push(data); - }; - - child.stdout?.on("data", handleData); - child.stderr?.on("data", handleData); + // Note: Output is now collected by process-registry, no local chunk collection needed let spawnError: Error | null = null; child.on("error", (err) => { @@ -131,6 +129,15 @@ export function createExecTool(defaultCwd?: string): AgentTool(); +// Sweeper state +let sweeperTimer: ReturnType | null = null; + +/** + * Flush pending buffer to aggregated buffer. + * Truncates from the head if aggregated exceeds max size. + */ +function flushPendingToAggregated(entry: ProcessEntry): void { + if (!entry.pendingBuffer) return; + + entry.aggregatedBuffer += entry.pendingBuffer; + entry.pendingBuffer = ""; + + // Truncate from head if exceeds max + if (entry.aggregatedBuffer.length > AGGREGATED_MAX_SIZE) { + entry.truncated = true; + entry.aggregatedBuffer = entry.aggregatedBuffer.slice(-AGGREGATED_MAX_SIZE); + } +} + +/** + * Append output to process buffers with three-layer management. + * + * Flow: + * 1. Append to pending buffer + * 2. If pending exceeds max, flush to aggregated + * 3. Always update tail buffer with last TAIL_SIZE bytes + */ +export function appendOutput(entry: ProcessEntry, chunk: string): void { + entry.totalBytesReceived += chunk.length; + + // 1. Append to pending + entry.pendingBuffer += chunk; + + // 2. Flush pending to aggregated if exceeds max + if (entry.pendingBuffer.length > PENDING_MAX_SIZE) { + flushPendingToAggregated(entry); + } + + // 3. Update tail buffer (always keep last TAIL_SIZE bytes) + const combined = entry.aggregatedBuffer + entry.pendingBuffer; + entry.tailBuffer = combined.slice(-TAIL_SIZE); +} + +/** + * Get full output for a process. + * Returns aggregated + pending + truncation info. + */ +export function getFullOutput(entry: ProcessEntry): { + output: string; + truncated: boolean; +} { + // Flush pending first to get complete view + flushPendingToAggregated(entry); + + return { + output: entry.aggregatedBuffer, + truncated: entry.truncated, + }; +} + +/** + * Get output snapshot for backgrounding. + * Returns current aggregated + pending without flushing. + */ +export function getOutputSnapshot(entry: ProcessEntry): { + output: string; + truncated: boolean; +} { + const output = entry.aggregatedBuffer + entry.pendingBuffer; + return { + output, + truncated: entry.truncated || output.length > AGGREGATED_MAX_SIZE, + }; +} + +/** + * Start the sweeper if not already running. + * Uses unref() so it doesn't prevent process exit. + */ +function ensureSweeperRunning(): void { + if (sweeperTimer) return; + + sweeperTimer = setInterval(() => { + cleanupTerminatedProcesses(); + + // If registry is empty, stop the sweeper + if (PROCESS_REGISTRY.size === 0) { + stopSweeper(); + } + }, SWEEPER_INTERVAL); + + // Allow process to exit even if sweeper is running + sweeperTimer.unref(); +} + +/** + * Stop the sweeper. + */ +function stopSweeper(): void { + if (sweeperTimer) { + clearInterval(sweeperTimer); + sweeperTimer = null; + } +} + /** * Register a process in the shared registry. * Sets up output collection and exit handling. @@ -39,33 +155,21 @@ export function registerProcess( child, exitCode: null, startedAt: Date.now(), - outputBuffer: [], - outputSize: 0, source, + // Three-layer buffer initialization + pendingBuffer: "", + aggregatedBuffer: "", + tailBuffer: "", + totalBytesReceived: 0, + truncated: false, }; PROCESS_REGISTRY.set(processId, entry); - // Collect output to buffer with size limit + // Collect output using the appendOutput function const collectOutput = (data: Buffer) => { - let text = data.toString("utf8"); - // Truncate if single chunk exceeds max buffer - if (text.length > MAX_OUTPUT_BUFFER) { - text = text.slice(-MAX_OUTPUT_BUFFER); - entry.outputBuffer = []; - entry.outputSize = 0; - } else if (entry.outputSize + text.length > MAX_OUTPUT_BUFFER) { - // Remove old entries to make room - while ( - entry.outputBuffer.length > 0 && - entry.outputSize + text.length > MAX_OUTPUT_BUFFER - ) { - const removed = entry.outputBuffer.shift(); - if (removed) entry.outputSize -= removed.length; - } - } - entry.outputBuffer.push(text); - entry.outputSize += text.length; + const text = data.toString("utf8"); + appendOutput(entry, text); }; child.stdout?.on("data", collectOutput); @@ -74,8 +178,13 @@ export function registerProcess( child.on("close", (code) => { entry.exitCode = code; entry.terminatedAt = Date.now(); + // Flush any remaining pending on close + flushPendingToAggregated(entry); }); + // Start sweeper if not already running + ensureSweeperRunning(); + return processId; } diff --git a/src/agent/tools/process.ts b/src/agent/tools/process.ts index 2f391f49..7de9b9c1 100644 --- a/src/agent/tools/process.ts +++ b/src/agent/tools/process.ts @@ -6,6 +6,7 @@ import { PROCESS_REGISTRY, registerProcess, cleanupTerminatedProcesses, + getFullOutput, } from "./process-registry.js"; const ProcessSchema = Type.Object({ @@ -138,10 +139,10 @@ export function createProcessTool(defaultCwd?: string): AgentTool Date: Sat, 31 Jan 2026 18:28:58 +0800 Subject: [PATCH 2/4] feat(agent): add real-time streaming updates to exec tool Utilize the onUpdate callback from pi-agent-core's AgentTool interface to stream output updates in real-time while commands are executing. - Accept onUpdate as 4th parameter in execute function - Emit tailBuffer updates on each stdout/stderr data event - Stop emitting updates once command is backgrounded (yielded) This enables UI to show live command output progress via the tool_execution_update agent event. Co-Authored-By: Claude Opus 4.5 --- src/agent/tools/exec.ts | 25 ++++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/src/agent/tools/exec.ts b/src/agent/tools/exec.ts index 11338593..cf77d83f 100644 --- a/src/agent/tools/exec.ts +++ b/src/agent/tools/exec.ts @@ -47,7 +47,7 @@ export function createExecTool(defaultCwd?: string): AgentTool' to check output, 'process status ' to check status, 'process stop ' to terminate.", parameters: ExecSchema, - execute: async (_toolCallId, args, signal) => { + execute: async (_toolCallId, args, signal, onUpdate) => { const { command, cwd, timeoutMs, yieldMs = DEFAULT_YIELD_MS } = args as ExecArgs; const effectiveCwd = cwd || defaultCwd; @@ -67,6 +67,29 @@ export function createExecTool(defaultCwd?: string): AgentTool { + if (!onUpdate || yielded) return; + const entry = PROCESS_REGISTRY.get(processId); + if (!entry) return; + onUpdate({ + content: [{ type: "text", text: entry.tailBuffer || "(running...)" }], + details: { + output: entry.tailBuffer, + exitCode: null, + truncated: false, + processId, + }, + }); + }; + + // Listen to stdout/stderr to trigger onUpdate (data collection is handled by registerProcess) + if (onUpdate) { + child.stdout?.on("data", emitUpdate); + child.stderr?.on("data", emitUpdate); + } + // Timeout handling (hard kill) if (timeoutMs && timeoutMs > 0) { timeout = setTimeout(() => { From aaf0522303bd6f807faced76e1f2ed82432a05bb Mon Sep 17 00:00:00 2001 From: Jiayuan Date: Sat, 31 Jan 2026 18:40:40 +0800 Subject: [PATCH 3/4] feat(cli): display real-time tool execution updates Handle tool_execution_update events in CLI output to show live progress while commands are running. Shows the last 60 characters of output on a single line that updates in place. This completes the real-time streaming feature added to exec tool. Co-Authored-By: Claude Opus 4.5 --- src/agent/output.ts | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/agent/output.ts b/src/agent/output.ts index e14e546b..e5132fc2 100644 --- a/src/agent/output.ts +++ b/src/agent/output.ts @@ -120,7 +120,18 @@ export function createAgentOutput(params: { case "tool_execution_start": params.stderr.write(`${formatToolLine(event.toolName, event.args)}\n`); break; + case "tool_execution_update": { + // Show real-time output updates (e.g., from exec tool) + const updateText = extractText(event.partialResult); + if (updateText) { + // Clear line and show latest tail output + params.stderr.write(`\r\x1b[K ↳ ${updateText.slice(-60).replace(/\n/g, " ")}`); + } + break; + } case "tool_execution_end": + // Clear any update line from tool_execution_update + params.stderr.write("\r\x1b[K"); if (event.isError) { const errorText = extractText(event.result) || "Tool failed"; params.stderr.write(`• Tool error (${toolDisplayName(event.toolName)}): ${errorText}\n`); From 53bd52b137da19beafb91eeea38049ecd39b8f1a Mon Sep 17 00:00:00 2001 From: Jiayuan Date: Sat, 31 Jan 2026 18:49:16 +0800 Subject: [PATCH 4/4] feat(agent): add process list action for activity monitoring Add 'list' action to process tool that displays all registered processes with their ID, command, status, duration, and source (exec/process). Example output: ID COMMAND STATUS DURATION SOURCE 019c139c-dbb7-70ec-ab91-0a7fd2711043 curl -X POST running 15.2s [exec] Co-Authored-By: Claude Opus 4.5 --- src/agent/tools/process.ts | 39 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 37 insertions(+), 2 deletions(-) diff --git a/src/agent/tools/process.ts b/src/agent/tools/process.ts index 7de9b9c1..962b6a13 100644 --- a/src/agent/tools/process.ts +++ b/src/agent/tools/process.ts @@ -10,7 +10,7 @@ import { } from "./process-registry.js"; const ProcessSchema = Type.Object({ - action: Type.String({ description: "Action: start | status | stop | output | cleanup." }), + action: Type.String({ description: "Action: list | start | status | stop | output | cleanup." }), id: Type.Optional(Type.String({ description: "Process id for status/stop/output." })), command: Type.Optional(Type.String({ description: "Command to run for start." })), cwd: Type.Optional(Type.String({ description: "Working directory." })), @@ -28,7 +28,7 @@ export function createProcessTool(defaultCwd?: string): AgentTool { // Auto-cleanup old terminated processes on each invocation @@ -39,6 +39,41 @@ export function createProcessTool(defaultCwd?: string): AgentTool { + const running = entry.exitCode === null; + const durationMs = Date.now() - entry.startedAt; + const durationSec = (durationMs / 1000).toFixed(1); + return { + id: entry.id, + command: entry.command.length > 50 ? entry.command.slice(0, 47) + "..." : entry.command, + running, + exitCode: entry.exitCode, + duration: `${durationSec}s`, + source: entry.source, + }; + }); + + if (processes.length === 0) { + return { + content: [{ type: "text", text: "No processes in registry." }], + details: { processes: [] }, + }; + } + + const lines = processes.map((p) => { + const status = p.running ? "running" : `exited(${p.exitCode})`; + return `${p.id} ${p.command.padEnd(50)} ${status.padEnd(12)} ${p.duration.padStart(8)} [${p.source}]`; + }); + const header = "ID".padEnd(36) + " " + "COMMAND".padEnd(50) + " " + "STATUS".padEnd(12) + " " + "DURATION".padStart(8) + " SOURCE"; + const output = [header, "-".repeat(header.length), ...lines].join("\n"); + + return { + content: [{ type: "text", text: output }], + details: { processes }, + }; + } + if (action === "start") { const command = String(params.command ?? ""); if (!command) throw new Error("Missing command");