feat(agent): add auto-backgrounding and process management (#17)

* feat(agent): add auto-backgrounding to exec tool

- Add yieldMs parameter to exec tool (default 5s) - commands that don't
  complete within this time automatically run in background
- Create shared process-registry.ts for unified process management
- Refactor process.ts to use shared registry
- Add --debug CLI flag for session message logging
- Signal isolation: backgrounded processes ignore abort signals

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix(session): preserve tool_use/tool_result pairs during compaction

Previously, session compaction simply kept the last N messages, which
could break tool_use/tool_result pairs if the cut point fell between
them. This caused "tool_call_id is not found" errors from the API.

Now compaction finds a safe cut point that starts from either:
- A user message without tool_result
- An assistant message whose tool_use is needed by the next tool_result

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* feat(session): use Kimi as default model for summary compaction

- Auto-detect the API key from the environment (KIMI_API_KEY, falling back to MOONSHOT_API_KEY)
- Use moonshot-v1-128k (cheaper than k2-thinking)
- Fall back to tokens mode if API key not available

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* docs: add rule to never use git commit --amend

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* docs: clarify git amend rule for immediate fixes

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Jiayuan 2026-01-30 04:22:42 +08:00 committed by GitHub
parent e2e8cc15d6
commit 5931e8f84e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 289 additions and 81 deletions

View file

@ -60,3 +60,4 @@ Create three commits:
- Never combine unrelated changes in one commit
- Keep commit messages concise but descriptive
- If all changes are related to one logical unit, a single commit is fine
- `git commit --amend` can be used for immediate small fixes to the last commit, but not for unrelated changes

View file

@ -9,6 +9,7 @@ type CliOptions = {
thinking?: string | undefined;
cwd?: string | undefined;
session?: string | undefined;
debug?: boolean | undefined;
help?: boolean | undefined;
};
@ -24,6 +25,7 @@ function printUsage() {
console.log(" --thinking LEVEL Thinking level");
console.log(" --cwd DIR Working directory for commands");
console.log(" --session ID Session ID for conversation persistence");
console.log(" --debug Enable debug logging");
console.log(" --help, -h Show this help");
}
@ -67,6 +69,10 @@ function parseArgs(argv: string[]) {
opts.session = args.shift();
continue;
}
if (arg === "--debug") {
opts.debug = true;
continue;
}
if (arg === "--") {
promptParts.push(...args);
break;
@ -110,6 +116,7 @@ async function main() {
thinkingLevel: opts.thinking as any,
cwd: opts.cwd,
sessionId: opts.session,
debug: opts.debug,
});
// If it's a newly created session, notify user of sessionId

View file

@ -44,6 +44,7 @@ export class Agent {
private readonly session: SessionManager;
private readonly profile?: ProfileManager;
private readonly contextWindowGuard: ContextWindowGuardResult;
private readonly debug: boolean;
/** Current session ID */
readonly sessionId: string;
@ -52,6 +53,7 @@ export class Agent {
const stdout = options.logger?.stdout ?? process.stdout;
const stderr = options.logger?.stderr ?? process.stderr;
this.output = createAgentOutput({ stdout, stderr });
this.debug = options.debug ?? false;
this.agent = new PiAgentCore();
@ -142,6 +144,26 @@ export class Agent {
const restoredMessages = this.session.loadMessages();
if (restoredMessages.length > 0) {
if (this.debug) {
console.error(`[debug] Restoring ${restoredMessages.length} messages from session`);
for (const msg of restoredMessages) {
const msgAny = msg as any;
const content = Array.isArray(msgAny.content)
? msgAny.content.map((c: any) => c.type || "text").join(", ")
: typeof msgAny.content;
console.error(`[debug] ${msg.role}: ${content}`);
if (Array.isArray(msgAny.content)) {
for (const block of msgAny.content) {
if (block.type === "tool_use") {
console.error(`[debug] tool_use id: ${block.id}, name: ${block.name}`);
}
if (block.type === "tool_result") {
console.error(`[debug] tool_result tool_use_id: ${block.tool_use_id}`);
}
}
}
}
}
this.agent.replaceMessages(restoredMessages);
}

View file

@ -22,6 +22,58 @@ export type CompactionResult = {
reason: "count" | "tokens" | "summary";
};
/**
 * Locate the first index at or after `targetStart` from which the message
 * history can be kept without orphaning a tool_result block.
 *
 * A position is accepted when it holds either:
 * - a user message that carries no tool_result block, or
 * - an assistant message immediately followed by a user message that does
 *   carry a tool_result block (the pair is kept together).
 *
 * Empty slots and every other message are skipped. When nothing qualifies the
 * returned index is at or beyond `messages.length`.
 */
function findSafeCompactionPoint(messages: AgentMessage[], targetStart: number): number {
  // True when the message's content is an array containing a tool_result block.
  const carriesToolResult = (candidate: AgentMessage): boolean => {
    const blocks = (candidate as any).content;
    return Array.isArray(blocks) && blocks.some((part: any) => part.type === "tool_result");
  };

  for (let index = targetStart; index < messages.length; index++) {
    const current = messages[index];
    if (!current) continue;

    // Plain user turn (string content or blocks without tool_result): safe.
    if (current.role === "user" && !carriesToolResult(current)) {
      return index;
    }

    // Assistant turn whose successor delivers a tool_result: keep both together.
    if (current.role === "assistant") {
      const following = messages[index + 1];
      if (following && following.role === "user" && carriesToolResult(following)) {
        return index;
      }
    }
  }

  // Nothing qualified: report the position where scanning stopped.
  return Math.max(targetStart, messages.length);
}
/**
* Simple compression based on message count (legacy logic, maintains backward compatibility)
*/
@ -31,7 +83,22 @@ export function compactMessagesByCount(
keepLast: number,
): CompactionResult | null {
if (messages.length <= maxMessages) return null;
const kept = messages.slice(-keepLast);
const targetStart = messages.length - keepLast;
const safeStart = findSafeCompactionPoint(messages, targetStart);
// If we can't find a safe point, don't compact
if (safeStart >= messages.length) {
return null;
}
const kept = messages.slice(safeStart);
// Don't compact if we'd keep almost everything anyway
if (kept.length >= messages.length - 2) {
return null;
}
return {
kept,
removedCount: messages.length - kept.length,

View file

@ -1,9 +1,19 @@
import type { AgentMessage } from "@mariozechner/pi-agent-core";
import type { Model } from "@mariozechner/pi-ai";
import { getModel, type Model } from "@mariozechner/pi-ai";
import type { SessionEntry, SessionMeta } from "./types.js";
import { appendEntry, readEntries, writeEntries } from "./storage.js";
import { compactMessages, compactMessagesAsync } from "./compaction.js";
/**
 * Resolve the Kimi model used for summarization.
 * Picks "moonshot-v1-128k", a cheaper model than k2-thinking.
 */
function getSummaryModel(): Model<any> {
  // getModel is narrowed to a loose (provider, modelId) signature for this lookup.
  const lookupModel = getModel as (p: string, m: string) => Model<any>;
  return lookupModel("kimi", "moonshot-v1-128k");
}
/**
 * Get the Kimi API key from the environment.
 *
 * Checks KIMI_API_KEY first, then MOONSHOT_API_KEY. A variable that is set
 * but blank (empty or whitespace only) is treated as unset, so an empty
 * KIMI_API_KEY does not hide a usable MOONSHOT_API_KEY.
 *
 * @returns the first non-blank key, or undefined when neither is available
 */
function getSummaryApiKey(): string | undefined {
  const candidates = [process.env.KIMI_API_KEY, process.env.MOONSHOT_API_KEY];
  return candidates.find((value) => value !== undefined && value.trim() !== "");
}
export type SessionManagerOptions = {
sessionId: string;
baseDir?: string | undefined;
@ -171,9 +181,12 @@ export class SessionManager {
let result;
if (this.compactionMode === "summary") {
// Summary mode requires model and apiKey
if (!this.model || !this.apiKey) {
// Downgrade to tokens mode
// Use provided model/apiKey or fall back to Kimi
const model = this.model ?? getSummaryModel();
const apiKey = this.apiKey ?? getSummaryApiKey();
if (!apiKey) {
// No API key available, downgrade to tokens mode
result = compactMessages(messages, {
mode: "tokens",
contextWindowTokens: this.contextWindowTokens,
@ -185,8 +198,8 @@ export class SessionManager {
} else {
result = await compactMessagesAsync(messages, {
mode: "summary",
model: this.model,
apiKey: this.apiKey,
model,
apiKey,
contextWindowTokens: this.contextWindowTokens,
systemPrompt: this.systemPrompt,
reserveTokens: this.reserveTokens,

View file

@ -1,6 +1,7 @@
import { spawn } from "child_process";
import { Type } from "@sinclair/typebox";
import type { AgentTool } from "@mariozechner/pi-agent-core";
import { registerProcess } from "./process-registry.js";
const ExecSchema = Type.Object({
command: Type.String({ description: "Shell command to execute." }),
@ -8,39 +9,57 @@ const ExecSchema = Type.Object({
timeoutMs: Type.Optional(
Type.Number({ description: "Timeout in milliseconds.", minimum: 0 }),
),
yieldMs: Type.Optional(
Type.Number({
description:
"Auto-background threshold in milliseconds. If command doesn't complete within this time, it runs in background. Default 5000ms. Set to 0 to disable auto-backgrounding.",
minimum: 0,
}),
),
});
/** Arguments accepted by the exec tool after schema validation. */
type ExecArgs = {
// Shell command line; it is spawned with `shell: true`.
command: string;
// Working directory; the tool's default cwd is used when this is empty or omitted.
cwd?: string;
// Hard timeout in milliseconds; a timer is armed only when this is greater than 0.
timeoutMs?: number;
// Auto-background threshold in milliseconds; defaults to DEFAULT_YIELD_MS, 0 disables it.
yieldMs?: number;
};
/** Structured details returned by the exec tool alongside its text content. */
export type ExecResult = {
// Captured command output; empty string when the command was backgrounded.
output: string;
// Exit code of the command; null when the command was backgrounded.
exitCode: number | null;
// Whether the captured output was cut short (presumably at MAX_OUTPUT_BYTES — confirm in the data handler).
truncated: boolean;
// Set to true when the command exceeded yieldMs and was moved to the background.
backgrounded?: boolean;
// Registry ID of the backgrounded process, for use with the 'process' tool.
processId?: string;
};
const MAX_OUTPUT_BYTES = 64 * 1024;
const DEFAULT_YIELD_MS = 5000;
export function createExecTool(defaultCwd?: string): AgentTool<typeof ExecSchema, ExecResult> {
return {
name: "exec",
label: "Exec",
description: "Execute a shell command and wait for it to complete. Returns stdout/stderr output. Use this for short-lived commands only (e.g., ls, cat, pip install). Do NOT use for long-running processes like servers - use the 'process' tool instead.",
description:
"Execute a shell command. If the command doesn't complete within yieldMs (default 5s), it automatically runs in background and returns a process ID. Use 'process output <id>' to check output, 'process status <id>' to check status, 'process stop <id>' to terminate.",
parameters: ExecSchema,
execute: async (_toolCallId, args, signal) => {
const { command, cwd, timeoutMs } = args as ExecArgs;
return new Promise((resolve, reject) => {
const { command, cwd, timeoutMs, yieldMs = DEFAULT_YIELD_MS } = args as ExecArgs;
const effectiveCwd = cwd || defaultCwd;
return new Promise((resolve) => {
const child = spawn(command, {
shell: true,
cwd: cwd || defaultCwd,
cwd: effectiveCwd,
stdio: ["ignore", "pipe", "pipe"],
});
let timedOut = false;
let yielded = false;
let timeout: NodeJS.Timeout | undefined;
let yieldTimer: NodeJS.Timeout | undefined;
// Timeout handling (hard kill)
if (timeoutMs && timeoutMs > 0) {
timeout = setTimeout(() => {
timedOut = true;
@ -48,6 +67,36 @@ export function createExecTool(defaultCwd?: string): AgentTool<typeof ExecSchema
}, timeoutMs);
}
// Yield window handling (auto-background)
if (yieldMs > 0) {
yieldTimer = setTimeout(() => {
if (yielded) return;
yielded = true;
// Clear timeout since we're backgrounding
if (timeout) clearTimeout(timeout);
// Register to shared process registry
const processId = registerProcess(child, command, effectiveCwd, "exec");
resolve({
content: [
{
type: "text",
text: `Command running in background. Process ID: ${processId}\nUse 'process output ${processId}' to check output.`,
},
],
details: {
output: "",
exitCode: null,
truncated: false,
backgrounded: true,
processId,
},
});
}, yieldMs);
}
const chunks: Buffer[] = [];
let size = 0;
let truncated = false;
@ -70,11 +119,17 @@ export function createExecTool(defaultCwd?: string): AgentTool<typeof ExecSchema
let spawnError: Error | null = null;
child.on("error", (err) => {
if (timeout) clearTimeout(timeout);
if (yieldTimer) clearTimeout(yieldTimer);
spawnError = err;
// Don't reject, let close event handle
});
child.on("close", (code) => {
if (timeout) clearTimeout(timeout);
if (yieldTimer) clearTimeout(yieldTimer);
// If already backgrounded, don't resolve again
if (yielded) return;
// If there's a spawn error, return error message
if (spawnError) {
@ -100,8 +155,12 @@ export function createExecTool(defaultCwd?: string): AgentTool<typeof ExecSchema
});
});
// Signal handling: don't kill if already backgrounded
if (signal) {
signal.addEventListener("abort", () => {
if (yielded) return; // Already backgrounded, ignore abort
if (timeout) clearTimeout(timeout);
if (yieldTimer) clearTimeout(yieldTimer);
child.kill("SIGTERM");
});
}

View file

@ -0,0 +1,96 @@
import { type ChildProcess } from "child_process";
import { StringDecoder } from "string_decoder";
import { v7 as uuidv7 } from "uuid";
export const MAX_OUTPUT_BUFFER = 64 * 1024; // 64KB per process
export const TERMINATED_PROCESS_TTL = 60 * 60 * 1000; // 1 hour TTL for terminated processes
/** Bookkeeping record for one process tracked in PROCESS_REGISTRY. */
export type ProcessEntry = {
// Registry key for this entry.
id: string;
// Command line the process was started with.
command: string;
// Working directory passed at registration, if any.
cwd?: string | undefined;
// Handle to the underlying child process.
child: ChildProcess;
// Exit code reported by the child's "close" event; null until then.
exitCode: number | null;
// Timestamp (ms since epoch) taken when the process was registered.
startedAt: number;
// Timestamp (ms since epoch) of the "close" event; drives TTL-based cleanup.
terminatedAt?: number | undefined;
// Buffered stdout/stderr text chunks, oldest first.
outputBuffer: string[];
// Sum of the string lengths of the chunks currently in outputBuffer.
outputSize: number;
// Which tool registered the process.
source: "exec" | "process";
};
export const PROCESS_REGISTRY = new Map<string, ProcessEntry>();
/**
 * Register a child process in the shared registry and start tracking it.
 *
 * Stores a new entry (replacing any existing entry with the same ID), buffers
 * the child's stdout/stderr text up to MAX_OUTPUT_BUFFER characters by
 * discarding the oldest chunks first, and records the exit code and
 * termination time when the child emits "close".
 *
 * Each stream is decoded with its own StringDecoder so that a multi-byte
 * UTF-8 character split across two chunks is not corrupted.
 *
 * @param child - Child process to track.
 * @param command - Command line the child was started with.
 * @param cwd - Working directory of the child, if known.
 * @param source - Tool that is registering the process.
 * @param id - Optional explicit ID; a UUIDv7 is generated when omitted.
 * @returns The ID under which the entry was stored.
 */
export function registerProcess(
  child: ChildProcess,
  command: string,
  cwd: string | undefined,
  source: "exec" | "process",
  id?: string,
): string {
  const processId = id ?? uuidv7();

  const entry: ProcessEntry = {
    id: processId,
    command,
    cwd,
    child,
    exitCode: null,
    startedAt: Date.now(),
    outputBuffer: [],
    outputSize: 0,
    source,
  };

  PROCESS_REGISTRY.set(processId, entry);

  // Append decoded text to the bounded buffer. Sizes are string lengths
  // (UTF-16 code units), not bytes.
  const appendOutput = (decoded: string): void => {
    // The decoder returns "" while it is holding back a partial character.
    if (decoded.length === 0) return;

    let text = decoded;
    if (text.length > MAX_OUTPUT_BUFFER) {
      // A single oversized chunk: keep only its tail and drop everything older.
      text = text.slice(-MAX_OUTPUT_BUFFER);
      entry.outputBuffer = [];
      entry.outputSize = 0;
    } else {
      // Evict oldest chunks until the new text fits.
      while (
        entry.outputBuffer.length > 0 &&
        entry.outputSize + text.length > MAX_OUTPUT_BUFFER
      ) {
        const removed = entry.outputBuffer.shift();
        if (removed) entry.outputSize -= removed.length;
      }
    }

    entry.outputBuffer.push(text);
    entry.outputSize += text.length;
  };

  // Attach output collection to one stdio stream (no-op when the stream is absent).
  const collectFrom = (stream: ChildProcess["stdout"]): void => {
    if (!stream) return;
    const decoder = new StringDecoder("utf8");
    stream.on("data", (data: Buffer | string) => {
      appendOutput(typeof data === "string" ? data : decoder.write(data));
    });
    // Flush any bytes the decoder was still holding when the stream ended.
    stream.on("end", () => appendOutput(decoder.end()));
  };

  collectFrom(child.stdout);
  collectFrom(child.stderr);

  child.on("close", (code) => {
    entry.exitCode = code;
    entry.terminatedAt = Date.now();
  });

  return processId;
}
/**
 * Purge registry entries whose process terminated more than
 * TERMINATED_PROCESS_TTL milliseconds ago.
 *
 * @returns how many entries were deleted
 */
export function cleanupTerminatedProcesses(): number {
  const currentTime = Date.now();
  const expiredIds: string[] = [];

  // First pass: collect the IDs of entries past their TTL.
  PROCESS_REGISTRY.forEach((proc, key) => {
    const endedAt = proc.terminatedAt;
    if (endedAt && currentTime - endedAt > TERMINATED_PROCESS_TTL) {
      expiredIds.push(key);
    }
  });

  // Second pass: drop them from the registry.
  for (const key of expiredIds) {
    PROCESS_REGISTRY.delete(key);
  }

  return expiredIds.length;
}

View file

@ -1,7 +1,12 @@
import { spawn, type ChildProcess } from "child_process";
import { spawn } from "child_process";
import { Type } from "@sinclair/typebox";
import type { AgentTool } from "@mariozechner/pi-agent-core";
import { v7 as uuidv7 } from "uuid";
import {
PROCESS_REGISTRY,
registerProcess,
cleanupTerminatedProcesses,
} from "./process-registry.js";
const ProcessSchema = Type.Object({
action: Type.String({ description: "Action: start | status | stop | output | cleanup." }),
@ -10,36 +15,6 @@ const ProcessSchema = Type.Object({
cwd: Type.Optional(Type.String({ description: "Working directory." })),
});
const MAX_OUTPUT_BUFFER = 64 * 1024; // 64KB per process
const TERMINATED_PROCESS_TTL = 60 * 60 * 1000; // 1 hour TTL for terminated processes
/** Bookkeeping record for one process tracked in this module's PROCESS_REGISTRY. */
type ProcessEntry = {
// Registry key for this entry.
id: string;
// Command line the process was started with.
command: string;
// Working directory the process was spawned in, if any.
cwd?: string | undefined;
// Handle to the underlying child process.
child: ChildProcess;
// Exit code reported by the child's "close" event; null until then.
exitCode: number | null;
// Timestamp (ms since epoch) taken when the entry was created.
startedAt: number;
// Timestamp (ms since epoch) of the "close" event; drives TTL-based cleanup.
terminatedAt?: number | undefined;
// Buffered stdout/stderr text chunks, oldest first.
outputBuffer: string[];
// Sum of the string lengths of the chunks currently in outputBuffer.
outputSize: number;
};
const PROCESS_REGISTRY = new Map<string, ProcessEntry>();
/**
 * Drop registry entries for processes that terminated longer ago than
 * TERMINATED_PROCESS_TTL. Returns the number of entries dropped.
 */
function cleanupTerminatedProcesses(): number {
  const currentTime = Date.now();
  let purged = 0;

  PROCESS_REGISTRY.forEach((proc, key) => {
    const endedAt = proc.terminatedAt;
    // Entries without a termination timestamp are never purged.
    if (!endedAt) return;

    const expired = currentTime - endedAt > TERMINATED_PROCESS_TTL;
    if (expired) {
      PROCESS_REGISTRY.delete(key);
      purged += 1;
    }
  });

  return purged;
}
export type ProcessResult = {
id?: string | undefined;
running?: boolean | undefined;
@ -71,11 +46,13 @@ export function createProcessTool(defaultCwd?: string): AgentTool<typeof Process
throw new Error(`Process already exists: ${id}`);
}
const cwd = params.cwd || defaultCwd;
// Use a Promise to wait for the process to either start or fail
const result = await new Promise<{ success: boolean; error?: string }>((resolve) => {
const child = spawn(command, {
shell: true,
cwd: params.cwd || defaultCwd,
cwd,
stdio: ["ignore", "pipe", "pipe"],
detached: true,
});
@ -94,44 +71,7 @@ export function createProcessTool(defaultCwd?: string): AgentTool<typeof Process
child.on("spawn", () => {
if (!resolved) {
resolved = true;
const entry: ProcessEntry = {
id,
command,
cwd: params.cwd || defaultCwd,
child,
exitCode: null,
startedAt: Date.now(),
outputBuffer: [],
outputSize: 0,
};
PROCESS_REGISTRY.set(id, entry);
// Collect output to buffer with size limit
const collectOutput = (data: Buffer) => {
let text = data.toString("utf8");
// Truncate if single chunk exceeds max buffer
if (text.length > MAX_OUTPUT_BUFFER) {
text = text.slice(-MAX_OUTPUT_BUFFER);
entry.outputBuffer = [];
entry.outputSize = 0;
} else if (entry.outputSize + text.length > MAX_OUTPUT_BUFFER) {
// Remove old entries to make room
while (entry.outputBuffer.length > 0 && entry.outputSize + text.length > MAX_OUTPUT_BUFFER) {
const removed = entry.outputBuffer.shift();
if (removed) entry.outputSize -= removed.length;
}
}
entry.outputBuffer.push(text);
entry.outputSize += text.length;
};
child.stdout?.on("data", collectOutput);
child.stderr?.on("data", collectOutput);
child.on("close", (code) => {
entry.exitCode = code;
entry.terminatedAt = Date.now();
});
registerProcess(child, command, cwd, "process", id);
if (signal) {
signal.addEventListener("abort", () => {

View file

@ -45,4 +45,7 @@ export type AgentOptions = {
// === Summary Compaction Configuration ===
/** Custom summary generation instructions */
summaryInstructions?: string | undefined;
/** Enable debug logging */
debug?: boolean | undefined;
};