From cbb13b26d15e02ec0da3559337f0a90ef89c8e1a Mon Sep 17 00:00:00 2001 From: Jiayuan Date: Sat, 31 Jan 2026 18:12:00 +0800 Subject: [PATCH] fix(agent): return collected output when exec auto-backgrounds Previously, when a command exceeded yieldMs (default 5s) and was auto-backgrounded, exec returned an empty output string. This caused agents to misinterpret slow commands (like curl) as failed, leading to infinite retry loops. Changes: - Implement three-layer buffer system (pending 30KB + aggregated 200KB + tail 1KB) - Return collected output snapshot when backgrounding instead of empty string - Increase default yieldMs from 5s to 10s for better coverage - Add auto sweeper for terminated process cleanup (30min TTL) - Register process immediately on spawn to capture all output Co-Authored-By: Claude Opus 4.5 --- src/agent/tools/exec.ts | 64 ++++++----- src/agent/tools/process-registry.ts | 159 +++++++++++++++++++++++----- src/agent/tools/process.ts | 5 +- 3 files changed, 172 insertions(+), 56 deletions(-) diff --git a/src/agent/tools/exec.ts b/src/agent/tools/exec.ts index 169678ee..11338593 100644 --- a/src/agent/tools/exec.ts +++ b/src/agent/tools/exec.ts @@ -1,7 +1,12 @@ import { spawn } from "child_process"; import { Type } from "@sinclair/typebox"; import type { AgentTool } from "@mariozechner/pi-agent-core"; -import { registerProcess } from "./process-registry.js"; +import { + registerProcess, + getOutputSnapshot, + getFullOutput, + PROCESS_REGISTRY, +} from "./process-registry.js"; const ExecSchema = Type.Object({ command: Type.String({ description: "Shell command to execute." }), @@ -12,7 +17,7 @@ const ExecSchema = Type.Object({ yieldMs: Type.Optional( Type.Number({ description: - "Auto-background threshold in milliseconds. If command doesn't complete within this time, it runs in background. Default 5000ms. Set to 0 to disable auto-backgrounding.", + "Auto-background threshold in milliseconds. If command doesn't complete within this time, it runs in background. Default 10000ms. Set to 0 to disable auto-backgrounding.", minimum: 0, }), ), @@ -33,15 +38,14 @@ export type ExecResult = { processId?: string; }; -const MAX_OUTPUT_BYTES = 64 * 1024; -const DEFAULT_YIELD_MS = 5000; +const DEFAULT_YIELD_MS = 10000; // Changed from 5000 to 10000 export function createExecTool(defaultCwd?: string): AgentTool { return { name: "exec", label: "Exec", description: - "Execute a shell command. If the command doesn't complete within yieldMs (default 5s), it automatically runs in background and returns a process ID. Use 'process output ' to check output, 'process status ' to check status, 'process stop ' to terminate.", + "Execute a shell command. If the command doesn't complete within yieldMs (default 10s), it automatically runs in background and returns a process ID with any output collected so far. Use 'process output ' to check output, 'process status ' to check status, 'process stop ' to terminate.", parameters: ExecSchema, execute: async (_toolCallId, args, signal) => { const { command, cwd, timeoutMs, yieldMs = DEFAULT_YIELD_MS } = args as ExecArgs; @@ -59,6 +63,10 @@ export function createExecTool(defaultCwd?: string): AgentTool 0) { timeout = setTimeout(() => { @@ -76,20 +84,27 @@ export function createExecTool(defaultCwd?: string): AgentTool { - if (truncated) return; - size += data.length; - if (size > MAX_OUTPUT_BYTES) { - truncated = true; - const remaining = MAX_OUTPUT_BYTES - (size - data.length); - if (remaining > 0) chunks.push(data.subarray(0, remaining)); - return; - } - chunks.push(data); - }; - - child.stdout?.on("data", handleData); - child.stderr?.on("data", handleData); + // Note: Output is now collected by process-registry, no local chunk collection needed let spawnError: Error | null = null; child.on("error", (err) => { @@ -131,6 +129,15 @@ export function createExecTool(defaultCwd?: string): AgentTool(); +// Sweeper state +let sweeperTimer: ReturnType | null = null; + +/** + * Flush pending buffer to aggregated buffer. + * Truncates from the head if aggregated exceeds max size. + */ +function flushPendingToAggregated(entry: ProcessEntry): void { + if (!entry.pendingBuffer) return; + + entry.aggregatedBuffer += entry.pendingBuffer; + entry.pendingBuffer = ""; + + // Truncate from head if exceeds max + if (entry.aggregatedBuffer.length > AGGREGATED_MAX_SIZE) { + entry.truncated = true; + entry.aggregatedBuffer = entry.aggregatedBuffer.slice(-AGGREGATED_MAX_SIZE); + } +} + +/** + * Append output to process buffers with three-layer management. + * + * Flow: + * 1. Append to pending buffer + * 2. If pending exceeds max, flush to aggregated + * 3. Always update tail buffer with last TAIL_SIZE bytes + */ +export function appendOutput(entry: ProcessEntry, chunk: string): void { + entry.totalBytesReceived += chunk.length; + + // 1. Append to pending + entry.pendingBuffer += chunk; + + // 2. Flush pending to aggregated if exceeds max + if (entry.pendingBuffer.length > PENDING_MAX_SIZE) { + flushPendingToAggregated(entry); + } + + // 3. Update tail buffer (always keep last TAIL_SIZE bytes) + const combined = entry.aggregatedBuffer + entry.pendingBuffer; + entry.tailBuffer = combined.slice(-TAIL_SIZE); +} + +/** + * Get full output for a process. + * Returns aggregated + pending + truncation info. + */ +export function getFullOutput(entry: ProcessEntry): { + output: string; + truncated: boolean; +} { + // Flush pending first to get complete view + flushPendingToAggregated(entry); + + return { + output: entry.aggregatedBuffer, + truncated: entry.truncated, + }; +} + +/** + * Get output snapshot for backgrounding. + * Returns current aggregated + pending without flushing. + */ +export function getOutputSnapshot(entry: ProcessEntry): { + output: string; + truncated: boolean; +} { + const output = entry.aggregatedBuffer + entry.pendingBuffer; + return { + output, + truncated: entry.truncated || output.length > AGGREGATED_MAX_SIZE, + }; +} + +/** + * Start the sweeper if not already running. + * Uses unref() so it doesn't prevent process exit. + */ +function ensureSweeperRunning(): void { + if (sweeperTimer) return; + + sweeperTimer = setInterval(() => { + cleanupTerminatedProcesses(); + + // If registry is empty, stop the sweeper + if (PROCESS_REGISTRY.size === 0) { + stopSweeper(); + } + }, SWEEPER_INTERVAL); + + // Allow process to exit even if sweeper is running + sweeperTimer.unref(); +} + +/** + * Stop the sweeper. + */ +function stopSweeper(): void { + if (sweeperTimer) { + clearInterval(sweeperTimer); + sweeperTimer = null; + } +} + /** * Register a process in the shared registry. * Sets up output collection and exit handling. @@ -39,33 +155,21 @@ export function registerProcess( child, exitCode: null, startedAt: Date.now(), - outputBuffer: [], - outputSize: 0, source, + // Three-layer buffer initialization + pendingBuffer: "", + aggregatedBuffer: "", + tailBuffer: "", + totalBytesReceived: 0, + truncated: false, }; PROCESS_REGISTRY.set(processId, entry); - // Collect output to buffer with size limit + // Collect output using the appendOutput function const collectOutput = (data: Buffer) => { - let text = data.toString("utf8"); - // Truncate if single chunk exceeds max buffer - if (text.length > MAX_OUTPUT_BUFFER) { - text = text.slice(-MAX_OUTPUT_BUFFER); - entry.outputBuffer = []; - entry.outputSize = 0; - } else if (entry.outputSize + text.length > MAX_OUTPUT_BUFFER) { - // Remove old entries to make room - while ( - entry.outputBuffer.length > 0 && - entry.outputSize + text.length > MAX_OUTPUT_BUFFER - ) { - const removed = entry.outputBuffer.shift(); - if (removed) entry.outputSize -= removed.length; - } - } - entry.outputBuffer.push(text); - entry.outputSize += text.length; + const text = data.toString("utf8"); + appendOutput(entry, text); }; child.stdout?.on("data", collectOutput); @@ -74,8 +178,13 @@ export function registerProcess( child.on("close", (code) => { entry.exitCode = code; entry.terminatedAt = Date.now(); + // Flush any remaining pending on close + flushPendingToAggregated(entry); }); + // Start sweeper if not already running + ensureSweeperRunning(); + return processId; } diff --git a/src/agent/tools/process.ts b/src/agent/tools/process.ts index 2f391f49..7de9b9c1 100644 --- a/src/agent/tools/process.ts +++ b/src/agent/tools/process.ts @@ -6,6 +6,7 @@ import { PROCESS_REGISTRY, registerProcess, cleanupTerminatedProcesses, + getFullOutput, } from "./process-registry.js"; const ProcessSchema = Type.Object({ @@ -138,10 +139,10 @@ export function createProcessTool(defaultCwd?: string): AgentTool