Merge pull request #41 from multica-ai/forrestchang/exec-blocking

fix(agent): improve exec tool with three-layer buffers and real-time updates
This commit is contained in:
Jiayuan Zhang 2026-01-31 18:54:36 +08:00 committed by GitHub
commit ace1a2fe6b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 244 additions and 59 deletions

View file

@ -120,7 +120,18 @@ export function createAgentOutput(params: {
case "tool_execution_start":
params.stderr.write(`${formatToolLine(event.toolName, event.args)}\n`);
break;
case "tool_execution_update": {
// Show real-time output updates (e.g., from exec tool)
const updateText = extractText(event.partialResult);
if (updateText) {
// Clear line and show latest tail output
params.stderr.write(`\r\x1b[K ↳ ${updateText.slice(-60).replace(/\n/g, " ")}`);
}
break;
}
case "tool_execution_end":
// Clear any update line from tool_execution_update
params.stderr.write("\r\x1b[K");
if (event.isError) {
const errorText = extractText(event.result) || "Tool failed";
params.stderr.write(`• Tool error (${toolDisplayName(event.toolName)}): ${errorText}\n`);

View file

@ -1,7 +1,12 @@
import { spawn } from "child_process";
import { Type } from "@sinclair/typebox";
import type { AgentTool } from "@mariozechner/pi-agent-core";
import { registerProcess } from "./process-registry.js";
import {
registerProcess,
getOutputSnapshot,
getFullOutput,
PROCESS_REGISTRY,
} from "./process-registry.js";
const ExecSchema = Type.Object({
command: Type.String({ description: "Shell command to execute." }),
@ -12,7 +17,7 @@ const ExecSchema = Type.Object({
yieldMs: Type.Optional(
Type.Number({
description:
"Auto-background threshold in milliseconds. If command doesn't complete within this time, it runs in background. Default 5000ms. Set to 0 to disable auto-backgrounding.",
"Auto-background threshold in milliseconds. If command doesn't complete within this time, it runs in background. Default 10000ms. Set to 0 to disable auto-backgrounding.",
minimum: 0,
}),
),
@ -33,17 +38,16 @@ export type ExecResult = {
processId?: string;
};
const MAX_OUTPUT_BYTES = 64 * 1024;
const DEFAULT_YIELD_MS = 5000;
const DEFAULT_YIELD_MS = 10000; // Changed from 5000 to 10000
export function createExecTool(defaultCwd?: string): AgentTool<typeof ExecSchema, ExecResult> {
return {
name: "exec",
label: "Exec",
description:
"Execute a shell command. If the command doesn't complete within yieldMs (default 5s), it automatically runs in background and returns a process ID. Use 'process output <id>' to check output, 'process status <id>' to check status, 'process stop <id>' to terminate.",
"Execute a shell command. If the command doesn't complete within yieldMs (default 10s), it automatically runs in background and returns a process ID with any output collected so far. Use 'process output <id>' to check output, 'process status <id>' to check status, 'process stop <id>' to terminate.",
parameters: ExecSchema,
execute: async (_toolCallId, args, signal) => {
execute: async (_toolCallId, args, signal, onUpdate) => {
const { command, cwd, timeoutMs, yieldMs = DEFAULT_YIELD_MS } = args as ExecArgs;
const effectiveCwd = cwd || defaultCwd;
@ -59,6 +63,33 @@ export function createExecTool(defaultCwd?: string): AgentTool<typeof ExecSchema
let timeout: NodeJS.Timeout | undefined;
let yieldTimer: NodeJS.Timeout | undefined;
// Register process immediately to start buffering output
// This ensures output is captured even before yield timeout
const processId = registerProcess(child, command, effectiveCwd, "exec");
// Stream output updates via onUpdate callback
// Note: appendOutput is already called by registerProcess, we just emit updates here
const emitUpdate = () => {
if (!onUpdate || yielded) return;
const entry = PROCESS_REGISTRY.get(processId);
if (!entry) return;
onUpdate({
content: [{ type: "text", text: entry.tailBuffer || "(running...)" }],
details: {
output: entry.tailBuffer,
exitCode: null,
truncated: false,
processId,
},
});
};
// Listen to stdout/stderr to trigger onUpdate (data collection is handled by registerProcess)
if (onUpdate) {
child.stdout?.on("data", emitUpdate);
child.stderr?.on("data", emitUpdate);
}
// Timeout handling (hard kill)
if (timeoutMs && timeoutMs > 0) {
timeout = setTimeout(() => {
@ -76,20 +107,27 @@ export function createExecTool(defaultCwd?: string): AgentTool<typeof ExecSchema
// Clear timeout since we're backgrounding
if (timeout) clearTimeout(timeout);
// Register to shared process registry
const processId = registerProcess(child, command, effectiveCwd, "exec");
// Get output collected so far (THE KEY FIX)
const entry = PROCESS_REGISTRY.get(processId);
const snapshot = entry
? getOutputSnapshot(entry)
: { output: "", truncated: false };
const outputPreview = snapshot.output
? `\n\nOutput so far:\n${snapshot.output}${snapshot.truncated ? "\n[truncated]" : ""}`
: "";
resolve({
content: [
{
type: "text",
text: `Command running in background. Process ID: ${processId}\nUse 'process output ${processId}' to check output.`,
text: `Command running in background. Process ID: ${processId}${outputPreview}\n\nUse 'process output ${processId}' to check more output.`,
},
],
details: {
output: "",
output: snapshot.output,
exitCode: null,
truncated: false,
truncated: snapshot.truncated,
backgrounded: true,
processId,
},
@ -97,24 +135,7 @@ export function createExecTool(defaultCwd?: string): AgentTool<typeof ExecSchema
}, yieldMs);
}
const chunks: Buffer[] = [];
let size = 0;
let truncated = false;
const handleData = (data: Buffer) => {
if (truncated) return;
size += data.length;
if (size > MAX_OUTPUT_BYTES) {
truncated = true;
const remaining = MAX_OUTPUT_BYTES - (size - data.length);
if (remaining > 0) chunks.push(data.subarray(0, remaining));
return;
}
chunks.push(data);
};
child.stdout?.on("data", handleData);
child.stderr?.on("data", handleData);
// Note: Output is now collected by process-registry, no local chunk collection needed
let spawnError: Error | null = null;
child.on("error", (err) => {
@ -131,6 +152,15 @@ export function createExecTool(defaultCwd?: string): AgentTool<typeof ExecSchema
// If already backgrounded, don't resolve again
if (yielded) return;
// Get output from registry buffer
const entry = PROCESS_REGISTRY.get(processId);
const { output, truncated } = entry
? getFullOutput(entry)
: { output: "", truncated: false };
// Remove from registry since we're returning synchronously
PROCESS_REGISTRY.delete(processId);
// If there's a spawn error, return error message
if (spawnError) {
resolve({
@ -144,7 +174,6 @@ export function createExecTool(defaultCwd?: string): AgentTool<typeof ExecSchema
return;
}
const output = Buffer.concat(chunks).toString("utf8");
resolve({
content: [{ type: "text", text: output || (timedOut ? "Process timed out." : "") }],
details: {

View file

@ -1,8 +1,12 @@
import { type ChildProcess } from "child_process";
import { v7 as uuidv7 } from "uuid";
export const MAX_OUTPUT_BUFFER = 64 * 1024; // 64KB per process
export const TERMINATED_PROCESS_TTL = 60 * 60 * 1000; // 1 hour TTL for terminated processes
// Three-layer buffer size constants
export const PENDING_MAX_SIZE = 30 * 1024; // 30KB - recent unread output
export const AGGREGATED_MAX_SIZE = 200 * 1024; // 200KB - accumulated historical output
export const TAIL_SIZE = 1024; // 1KB - last N bytes for quick peek
export const TERMINATED_PROCESS_TTL = 30 * 60 * 1000; // 30 minutes TTL for terminated processes
const SWEEPER_INTERVAL = 5 * 60 * 1000; // 5 minutes
export type ProcessEntry = {
id: string;
@ -12,13 +16,125 @@ export type ProcessEntry = {
exitCode: number | null;
startedAt: number;
terminatedAt?: number | undefined;
outputBuffer: string[];
outputSize: number;
source: "exec" | "process";
// Three-layer buffer system
pendingBuffer: string; // Recent output (30KB)
aggregatedBuffer: string; // Historical accumulated output (200KB)
tailBuffer: string; // Last N bytes for quick "tail" view (1KB)
totalBytesReceived: number;
truncated: boolean;
};
export const PROCESS_REGISTRY = new Map<string, ProcessEntry>();
// Sweeper state
let sweeperTimer: ReturnType<typeof setInterval> | null = null;
/**
* Flush pending buffer to aggregated buffer.
* Truncates from the head if aggregated exceeds max size.
*/
function flushPendingToAggregated(entry: ProcessEntry): void {
if (!entry.pendingBuffer) return;
entry.aggregatedBuffer += entry.pendingBuffer;
entry.pendingBuffer = "";
// Truncate from head if exceeds max
if (entry.aggregatedBuffer.length > AGGREGATED_MAX_SIZE) {
entry.truncated = true;
entry.aggregatedBuffer = entry.aggregatedBuffer.slice(-AGGREGATED_MAX_SIZE);
}
}
/**
* Append output to process buffers with three-layer management.
*
* Flow:
* 1. Append to pending buffer
* 2. If pending exceeds max, flush to aggregated
* 3. Always update tail buffer with last TAIL_SIZE bytes
*/
export function appendOutput(entry: ProcessEntry, chunk: string): void {
entry.totalBytesReceived += chunk.length;
// 1. Append to pending
entry.pendingBuffer += chunk;
// 2. Flush pending to aggregated if exceeds max
if (entry.pendingBuffer.length > PENDING_MAX_SIZE) {
flushPendingToAggregated(entry);
}
// 3. Update tail buffer (always keep last TAIL_SIZE bytes)
const combined = entry.aggregatedBuffer + entry.pendingBuffer;
entry.tailBuffer = combined.slice(-TAIL_SIZE);
}
/**
* Get full output for a process.
* Returns aggregated + pending + truncation info.
*/
export function getFullOutput(entry: ProcessEntry): {
output: string;
truncated: boolean;
} {
// Flush pending first to get complete view
flushPendingToAggregated(entry);
return {
output: entry.aggregatedBuffer,
truncated: entry.truncated,
};
}
/**
* Get output snapshot for backgrounding.
* Returns current aggregated + pending without flushing.
*/
export function getOutputSnapshot(entry: ProcessEntry): {
output: string;
truncated: boolean;
} {
const output = entry.aggregatedBuffer + entry.pendingBuffer;
return {
output,
truncated: entry.truncated || output.length > AGGREGATED_MAX_SIZE,
};
}
/**
* Start the sweeper if not already running.
* Uses unref() so it doesn't prevent process exit.
*/
function ensureSweeperRunning(): void {
if (sweeperTimer) return;
sweeperTimer = setInterval(() => {
cleanupTerminatedProcesses();
// If registry is empty, stop the sweeper
if (PROCESS_REGISTRY.size === 0) {
stopSweeper();
}
}, SWEEPER_INTERVAL);
// Allow process to exit even if sweeper is running
sweeperTimer.unref();
}
/**
* Stop the sweeper.
*/
function stopSweeper(): void {
if (sweeperTimer) {
clearInterval(sweeperTimer);
sweeperTimer = null;
}
}
/**
* Register a process in the shared registry.
* Sets up output collection and exit handling.
@ -39,33 +155,21 @@ export function registerProcess(
child,
exitCode: null,
startedAt: Date.now(),
outputBuffer: [],
outputSize: 0,
source,
// Three-layer buffer initialization
pendingBuffer: "",
aggregatedBuffer: "",
tailBuffer: "",
totalBytesReceived: 0,
truncated: false,
};
PROCESS_REGISTRY.set(processId, entry);
// Collect output to buffer with size limit
// Collect output using the appendOutput function
const collectOutput = (data: Buffer) => {
let text = data.toString("utf8");
// Truncate if single chunk exceeds max buffer
if (text.length > MAX_OUTPUT_BUFFER) {
text = text.slice(-MAX_OUTPUT_BUFFER);
entry.outputBuffer = [];
entry.outputSize = 0;
} else if (entry.outputSize + text.length > MAX_OUTPUT_BUFFER) {
// Remove old entries to make room
while (
entry.outputBuffer.length > 0 &&
entry.outputSize + text.length > MAX_OUTPUT_BUFFER
) {
const removed = entry.outputBuffer.shift();
if (removed) entry.outputSize -= removed.length;
}
}
entry.outputBuffer.push(text);
entry.outputSize += text.length;
const text = data.toString("utf8");
appendOutput(entry, text);
};
child.stdout?.on("data", collectOutput);
@ -74,8 +178,13 @@ export function registerProcess(
child.on("close", (code) => {
entry.exitCode = code;
entry.terminatedAt = Date.now();
// Flush any remaining pending on close
flushPendingToAggregated(entry);
});
// Start sweeper if not already running
ensureSweeperRunning();
return processId;
}

View file

@ -6,10 +6,11 @@ import {
PROCESS_REGISTRY,
registerProcess,
cleanupTerminatedProcesses,
getFullOutput,
} from "./process-registry.js";
const ProcessSchema = Type.Object({
action: Type.String({ description: "Action: start | status | stop | output | cleanup." }),
action: Type.String({ description: "Action: list | start | status | stop | output | cleanup." }),
id: Type.Optional(Type.String({ description: "Process id for status/stop/output." })),
command: Type.Optional(Type.String({ description: "Command to run for start." })),
cwd: Type.Optional(Type.String({ description: "Working directory." })),
@ -27,7 +28,7 @@ export function createProcessTool(defaultCwd?: string): AgentTool<typeof Process
return {
name: "process",
label: "Process",
description: "Manage long-running background processes like servers, watchers, or daemons. Actions: 'start' to launch (returns immediately with process id), 'status' to check if running, 'output' to read stdout/stderr, 'stop' to terminate, 'cleanup' to remove terminated processes from memory. Use this for servers (e.g., python server.py, npm run dev) instead of 'exec'.",
description: "Manage long-running background processes like servers, watchers, or daemons. Actions: 'list' to show all processes, 'start' to launch (returns immediately with process id), 'status' to check if running, 'output' to read stdout/stderr, 'stop' to terminate, 'cleanup' to remove terminated processes from memory. Use this for servers (e.g., python server.py, npm run dev) instead of 'exec'.",
parameters: ProcessSchema,
execute: async (_toolCallId, params, signal) => {
// Auto-cleanup old terminated processes on each invocation
@ -38,6 +39,41 @@ export function createProcessTool(defaultCwd?: string): AgentTool<typeof Process
throw new Error("Missing action");
}
if (action === "list") {
const processes = Array.from(PROCESS_REGISTRY.values()).map((entry) => {
const running = entry.exitCode === null;
const durationMs = Date.now() - entry.startedAt;
const durationSec = (durationMs / 1000).toFixed(1);
return {
id: entry.id,
command: entry.command.length > 50 ? entry.command.slice(0, 47) + "..." : entry.command,
running,
exitCode: entry.exitCode,
duration: `${durationSec}s`,
source: entry.source,
};
});
if (processes.length === 0) {
return {
content: [{ type: "text", text: "No processes in registry." }],
details: { processes: [] },
};
}
const lines = processes.map((p) => {
const status = p.running ? "running" : `exited(${p.exitCode})`;
return `${p.id} ${p.command.padEnd(50)} ${status.padEnd(12)} ${p.duration.padStart(8)} [${p.source}]`;
});
const header = "ID".padEnd(36) + " " + "COMMAND".padEnd(50) + " " + "STATUS".padEnd(12) + " " + "DURATION".padStart(8) + " SOURCE";
const output = [header, "-".repeat(header.length), ...lines].join("\n");
return {
content: [{ type: "text", text: output }],
details: { processes },
};
}
if (action === "start") {
const command = String(params.command ?? "");
if (!command) throw new Error("Missing command");
@ -138,10 +174,10 @@ export function createProcessTool(defaultCwd?: string): AgentTool<typeof Process
details: { id, running: false },
};
}
const output = entry.outputBuffer.join("");
const { output, truncated } = getFullOutput(entry);
const running = entry.exitCode === null;
return {
content: [{ type: "text", text: output || "(no output)" }],
content: [{ type: "text", text: (output || "(no output)") + (truncated ? "\n[output truncated]" : "") }],
details: { id, running, exitCode: entry.exitCode, output },
};
}