fix(agent): return collected output when exec auto-backgrounds
Previously, when a command exceeded yieldMs (default 5s) and was auto-backgrounded, exec returned an empty output string. This caused agents to misinterpret slow commands (like curl) as failed, leading to infinite retry loops.

Changes:
- Implement three-layer buffer system (pending 30KB + aggregated 200KB + tail 1KB)
- Return collected output snapshot when backgrounding instead of empty string
- Increase default yieldMs from 5s to 10s for better coverage
- Add auto sweeper for terminated process cleanup (30min TTL)
- Register process immediately on spawn to capture all output

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
a5f979fceb
commit
cbb13b26d1
3 changed files with 172 additions and 56 deletions
|
|
@ -1,7 +1,12 @@
|
|||
import { spawn } from "child_process";
|
||||
import { Type } from "@sinclair/typebox";
|
||||
import type { AgentTool } from "@mariozechner/pi-agent-core";
|
||||
import { registerProcess } from "./process-registry.js";
|
||||
import {
|
||||
registerProcess,
|
||||
getOutputSnapshot,
|
||||
getFullOutput,
|
||||
PROCESS_REGISTRY,
|
||||
} from "./process-registry.js";
|
||||
|
||||
const ExecSchema = Type.Object({
|
||||
command: Type.String({ description: "Shell command to execute." }),
|
||||
|
|
@ -12,7 +17,7 @@ const ExecSchema = Type.Object({
|
|||
yieldMs: Type.Optional(
|
||||
Type.Number({
|
||||
description:
|
||||
"Auto-background threshold in milliseconds. If command doesn't complete within this time, it runs in background. Default 5000ms. Set to 0 to disable auto-backgrounding.",
|
||||
"Auto-background threshold in milliseconds. If command doesn't complete within this time, it runs in background. Default 10000ms. Set to 0 to disable auto-backgrounding.",
|
||||
minimum: 0,
|
||||
}),
|
||||
),
|
||||
|
|
@ -33,15 +38,14 @@ export type ExecResult = {
|
|||
processId?: string;
|
||||
};
|
||||
|
||||
const MAX_OUTPUT_BYTES = 64 * 1024;
|
||||
const DEFAULT_YIELD_MS = 5000;
|
||||
const DEFAULT_YIELD_MS = 10000; // Changed from 5000 to 10000
|
||||
|
||||
export function createExecTool(defaultCwd?: string): AgentTool<typeof ExecSchema, ExecResult> {
|
||||
return {
|
||||
name: "exec",
|
||||
label: "Exec",
|
||||
description:
|
||||
"Execute a shell command. If the command doesn't complete within yieldMs (default 5s), it automatically runs in background and returns a process ID. Use 'process output <id>' to check output, 'process status <id>' to check status, 'process stop <id>' to terminate.",
|
||||
"Execute a shell command. If the command doesn't complete within yieldMs (default 10s), it automatically runs in background and returns a process ID with any output collected so far. Use 'process output <id>' to check output, 'process status <id>' to check status, 'process stop <id>' to terminate.",
|
||||
parameters: ExecSchema,
|
||||
execute: async (_toolCallId, args, signal) => {
|
||||
const { command, cwd, timeoutMs, yieldMs = DEFAULT_YIELD_MS } = args as ExecArgs;
|
||||
|
|
@ -59,6 +63,10 @@ export function createExecTool(defaultCwd?: string): AgentTool<typeof ExecSchema
|
|||
let timeout: NodeJS.Timeout | undefined;
|
||||
let yieldTimer: NodeJS.Timeout | undefined;
|
||||
|
||||
// Register process immediately to start buffering output
|
||||
// This ensures output is captured even before yield timeout
|
||||
const processId = registerProcess(child, command, effectiveCwd, "exec");
|
||||
|
||||
// Timeout handling (hard kill)
|
||||
if (timeoutMs && timeoutMs > 0) {
|
||||
timeout = setTimeout(() => {
|
||||
|
|
@ -76,20 +84,27 @@ export function createExecTool(defaultCwd?: string): AgentTool<typeof ExecSchema
|
|||
// Clear timeout since we're backgrounding
|
||||
if (timeout) clearTimeout(timeout);
|
||||
|
||||
// Register to shared process registry
|
||||
const processId = registerProcess(child, command, effectiveCwd, "exec");
|
||||
// Get output collected so far (THE KEY FIX)
|
||||
const entry = PROCESS_REGISTRY.get(processId);
|
||||
const snapshot = entry
|
||||
? getOutputSnapshot(entry)
|
||||
: { output: "", truncated: false };
|
||||
|
||||
const outputPreview = snapshot.output
|
||||
? `\n\nOutput so far:\n${snapshot.output}${snapshot.truncated ? "\n[truncated]" : ""}`
|
||||
: "";
|
||||
|
||||
resolve({
|
||||
content: [
|
||||
{
|
||||
type: "text",
|
||||
text: `Command running in background. Process ID: ${processId}\nUse 'process output ${processId}' to check output.`,
|
||||
text: `Command running in background. Process ID: ${processId}${outputPreview}\n\nUse 'process output ${processId}' to check more output.`,
|
||||
},
|
||||
],
|
||||
details: {
|
||||
output: "",
|
||||
output: snapshot.output,
|
||||
exitCode: null,
|
||||
truncated: false,
|
||||
truncated: snapshot.truncated,
|
||||
backgrounded: true,
|
||||
processId,
|
||||
},
|
||||
|
|
@ -97,24 +112,7 @@ export function createExecTool(defaultCwd?: string): AgentTool<typeof ExecSchema
|
|||
}, yieldMs);
|
||||
}
|
||||
|
||||
const chunks: Buffer[] = [];
|
||||
let size = 0;
|
||||
let truncated = false;
|
||||
|
||||
const handleData = (data: Buffer) => {
|
||||
if (truncated) return;
|
||||
size += data.length;
|
||||
if (size > MAX_OUTPUT_BYTES) {
|
||||
truncated = true;
|
||||
const remaining = MAX_OUTPUT_BYTES - (size - data.length);
|
||||
if (remaining > 0) chunks.push(data.subarray(0, remaining));
|
||||
return;
|
||||
}
|
||||
chunks.push(data);
|
||||
};
|
||||
|
||||
child.stdout?.on("data", handleData);
|
||||
child.stderr?.on("data", handleData);
|
||||
// Note: Output is now collected by process-registry, no local chunk collection needed
|
||||
|
||||
let spawnError: Error | null = null;
|
||||
child.on("error", (err) => {
|
||||
|
|
@ -131,6 +129,15 @@ export function createExecTool(defaultCwd?: string): AgentTool<typeof ExecSchema
|
|||
// If already backgrounded, don't resolve again
|
||||
if (yielded) return;
|
||||
|
||||
// Get output from registry buffer
|
||||
const entry = PROCESS_REGISTRY.get(processId);
|
||||
const { output, truncated } = entry
|
||||
? getFullOutput(entry)
|
||||
: { output: "", truncated: false };
|
||||
|
||||
// Remove from registry since we're returning synchronously
|
||||
PROCESS_REGISTRY.delete(processId);
|
||||
|
||||
// If there's a spawn error, return error message
|
||||
if (spawnError) {
|
||||
resolve({
|
||||
|
|
@ -144,7 +151,6 @@ export function createExecTool(defaultCwd?: string): AgentTool<typeof ExecSchema
|
|||
return;
|
||||
}
|
||||
|
||||
const output = Buffer.concat(chunks).toString("utf8");
|
||||
resolve({
|
||||
content: [{ type: "text", text: output || (timedOut ? "Process timed out." : "") }],
|
||||
details: {
|
||||
|
|
|
|||
|
|
@ -1,8 +1,12 @@
|
|||
import { type ChildProcess } from "child_process";
|
||||
import { v7 as uuidv7 } from "uuid";
|
||||
|
||||
export const MAX_OUTPUT_BUFFER = 64 * 1024; // 64KB per process
|
||||
export const TERMINATED_PROCESS_TTL = 60 * 60 * 1000; // 1 hour TTL for terminated processes
|
||||
// Three-layer buffer size constants
|
||||
export const PENDING_MAX_SIZE = 30 * 1024; // 30KB - recent unread output
|
||||
export const AGGREGATED_MAX_SIZE = 200 * 1024; // 200KB - accumulated historical output
|
||||
export const TAIL_SIZE = 1024; // 1KB - last N bytes for quick peek
|
||||
export const TERMINATED_PROCESS_TTL = 30 * 60 * 1000; // 30 minutes TTL for terminated processes
|
||||
const SWEEPER_INTERVAL = 5 * 60 * 1000; // 5 minutes
|
||||
|
||||
export type ProcessEntry = {
|
||||
id: string;
|
||||
|
|
@ -12,13 +16,125 @@ export type ProcessEntry = {
|
|||
exitCode: number | null;
|
||||
startedAt: number;
|
||||
terminatedAt?: number | undefined;
|
||||
outputBuffer: string[];
|
||||
outputSize: number;
|
||||
source: "exec" | "process";
|
||||
|
||||
// Three-layer buffer system
|
||||
pendingBuffer: string; // Recent output (30KB)
|
||||
aggregatedBuffer: string; // Historical accumulated output (200KB)
|
||||
tailBuffer: string; // Last N bytes for quick "tail" view (1KB)
|
||||
|
||||
totalBytesReceived: number;
|
||||
truncated: boolean;
|
||||
};
|
||||
|
||||
/** Shared registry of tracked processes, keyed by process ID. */
export const PROCESS_REGISTRY = new Map<string, ProcessEntry>();
|
||||
|
||||
// Sweeper state
|
||||
// Interval handle for the cleanup sweeper; null while no sweeper is scheduled.
let sweeperTimer: ReturnType<typeof setInterval> | null = null;
|
||||
|
||||
/**
 * Moves everything in the pending buffer onto the end of the aggregated buffer.
 *
 * When the aggregated buffer would grow past AGGREGATED_MAX_SIZE, only its
 * trailing AGGREGATED_MAX_SIZE characters are kept and the entry is marked as
 * truncated. Does nothing when the pending buffer is empty.
 *
 * @param entry - Process entry whose buffers are updated in place.
 */
function flushPendingToAggregated(entry: ProcessEntry): void {
  const pending = entry.pendingBuffer;
  if (!pending) {
    return;
  }

  entry.pendingBuffer = "";
  const merged = entry.aggregatedBuffer + pending;

  if (merged.length <= AGGREGATED_MAX_SIZE) {
    entry.aggregatedBuffer = merged;
    return;
  }

  // Over budget: drop the oldest characters from the head.
  entry.truncated = true;
  entry.aggregatedBuffer = merged.slice(-AGGREGATED_MAX_SIZE);
}
|
||||
|
||||
/**
 * Append a chunk of output to a process entry's buffers.
 *
 * Flow:
 * 1. Add the chunk length to `totalBytesReceived` and append the chunk to the
 *    pending buffer.
 * 2. If pending now exceeds PENDING_MAX_SIZE, flush it into the aggregated
 *    buffer.
 * 3. Refresh the tail buffer so it holds the last TAIL_SIZE characters of
 *    aggregated + pending.
 *
 * The tail is assembled from the ends of the two buffers instead of joining
 * them in full, so a small chunk does not cause the whole history to be
 * concatenated and re-sliced on every call.
 *
 * @param entry - Process entry to update in place.
 * @param chunk - Decoded output text to append.
 */
export function appendOutput(entry: ProcessEntry, chunk: string): void {
  // Note: `chunk.length` counts UTF-16 code units of the decoded text.
  entry.totalBytesReceived += chunk.length;

  // 1. Append to pending
  entry.pendingBuffer += chunk;

  // 2. Flush pending to aggregated if it exceeds max
  if (entry.pendingBuffer.length > PENDING_MAX_SIZE) {
    flushPendingToAggregated(entry);
  }

  // 3. Update tail buffer (last TAIL_SIZE characters of aggregated + pending)
  const pending = entry.pendingBuffer;
  if (pending.length >= TAIL_SIZE) {
    // Pending alone already covers the whole tail window.
    entry.tailBuffer = pending.slice(-TAIL_SIZE);
    return;
  }

  // Pending is shorter than the window: top it up from the end of aggregated.
  const neededFromAggregated = TAIL_SIZE - pending.length;
  entry.tailBuffer = entry.aggregatedBuffer.slice(-neededFromAggregated) + pending;
}
|
||||
|
||||
/**
 * Return the complete buffered output of a process.
 *
 * Pending output is first flushed into the aggregated buffer (mutating the
 * entry), so the aggregated buffer alone represents everything retained.
 *
 * @param entry - Process entry to read from; its pending buffer is flushed.
 * @returns The aggregated output and whether older output has been dropped.
 */
export function getFullOutput(entry: ProcessEntry): {
  output: string;
  truncated: boolean;
} {
  // Must happen before reading: the flush may update both fields below.
  flushPendingToAggregated(entry);

  const { aggregatedBuffer: output, truncated } = entry;
  return { output, truncated };
}
|
||||
|
||||
/**
 * Get an output snapshot for backgrounding.
 *
 * Combines the aggregated and pending buffers without flushing or otherwise
 * mutating the entry. If the combined text is longer than AGGREGATED_MAX_SIZE,
 * only its last AGGREGATED_MAX_SIZE characters are returned, so the
 * `truncated` flag always matches the returned text and the snapshot equals
 * what a flush would retain.
 *
 * @param entry - Process entry to read; left unchanged.
 * @returns The output collected so far and whether any earlier output is
 *          missing from it.
 */
export function getOutputSnapshot(entry: ProcessEntry): {
  output: string;
  truncated: boolean;
} {
  const combined = entry.aggregatedBuffer + entry.pendingBuffer;
  const overLimit = combined.length > AGGREGATED_MAX_SIZE;

  return {
    // Keep the most recent characters, mirroring the head-truncation on flush.
    output: overLimit ? combined.slice(-AGGREGATED_MAX_SIZE) : combined,
    truncated: entry.truncated || overLimit,
  };
}
|
||||
|
||||
/**
 * Schedule the periodic sweeper unless one is already scheduled.
 *
 * Each tick runs cleanupTerminatedProcesses() and then stops the sweeper if
 * the registry has become empty. The timer is unref()'d so it does not keep
 * the process alive on its own.
 */
function ensureSweeperRunning(): void {
  if (sweeperTimer) {
    return;
  }

  const sweepOnce = (): void => {
    cleanupTerminatedProcesses();

    // Nothing left to track: no reason to keep ticking.
    if (PROCESS_REGISTRY.size === 0) {
      stopSweeper();
    }
  };

  sweeperTimer = setInterval(sweepOnce, SWEEPER_INTERVAL);

  // Do not let the sweeper alone block process exit.
  sweeperTimer.unref();
}
|
||||
|
||||
/**
 * Cancel the sweeper interval, if one is scheduled, and clear its handle.
 */
function stopSweeper(): void {
  if (!sweeperTimer) {
    return;
  }

  clearInterval(sweeperTimer);
  sweeperTimer = null;
}
|
||||
|
||||
/**
|
||||
* Register a process in the shared registry.
|
||||
* Sets up output collection and exit handling.
|
||||
|
|
@ -39,33 +155,21 @@ export function registerProcess(
|
|||
child,
|
||||
exitCode: null,
|
||||
startedAt: Date.now(),
|
||||
outputBuffer: [],
|
||||
outputSize: 0,
|
||||
source,
|
||||
// Three-layer buffer initialization
|
||||
pendingBuffer: "",
|
||||
aggregatedBuffer: "",
|
||||
tailBuffer: "",
|
||||
totalBytesReceived: 0,
|
||||
truncated: false,
|
||||
};
|
||||
|
||||
PROCESS_REGISTRY.set(processId, entry);
|
||||
|
||||
// Collect output to buffer with size limit
|
||||
// Collect output using the appendOutput function
|
||||
const collectOutput = (data: Buffer) => {
|
||||
let text = data.toString("utf8");
|
||||
// Truncate if single chunk exceeds max buffer
|
||||
if (text.length > MAX_OUTPUT_BUFFER) {
|
||||
text = text.slice(-MAX_OUTPUT_BUFFER);
|
||||
entry.outputBuffer = [];
|
||||
entry.outputSize = 0;
|
||||
} else if (entry.outputSize + text.length > MAX_OUTPUT_BUFFER) {
|
||||
// Remove old entries to make room
|
||||
while (
|
||||
entry.outputBuffer.length > 0 &&
|
||||
entry.outputSize + text.length > MAX_OUTPUT_BUFFER
|
||||
) {
|
||||
const removed = entry.outputBuffer.shift();
|
||||
if (removed) entry.outputSize -= removed.length;
|
||||
}
|
||||
}
|
||||
entry.outputBuffer.push(text);
|
||||
entry.outputSize += text.length;
|
||||
const text = data.toString("utf8");
|
||||
appendOutput(entry, text);
|
||||
};
|
||||
|
||||
child.stdout?.on("data", collectOutput);
|
||||
|
|
@ -74,8 +178,13 @@ export function registerProcess(
|
|||
child.on("close", (code) => {
|
||||
entry.exitCode = code;
|
||||
entry.terminatedAt = Date.now();
|
||||
// Flush any remaining pending on close
|
||||
flushPendingToAggregated(entry);
|
||||
});
|
||||
|
||||
// Start sweeper if not already running
|
||||
ensureSweeperRunning();
|
||||
|
||||
return processId;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ import {
|
|||
PROCESS_REGISTRY,
|
||||
registerProcess,
|
||||
cleanupTerminatedProcesses,
|
||||
getFullOutput,
|
||||
} from "./process-registry.js";
|
||||
|
||||
const ProcessSchema = Type.Object({
|
||||
|
|
@ -138,10 +139,10 @@ export function createProcessTool(defaultCwd?: string): AgentTool<typeof Process
|
|||
details: { id, running: false },
|
||||
};
|
||||
}
|
||||
const output = entry.outputBuffer.join("");
|
||||
const { output, truncated } = getFullOutput(entry);
|
||||
const running = entry.exitCode === null;
|
||||
return {
|
||||
content: [{ type: "text", text: output || "(no output)" }],
|
||||
content: [{ type: "text", text: (output || "(no output)") + (truncated ? "\n[output truncated]" : "") }],
|
||||
details: { id, running, exitCode: entry.exitCode, output },
|
||||
};
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue