Merge pull request #102 from multica-ai/sessions-spawn-parent-trigger

feat(subagent): coalesce spawn announcements
This commit is contained in:
LinYushen 2026-02-06 20:03:29 +08:00 committed by GitHub
commit 0edc98142d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
23 changed files with 1538 additions and 94 deletions

View file

@ -343,6 +343,9 @@ export function registerHubIpcHandlers(): void {
* Get message history for local chat with pagination.
* Returns raw AgentMessageItem[] so the renderer can render content blocks,
* tool results, thinking blocks, etc. same format as the Gateway RPC.
*
* Reads from session storage (not in-memory state) so that internal
* orchestration messages are excluded by default.
*/
ipcMain.handle('localChat:getHistory', async (_event, agentId: string, options?: { offset?: number; limit?: number }) => {
const h = getHub()
@ -353,7 +356,7 @@ export function registerHubIpcHandlers(): void {
try {
await agent.ensureInitialized()
const allMessages = agent.getMessages()
const allMessages = agent.loadSessionMessages()
const total = allMessages.length
// Must match DEFAULT_MESSAGES_LIMIT from @multica/sdk/actions/rpc
const limit = options?.limit ?? 200

View file

@ -0,0 +1,264 @@
import { afterEach, describe, expect, it, vi } from "vitest";
import { AsyncAgent } from "./async-agent.js";
// Every callback handed to the mocked Agent.subscribeAll(), in registration order,
// so tests can drive events into AsyncAgent by hand.
const subscribeCallbacks: Array<(event: any) => void> = [];
// Backing value for the mocked Agent.isInternalRun getter; tests flip it to simulate an internal run.
const internalRunState = { value: false };
// Default run results: empty text, no thinking, no error.
const runMock = vi.fn(async () => ({ text: "", thinking: undefined, error: undefined }));
const runInternalMock = vi.fn(async () => ({ text: "", thinking: undefined, error: undefined }));
const flushSessionMock = vi.fn(async () => {});
const persistAssistantSummaryMock = vi.fn();
// Records the subscriber and returns a no-op unsubscribe function.
const subscribeAllMock = vi.fn((fn: (event: any) => void) => {
subscribeCallbacks.push(fn);
return () => {};
});
vi.mock("./runner.js", () => ({
Agent: class MockAgent {
sessionId = "test-session";
subscribeAll = subscribeAllMock;
run = runMock;
runInternal = runInternalMock;
flushSession = flushSessionMock;
persistAssistantSummary = persistAssistantSummaryMock;
get isInternalRun() {
return internalRunState.value;
}
getMessages() {
return [];
}
loadSessionMessages() {
return [];
}
async ensureInitialized() {}
getActiveTools() {
return [];
}
reloadTools() {
return [];
}
getSkillsWithStatus() {
return [];
}
getEligibleSkills() {
return [];
}
reloadSkills() {}
setToolStatus() {
return undefined;
}
getProfileId() {
return undefined;
}
getAgentName() {
return undefined;
}
setAgentName() {}
getUserContent() {
return undefined;
}
setUserContent() {}
getAgentStyle() {
return undefined;
}
setAgentStyle() {}
reloadSystemPrompt() {}
getProviderInfo() {
return { provider: "test", model: "test-model" };
}
setProvider() {
return { provider: "test", model: "test-model" };
}
},
}));
/**
 * Pull the next item from an async iterator, giving up after `timeoutMs`.
 *
 * @param iter - iterator to read from
 * @param timeoutMs - how long to wait before reporting "timeout" (default 40ms)
 * @returns the next value, or the string "timeout" when the deadline passes
 *   first. An exhausted iterator (`done: true`) is also reported as "timeout".
 */
async function nextWithTimeout<T>(iter: AsyncIterator<T>, timeoutMs = 40): Promise<"timeout" | T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  try {
    const next = iter.next().then((result): "timeout" | T => (result.done ? "timeout" : result.value));
    const deadline = new Promise<"timeout">((resolve) => {
      timer = setTimeout(() => resolve("timeout"), timeoutMs);
    });
    return await Promise.race([next, deadline]);
  } finally {
    // Always cancel the timer so a value that arrives early does not leave a
    // pending timeout behind.
    clearTimeout(timer);
  }
}
describe("AsyncAgent internal flow", () => {
afterEach(() => {
subscribeCallbacks.length = 0;
internalRunState.value = false;
runMock.mockReset();
runInternalMock.mockReset();
flushSessionMock.mockReset();
persistAssistantSummaryMock.mockReset();
subscribeAllMock.mockClear();
runMock.mockResolvedValue({ text: "", thinking: undefined, error: undefined });
runInternalMock.mockResolvedValue({ text: "", thinking: undefined, error: undefined });
flushSessionMock.mockResolvedValue(undefined);
});
it("filters internal events in direct subscribe stream", () => {
const agent = new AsyncAgent();
const events: Array<{ type: string }> = [];
const unsubscribe = agent.subscribe((event) => {
events.push(event as { type: string });
});
// subscribeAll is called twice:
// 1) constructor for read() channel forwarding
// 2) subscribe() for direct callback forwarding
const subscribeCallback = subscribeCallbacks[1];
expect(subscribeCallback).toBeDefined();
internalRunState.value = true;
subscribeCallback!({ type: "message_end" });
expect(events).toHaveLength(0);
internalRunState.value = false;
subscribeCallback!({ type: "message_end" });
expect(events).toHaveLength(1);
unsubscribe();
agent.close();
});
it("does not leak internal run errors to read() stream", async () => {
runInternalMock.mockResolvedValueOnce({ text: "", thinking: undefined, error: "internal failed" });
const agent = new AsyncAgent();
const iter = agent.read()[Symbol.asyncIterator]();
agent.writeInternal("test internal");
await agent.waitForIdle();
const value = await nextWithTimeout(iter);
expect(value).toBe("timeout");
agent.close();
});
it("does not leak internal run exceptions to read() stream", async () => {
runInternalMock.mockRejectedValueOnce(new Error("internal exception"));
const agent = new AsyncAgent();
const iter = agent.read()[Symbol.asyncIterator]();
agent.writeInternal("test internal");
await agent.waitForIdle();
const value = await nextWithTimeout(iter);
expect(value).toBe("timeout");
agent.close();
});
it("forwards assistant message stream (start/update/end) when writeInternal opts in", async () => {
let resolveRunInternal: ((value: { text: string; thinking: undefined; error: undefined }) => void) | undefined;
runInternalMock.mockImplementationOnce(
() => new Promise((resolve) => {
resolveRunInternal = resolve as typeof resolveRunInternal;
}),
);
const agent = new AsyncAgent();
const iter = agent.read()[Symbol.asyncIterator]();
const streamCallback = subscribeCallbacks[0];
expect(streamCallback).toBeDefined();
agent.writeInternal("announce", { forwardAssistant: true });
await Promise.resolve();
internalRunState.value = true;
streamCallback!({
type: "message_start",
message: { role: "assistant", content: [] },
});
streamCallback!({
type: "message_update",
message: { role: "assistant", content: [{ type: "text", text: "partial" }] },
});
streamCallback!({
type: "message_end",
message: { role: "user", content: [{ type: "text", text: "hidden internal prompt" }] },
});
streamCallback!({
type: "message_end",
message: { role: "assistant", content: [{ type: "text", text: "visible summary" }] },
});
const first = await nextWithTimeout(iter);
expect(first).not.toBe("timeout");
if (first !== "timeout") {
expect((first as { type: string }).type).toBe("message_start");
expect((first as { message: { role: string } }).message.role).toBe("assistant");
}
const second = await nextWithTimeout(iter);
expect(second).not.toBe("timeout");
if (second !== "timeout") {
expect((second as { type: string }).type).toBe("message_update");
expect((second as { message: { role: string } }).message.role).toBe("assistant");
}
const third = await nextWithTimeout(iter);
expect(third).not.toBe("timeout");
if (third !== "timeout") {
expect((third as { type: string }).type).toBe("message_end");
expect((third as { message: { role: string } }).message.role).toBe("assistant");
}
const fourth = await nextWithTimeout(iter);
expect(fourth).toBe("timeout");
resolveRunInternal!({ text: "", thinking: undefined, error: undefined });
await agent.waitForIdle();
internalRunState.value = false;
agent.close();
});
it("persists assistant summary when persistResponse is true and result has text", async () => {
runInternalMock.mockResolvedValueOnce({ text: "Summary of findings", thinking: undefined, error: undefined });
const agent = new AsyncAgent();
agent.writeInternal("announce findings", { forwardAssistant: true, persistResponse: true });
await agent.waitForIdle();
expect(persistAssistantSummaryMock).toHaveBeenCalledOnce();
expect(persistAssistantSummaryMock).toHaveBeenCalledWith("Summary of findings");
// flushSession called twice: once after runInternal, once after persistAssistantSummary
expect(flushSessionMock).toHaveBeenCalledTimes(2);
agent.close();
});
it("does not persist assistant summary when result text is NO_REPLY", async () => {
runInternalMock.mockResolvedValueOnce({ text: "NO_REPLY", thinking: undefined, error: undefined });
const agent = new AsyncAgent();
agent.writeInternal("announce findings", { forwardAssistant: true, persistResponse: true });
await agent.waitForIdle();
expect(persistAssistantSummaryMock).not.toHaveBeenCalled();
agent.close();
});
it("does not persist assistant summary when result text is empty", async () => {
runInternalMock.mockResolvedValueOnce({ text: " ", thinking: undefined, error: undefined });
const agent = new AsyncAgent();
agent.writeInternal("announce findings", { forwardAssistant: true, persistResponse: true });
await agent.waitForIdle();
expect(persistAssistantSummaryMock).not.toHaveBeenCalled();
agent.close();
});
it("does not persist assistant summary when persistResponse is not set", async () => {
runInternalMock.mockResolvedValueOnce({ text: "Summary of findings", thinking: undefined, error: undefined });
const agent = new AsyncAgent();
agent.writeInternal("announce findings", { forwardAssistant: true });
await agent.waitForIdle();
expect(persistAssistantSummaryMock).not.toHaveBeenCalled();
agent.close();
});
});

View file

@ -10,6 +10,13 @@ const devNull = { write: () => true } as unknown as NodeJS.WritableStream;
/** Discriminated union of legacy Message, raw AgentEvent, and MulticaEvent */
export type ChannelItem = Message | AgentEvent | MulticaEvent;
/** Options accepted by `AsyncAgent.writeInternal`. Each flag is only enabled by an explicit `true`. */
export interface WriteInternalOptions {
/**
* Forward assistant-role `message_start`, `message_update` and `message_end`
* events to the realtime stream during the internal run. All other events
* emitted during the internal run stay suppressed.
*/
forwardAssistant?: boolean | undefined;
/**
* After the internal run completes, persist the LLM's summary as a non-internal
* assistant message. Nothing is persisted when the trimmed text is empty or
* equals "NO_REPLY".
*/
persistResponse?: boolean | undefined;
}
export class AsyncAgent {
private readonly agent: Agent;
private readonly channel = new Channel<ChannelItem>();
@ -17,6 +24,7 @@ export class AsyncAgent {
private queue: Promise<void> = Promise.resolve();
private pendingWrites = 0;
private closeCallbacks: Array<() => void> = [];
private forwardInternalAssistant = false;
readonly sessionId: string;
constructor(options?: AgentOptions) {
@ -26,8 +34,11 @@ export class AsyncAgent {
});
this.sessionId = this.agent.sessionId;
// Forward raw AgentEvent and MulticaEvent into the channel
// Forward raw AgentEvent and MulticaEvent into the channel.
// Suppress forwarding during internal runs to avoid leaking
// orchestration messages to the frontend/real-time stream.
this.agent.subscribeAll((event: AgentEvent | MulticaEvent) => {
if (!this.shouldForwardEvent(event)) return;
this.channel.send(event);
});
}
@ -62,6 +73,44 @@ export class AsyncAgent {
});
}
/**
 * Queue an internal (hidden) prompt for the agent without blocking the caller.
 *
 * The turn is appended to the serialized write queue and executed through
 * `Agent.runInternal`. Its events are withheld from the real-time stream
 * unless `options.forwardAssistant` is true. With `options.persistResponse`,
 * a non-empty reply other than "NO_REPLY" is stored as a regular assistant message.
 *
 * Errors and exceptions from the internal run are only logged; they are never
 * sent to the channel.
 *
 * @throws Error when the agent is already closed at call time.
 */
writeInternal(content: string, options?: WriteInternalOptions): void {
  if (this._closed) throw new Error("Agent is closed");

  const forwardAssistant = options?.forwardAssistant === true;
  const persistResponse = options?.persistResponse === true;

  const runTurn = async (): Promise<void> => {
    // The agent may have been closed while this turn was waiting in the queue.
    if (this._closed) return;

    const previousForward = this.forwardInternalAssistant;
    this.forwardInternalAssistant = forwardAssistant;
    try {
      const result = await this.agent.runInternal(content);
      await this.agent.flushSession();

      if (result.error) {
        // Internal run errors are for diagnostics only; do not leak to user stream.
        console.error(`[AsyncAgent] Internal run error: ${result.error}`);
      }

      // Persist the LLM summary so it remains in parent context for future turns
      const summary = result.text?.trim();
      if (persistResponse && summary && summary !== "NO_REPLY") {
        this.agent.persistAssistantSummary(summary);
        await this.agent.flushSession();
      }
    } finally {
      // Restore the flag for whatever turn runs next.
      this.forwardInternalAssistant = previousForward;
    }
  };

  const reportFailure = (err: unknown): void => {
    const message = err instanceof Error ? err.message : String(err);
    // Internal run exceptions are for diagnostics only; do not leak to user stream.
    console.error(`[AsyncAgent] Internal run failed: ${message}`);
  };

  this.queue = this.queue.then(runTurn).catch(reportFailure);
}
/** Continuously read channel stream (AgentEvent + error Messages) */
read(): AsyncIterable<ChannelItem> {
return this.channel;
@ -75,6 +124,7 @@ export class AsyncAgent {
subscribe(callback: (event: AgentEvent | MulticaEvent) => void): () => void {
console.log(`[AsyncAgent] Adding subscriber for agent: ${this.sessionId}`);
const unsubscribe = this.agent.subscribeAll((event) => {
if (!this.shouldForwardEvent(event)) return;
console.log(`[AsyncAgent] Event received: ${event.type}`);
callback(event);
});
@ -89,6 +139,18 @@ export class AsyncAgent {
return this.queue;
}
/**
 * Decide whether an agent event may reach subscribers / the read() channel.
 *
 * Outside internal runs everything passes. During an internal run only
 * assistant-role message_start / message_update / message_end events pass,
 * and only while the current internal write opted in to forwarding.
 */
private shouldForwardEvent(event: AgentEvent | MulticaEvent): boolean {
  if (!this.agent.isInternalRun) return true;
  if (!this.forwardInternalAssistant) return false;

  const isMessageEvent =
    event.type === "message_start" ||
    event.type === "message_update" ||
    event.type === "message_end";
  if (!isMessageEvent) return false;

  const { message } = event as { message?: unknown };
  if (typeof message !== "object" || message === null) return false;
  return (message as { role?: unknown }).role === "assistant";
}
/** Register a callback to be invoked when the agent is closed */
onClose(callback: () => void): void {
if (this._closed) {
@ -259,12 +321,20 @@ export class AsyncAgent {
}
/**
* Get all messages from the current session (in-memory state).
* Delegates to the wrapped Agent's `getMessages()`.
*/
getMessages(): AgentMessage[] {
return this.agent.getMessages();
}
/**
* Load messages from session storage with filtering.
* By default, internal messages are excluded.
*
* @param options - passed through unchanged to the wrapped Agent's
*   `loadSessionMessages`; set `includeInternal` to also get internal messages.
*/
loadSessionMessages(options?: { includeInternal?: boolean }): AgentMessage[] {
return this.agent.loadSessionMessages(options);
}
/**
* Get current provider and model information.
*/

View file

@ -22,7 +22,7 @@ ${cyan("Usage:")} multica session <command> [options]
${cyan("Commands:")}
${yellow("list")} List all sessions
${yellow("show")} <id> Show session details
${yellow("show")} <id> Show session details (use --show-internal to include internal messages)
${yellow("delete")} <id> Delete a session
${yellow("help")} Show this help
@ -122,7 +122,7 @@ function cmdList() {
console.log(`${dim("Resume with:")} multica --session <id>`);
}
function cmdShow(sessionId: string | undefined) {
function cmdShow(sessionId: string | undefined, showInternal = false) {
if (!sessionId) {
console.error("Error: Session ID is required");
console.error("Usage: multica session show <id>");
@ -160,14 +160,25 @@ function cmdShow(sessionId: string | undefined) {
console.log(cyan("─".repeat(60)));
console.log("");
// Parse and display messages
// Parse and display messages as SessionEntry objects
for (const line of lines) {
try {
const msg = JSON.parse(line);
const entry = JSON.parse(line);
// Only display message entries
if (entry.type !== "message") continue;
// Skip internal messages unless --show-internal
if (entry.internal && !showInternal) continue;
const msg = entry.message;
if (!msg) continue;
const role = msg.role || "unknown";
const roleColor = role === "user" ? green : role === "assistant" ? cyan : dim;
const internalTag = entry.internal ? dim(" [internal]") : "";
console.log(`${roleColor(`[${role}]`)}`);
console.log(`${roleColor(`[${role}]`)}${internalTag}`);
if (typeof msg.content === "string") {
// Truncate long content
@ -238,6 +249,7 @@ function cmdDelete(sessionId: string | undefined) {
export async function sessionCommand(args: string[]): Promise<void> {
const command = (args[0] || "help") as Command;
const arg1 = args[1];
const showInternal = args.includes("--show-internal");
if (args.includes("--help") || args.includes("-h")) {
printHelp();
@ -249,7 +261,7 @@ export async function sessionCommand(args: string[]): Promise<void> {
cmdList();
break;
case "show":
cmdShow(arg1);
cmdShow(arg1, showInternal);
break;
case "delete":
cmdDelete(arg1);

View file

@ -84,6 +84,10 @@ export class Agent {
private readonly stderr: NodeJS.WritableStream;
private initialized = false;
// Internal run state
private _internalRun = false;
private _runMutex: Promise<void> = Promise.resolve();
// MulticaEvent subscribers (parallel to PiAgentCore's subscriber list)
// Typed as AgentEvent | MulticaEvent to match subscribeAll() callback signature
private multicaListeners: Array<(event: AgentEvent | MulticaEvent) => void> = [];
@ -353,6 +357,48 @@ export class Agent {
}
async run(prompt: string): Promise<AgentRunResult> {
// Run-level mutex: prevents concurrent run/runInternal from mis-tagging messages
return this.withRunMutex(() => this._run(prompt));
}
/**
 * Run a prompt as an internal turn.
 * Messages are persisted with `internal: true` and rolled back from
 * in-memory state after the turn completes, so they do not pollute
 * the main conversation context.
 *
 * The turn runs under the run mutex, so it never overlaps another
 * `run`/`runInternal` call on this agent.
 *
 * @param prompt - the internal prompt to execute
 * @returns the result of the underlying run
 */
async runInternal(prompt: string): Promise<AgentRunResult> {
  return this.withRunMutex(async () => {
    // Initialize BEFORE taking the rollback baseline. `_run` calls
    // ensureInitialized() itself; if that first call is what loads the
    // persisted history into memory, a baseline captured earlier would be too
    // small and the rollback below would discard that history as well.
    // ensureInitialized() returns immediately once initialized, so the second
    // call inside `_run` does no further work.
    await this.ensureInitialized();
    const messageCountBefore = this.agent.state.messages.length;
    this._internalRun = true;
    try {
      return await this._run(prompt);
    } finally {
      this._internalRun = false;
      // Roll back internal messages from in-memory state
      const current = this.agent.state.messages;
      if (current.length > messageCountBefore) {
        this.agent.replaceMessages(current.slice(0, messageCountBefore));
      }
    }
  });
}
/**
 * Run `fn` once every previously queued run has finished.
 *
 * Each caller installs a fresh promise as the new mutex tail before waiting
 * on the previous one, so calls execute strictly in arrival order. The tail
 * is released whether `fn` resolves or throws.
 */
private async withRunMutex<T>(fn: () => Promise<T>): Promise<T> {
  const previous = this._runMutex;
  // The Promise executor runs synchronously, so `release` is bound before use.
  let release: () => void = () => {};
  this._runMutex = new Promise<void>((done) => {
    release = done;
  });

  await previous;
  try {
    return await fn();
  } finally {
    release();
  }
}
private async _run(prompt: string): Promise<AgentRunResult> {
await this.ensureInitialized();
this.output.state.lastAssistantText = "";
@ -442,8 +488,10 @@ export class Agent {
private handleSessionEvent(event: AgentEvent) {
if (event.type === "message_end") {
const message = event.message as AgentMessage;
this.session.saveMessage(message);
if (message.role === "assistant") {
this.session.saveMessage(message, this._internalRun ? { internal: true } : undefined);
// Skip compaction during internal runs — internal messages will be
// rolled back from memory afterwards, so compacting now would be incorrect.
if (message.role === "assistant" && !this._internalRun) {
void this.maybeCompact();
}
}
@ -510,6 +558,40 @@ export class Agent {
return this.agent.state.tools?.map(t => t.name) ?? [];
}
/**
* Whether the agent is currently executing an internal run.
* Returns the `_internalRun` flag, which `runInternal` sets for the duration of its turn.
*/
get isInternalRun(): boolean {
return this._internalRun;
}
/**
 * Record a synthetic assistant message carrying `text`.
 *
 * The message is appended to the in-memory conversation and saved to the
 * session without the `internal` flag, so the summary of an internal run
 * stays visible in later turns while the internal prompt remains hidden.
 * Usage and cost are reported as zero because no model call backs this message.
 *
 * @param text - summary text to store as the assistant's reply
 */
persistAssistantSummary(text: string): void {
  const { model } = this.agent.state;
  const zeroUsage = {
    input: 0,
    output: 0,
    cacheRead: 0,
    cacheWrite: 0,
    totalTokens: 0,
    cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
  };

  const message = {
    role: "assistant" as const,
    content: [{ type: "text" as const, text }],
    // Fall back to placeholder identifiers when no model is configured.
    api: model?.api ?? "openai-completions",
    provider: model?.provider ?? "internal",
    model: model?.id ?? "unknown",
    usage: zeroUsage,
    stopReason: "stop" as const,
    timestamp: Date.now(),
  };

  this.agent.appendMessage(message);
  this.session.saveMessage(message);
}
/** Ensure session messages are loaded from disk (idempotent) */
async ensureInitialized(): Promise<void> {
if (this.initialized) return;
@ -521,11 +603,19 @@ export class Agent {
this.initialized = true;
}
/**
* Get all messages from the current session (in-memory state).
* Returns a shallow copy, so mutating the returned array does not affect agent state.
*/
getMessages(): AgentMessage[] {
return this.agent.state.messages.slice();
}
/**
* Load messages from session storage with filtering.
* By default, internal messages are excluded.
*
* @param options - passed through unchanged to `session.loadMessages`;
*   set `includeInternal` to also get messages saved with `internal: true`.
*/
loadSessionMessages(options?: { includeInternal?: boolean }): AgentMessage[] {
return this.session.loadMessages(options);
}
/**
* Get all skills with their eligibility status.
* Returns empty array if skills are disabled.

View file

@ -167,11 +167,15 @@ export class SessionManager {
return repairSessionFileIfNeeded({ sessionFile: filePath, warn });
}
loadMessages(): AgentMessage[] {
loadMessages(options?: { includeInternal?: boolean }): AgentMessage[] {
const entries = this.loadEntries();
let messages = entries
.filter((entry) => entry.type === "message")
.map((entry) => entry.message);
.filter((entry) => {
if (entry.type !== "message") return false;
if (!options?.includeInternal && entry.internal) return false;
return true;
})
.map((entry) => (entry as { type: "message"; message: AgentMessage }).message);
messages = sanitizeToolCallInputs(messages);
messages = sanitizeToolUseResultPairing(messages);
return messages;
@ -203,11 +207,16 @@ export class SessionManager {
);
}
saveMessage(message: AgentMessage) {
saveMessage(message: AgentMessage, options?: { internal?: boolean }) {
void this.enqueue(() =>
appendEntry(
this.sessionId,
{ type: "message", message, timestamp: Date.now() },
{
type: "message",
message,
timestamp: Date.now(),
...(options?.internal ? { internal: true } : {}),
},
{ baseDir: this.baseDir },
),
);

View file

@ -11,7 +11,7 @@ export type SessionMeta = {
};
export type SessionEntry =
| { type: "message"; message: AgentMessage; timestamp: number }
| { type: "message"; message: AgentMessage; timestamp: number; internal?: boolean }
| { type: "meta"; meta: SessionMeta; timestamp: number }
| {
type: "compaction";

View file

@ -0,0 +1,67 @@
import { describe, it, expect, vi, beforeEach } from "vitest";
const readEntriesMock = vi.fn();
vi.mock("../session/storage.js", () => ({
readEntries: (sessionId: string) => readEntriesMock(sessionId),
}));
import { readLatestAssistantReply } from "./announce.js";
describe("readLatestAssistantReply", () => {
beforeEach(() => {
readEntriesMock.mockReset();
});
it("returns the latest non-empty assistant text when the last assistant message is tool-only", () => {
readEntriesMock.mockReturnValue([
{
type: "message",
timestamp: 1,
message: {
role: "assistant",
content: [{ type: "text", text: "南京天气12°C。" }],
},
},
{
type: "message",
timestamp: 2,
message: {
role: "assistant",
content: [{ type: "toolCall", id: "tool-1", name: "weather", arguments: { city: "Nanjing" } }],
},
},
]);
const result = readLatestAssistantReply("child-session");
expect(result).toBe("南京天气12°C。");
});
it("falls back to latest toolResult text when no assistant text exists", () => {
readEntriesMock.mockReturnValue([
{
type: "message",
timestamp: 1,
message: {
role: "assistant",
content: [{ type: "toolCall", id: "tool-2", name: "weather", arguments: { city: "Nanjing" } }],
},
},
{
type: "message",
timestamp: 2,
message: {
role: "toolResult",
toolCallId: "tool-2",
toolName: "weather",
content: [{ type: "text", text: "{\"city\":\"Nanjing\",\"tempC\":12,\"condition\":\"Sunny\"}" }],
isError: false,
},
},
]);
const result = readLatestAssistantReply("child-session");
expect(result).toContain("\"city\":\"Nanjing\"");
expect(result).toContain("\"condition\":\"Sunny\"");
});
});

View file

@ -1,6 +1,7 @@
import { describe, it, expect } from "vitest";
import { buildSubagentSystemPrompt, formatAnnouncementMessage } from "./announce.js";
import { buildSubagentSystemPrompt, formatAnnouncementMessage, formatCoalescedAnnouncementMessage } from "./announce.js";
import type { FormatAnnouncementParams } from "./announce.js";
import type { SubagentRunRecord } from "./types.js";
describe("buildSubagentSystemPrompt", () => {
it("includes task and session context", () => {
@ -126,3 +127,128 @@ describe("formatAnnouncementMessage", () => {
expect(msg).toContain("NO_REPLY");
});
});
describe("formatCoalescedAnnouncementMessage", () => {
function makeRecord(overrides: Partial<SubagentRunRecord> = {}): SubagentRunRecord {
return {
runId: "run-1",
childSessionId: "child-1",
requesterSessionId: "parent-1",
task: "Default task",
cleanup: "delete",
createdAt: 1000000,
startedAt: 1000000,
endedAt: 1030000,
outcome: { status: "ok" },
findings: "Some findings",
findingsCaptured: true,
announced: false,
...overrides,
};
}
it("delegates to formatAnnouncementMessage for a single record", () => {
const record = makeRecord({ label: "Code Analysis" });
const coalesced = formatCoalescedAnnouncementMessage([record]);
const direct = formatAnnouncementMessage({
runId: record.runId,
childSessionId: record.childSessionId,
requesterSessionId: record.requesterSessionId,
task: record.task,
label: record.label,
cleanup: record.cleanup,
outcome: record.outcome,
startedAt: record.startedAt,
endedAt: record.endedAt,
findings: record.findings,
});
expect(coalesced).toBe(direct);
});
it("formats multiple records with all task findings and stats", () => {
const records = [
makeRecord({
runId: "run-1",
childSessionId: "child-1",
label: "Task A",
findings: "Found issue A",
startedAt: 1000000,
endedAt: 1030000,
}),
makeRecord({
runId: "run-2",
childSessionId: "child-2",
label: "Task B",
findings: "Found issue B",
startedAt: 1000000,
endedAt: 1045000, // 45 seconds
}),
];
const msg = formatCoalescedAnnouncementMessage(records);
expect(msg).toContain("All 2 background tasks have completed");
expect(msg).toContain('Task 1: "Task A"');
expect(msg).toContain("Found issue A");
expect(msg).toContain('Task 2: "Task B"');
expect(msg).toContain("Found issue B");
expect(msg).toContain("Total wall time: 45s");
expect(msg).toContain("2 succeeded, 0 failed");
});
it("reports mixed outcomes correctly", () => {
const records = [
makeRecord({ runId: "run-1", label: "OK Task", outcome: { status: "ok" } }),
makeRecord({ runId: "run-2", label: "Failed Task", outcome: { status: "error", error: "crash" } }),
makeRecord({ runId: "run-3", label: "Timeout Task", outcome: { status: "timeout" } }),
];
const msg = formatCoalescedAnnouncementMessage(records);
expect(msg).toContain("completed successfully");
expect(msg).toContain("failed: crash");
expect(msg).toContain("timed out");
expect(msg).toContain("1 succeeded, 2 failed");
});
it("shows (no output) for missing findings", () => {
const records = [
makeRecord({ runId: "run-1", findings: undefined }),
makeRecord({ runId: "run-2", findings: "Has output" }),
];
const msg = formatCoalescedAnnouncementMessage(records);
expect(msg).toContain("(no output)");
expect(msg).toContain("Has output");
});
it("includes combined summary instruction for multi-record", () => {
const records = [
makeRecord({ runId: "run-1" }),
makeRecord({ runId: "run-2" }),
];
const msg = formatCoalescedAnnouncementMessage(records);
expect(msg).toContain("MUST include findings from every task item above");
expect(msg).toContain("NO_REPLY");
});
it("includes raw findings for every task in coalesced payload", () => {
const records = [
makeRecord({ runId: "run-1", label: "南京天气", findings: "南京12°C" }),
makeRecord({ runId: "run-2", label: "上海天气", findings: "上海多云9°C" }),
];
const msg = formatCoalescedAnnouncementMessage(records);
expect(msg).toContain("Raw findings from each task (MUST cover all items):");
expect(msg).toContain("[1] 南京天气:");
expect(msg).toContain("南京12°C");
expect(msg).toContain("[2] 上海天气:");
expect(msg).toContain("上海多云9°C");
expect(msg).toContain("MUST include findings from every task item above");
});
});

View file

@ -13,6 +13,7 @@ import { buildSystemPrompt } from "../system-prompt/index.js";
import type {
SubagentAnnounceParams,
SubagentRunOutcome,
SubagentRunRecord,
SubagentSystemPromptParams,
} from "./types.js";
@ -38,19 +39,29 @@ export function buildSubagentSystemPrompt(params: SubagentSystemPromptParams): s
*/
export function readLatestAssistantReply(sessionId: string): string | undefined {
  const entries = readEntries(sessionId);
  let latestToolResultText: string | undefined;

  // Walk backwards to find the last non-empty assistant reply.
  // If no assistant text exists (e.g. run ended after tool execution),
  // fall back to the latest non-empty toolResult content.
  for (let i = entries.length - 1; i >= 0; i--) {
    const entry = entries[i]!;
    if (entry.type !== "message") continue;

    const message = entry.message;
    if (message.role === "assistant") {
      const text = extractAssistantText(message);
      if (text) return text;
      // Assistant message without text (e.g. tool-call only): keep looking further back.
      continue;
    }

    // Iterating newest-first, so the first non-empty toolResult seen is the latest one.
    if (message.role === "toolResult" && !latestToolResultText) {
      const text = extractToolResultText(message);
      if (text) latestToolResultText = text;
    }
  }

  return latestToolResultText;
}
/**
@ -58,7 +69,17 @@ export function readLatestAssistantReply(sessionId: string): string | undefined
* AgentMessage.content for assistant is (TextContent | ThinkingContent | ToolCall)[].
*/
function extractAssistantText(message: { role: string; content: unknown }): string {
  // Assistant content is handled by the shared text extractor.
  return extractTextLikeContent(message.content);
}
/**
 * Pull the text out of a toolResult message by handing its content to the
 * shared text extractor.
 */
function extractToolResultText(message: { role: string; content: unknown }): string {
  const { content } = message;
  return extractTextLikeContent(content);
}
function extractTextLikeContent(content: unknown): string {
if (typeof content === "string") {
return sanitizeText(content);
}
@ -67,8 +88,9 @@ function extractAssistantText(message: { role: string; content: unknown }): stri
const textParts: string[] = [];
for (const block of content) {
if (block && typeof block === "object" && "type" in block && block.type === "text" && "text" in block) {
textParts.push(String(block.text));
if (!block || typeof block !== "object") continue;
if ("text" in block) {
textParts.push(String((block as { text: unknown }).text));
}
}
@ -167,11 +189,126 @@ export function formatAnnouncementMessage(params: FormatAnnouncementParams): str
return parts.join("\n");
}
/**
 * Build one announcement prompt covering a batch of finished subagent runs.
 *
 * A batch of exactly one record is formatted by `formatAnnouncementMessage`,
 * so single-run output is unchanged. Otherwise the message contains, in order:
 * a header, one section per task (status, optional duration, findings),
 * overall stats, a numbered raw-findings list, and the summarization
 * instructions for the parent agent.
 *
 * @param records - completed run records to announce together
 * @returns the message text, lines joined with "\n"
 */
export function formatCoalescedAnnouncementMessage(
  records: SubagentRunRecord[],
): string {
  // Single record: delegate to existing format for backward-compatible behavior
  if (records.length === 1) {
    const only = records[0]!;
    const {
      runId,
      childSessionId,
      requesterSessionId,
      task,
      label,
      cleanup,
      outcome,
      startedAt,
      endedAt,
      findings,
    } = only;
    return formatAnnouncementMessage({
      runId,
      childSessionId,
      requesterSessionId,
      task,
      label,
      cleanup,
      outcome,
      startedAt,
      endedAt,
      findings,
    });
  }

  // Label when present, otherwise the first 60 characters of the task text.
  const nameOf = (record: SubagentRunRecord): string => record.label || record.task.slice(0, 60);
  const findingsOf = (record: SubagentRunRecord): string => record.findings || "(no output)";

  const lines: string[] = [];
  lines.push(`All ${records.length} background tasks have completed. Here are the combined results:`);
  lines.push("");

  // Per-task sections.
  records.forEach((record, index) => {
    const elapsed = (record.startedAt && record.endedAt)
      ? ` (${formatDuration(record.startedAt, record.endedAt)})`
      : "";
    lines.push(`### Task ${index + 1}: "${nameOf(record)}"`);
    lines.push(`Status: ${formatStatusLabel(record.outcome)}${elapsed}`);
    lines.push("");
    lines.push("Findings:");
    lines.push(findingsOf(record));
    lines.push("");
  });

  // Overall stats: wall time spans the earliest start to the latest end.
  const startTimes = records.map((record) => record.startedAt).filter(Boolean) as number[];
  const endTimes = records.map((record) => record.endedAt).filter(Boolean) as number[];
  if (startTimes.length > 0 && endTimes.length > 0) {
    const earliest = Math.min(...startTimes);
    const latest = Math.max(...endTimes);
    lines.push(`Total wall time: ${formatDuration(earliest, latest)}`);
  }

  const succeeded = records.filter((record) => record.outcome?.status === "ok").length;
  const notSucceeded = records.length - succeeded;
  lines.push(`Results: ${succeeded} succeeded, ${notSucceeded} failed/timed out`);

  // Strict raw-findings list so the parent can reliably cover every task result.
  lines.push("");
  lines.push("Raw findings from each task (MUST cover all items):");
  lines.push("");
  records.forEach((record, index) => {
    lines.push(`[${index + 1}] ${nameOf(record)}:`);
    lines.push(findingsOf(record));
    lines.push("");
  });

  lines.push("");
  lines.push("Summarize these results naturally for the user.");
  lines.push("You MUST include findings from every task item above, without omission.");
  lines.push("Keep it concise, but preserve concrete findings from each task.");
  lines.push("Do not mention technical details like session IDs or that these were background tasks.");
  lines.push("You can respond with NO_REPLY if no announcement is needed.");

  return lines.join("\n");
}
/**
 * Deliver one combined announcement for a requester's completed runs.
 *
 * The records are formatted into a single message and queued on the parent
 * agent as an internal write, with assistant forwarding and response
 * persistence both enabled.
 *
 * @param requesterSessionId - session id of the parent agent to notify
 * @param records - completed run records to include
 * @returns true when the message was queued; false when the parent agent is
 *   missing or closed, or when delivery threw
 */
export function runCoalescedAnnounceFlow(
  requesterSessionId: string,
  records: SubagentRunRecord[],
): boolean {
  // Formatting happens outside the try block, so a formatting error propagates to the caller.
  const message = formatCoalescedAnnouncementMessage(records);

  try {
    const parentAgent = getHub().getAgent(requesterSessionId);
    if (parentAgent && !parentAgent.closed) {
      parentAgent.writeInternal(message, { forwardAssistant: true, persistResponse: true });
      return true;
    }

    console.warn(
      `[SubagentAnnounce] Parent agent not found or closed: ${requesterSessionId}`,
    );
    return false;
  } catch (err) {
    console.error(`[SubagentAnnounce] Failed to coalesced-announce to parent:`, err);
    return false;
  }
}
/**
* Run the full subagent announcement flow:
* 1. Read child's last assistant reply
* 2. Format announcement message
* 3. Send to parent agent via Hub
*
* @deprecated Use runCoalescedAnnounceFlow instead, which supports
* batching multiple completed runs into a single announcement.
*/
export function runSubagentAnnounceFlow(params: SubagentAnnounceParams): boolean {
const { requesterSessionId, childSessionId } = params;
@ -204,7 +341,7 @@ export function runSubagentAnnounceFlow(params: SubagentAnnounceParams): boolean
return false;
}
parentAgent.write(message);
parentAgent.writeInternal(message, { forwardAssistant: true, persistResponse: true });
return true;
} catch (err) {
console.error(`[SubagentAnnounce] Failed to announce to parent:`, err);

View file

@ -28,6 +28,8 @@ export {
readLatestAssistantReply,
formatAnnouncementMessage,
runSubagentAnnounceFlow,
formatCoalescedAnnouncementMessage,
runCoalescedAnnounceFlow,
} from "./announce.js";
export type { FormatAnnouncementParams } from "./announce.js";

View file

@ -0,0 +1,75 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import type { SubagentRunRecord } from "./types.js";
// Module-level mock handles: kept in outer scope so tests can program return
// values and assert on calls made by the registry module.
const loadSubagentRunsMock = vi.fn<() => Map<string, SubagentRunRecord>>();
const saveSubagentRunsMock = vi.fn();
const readLatestAssistantReplyMock = vi.fn();
// Announce reports failure by default (returns false).
const runCoalescedAnnounceFlowMock = vi.fn(() => false);
// Maps a session ID to a fake directory path of the form /tmp/<sessionId>.
const resolveSessionDirMock = vi.fn((sessionId: string) => `/tmp/${sessionId}`);
const closeAgentMock = vi.fn();
// Fake Hub exposing only closeAgent.
const getHubMock = vi.fn(() => ({ closeAgent: closeAgentMock }));
const rmSyncMock = vi.fn();
// NOTE(review): these vi.mock factories reference the consts declared above.
// Presumably this is safe because the factories only run when the mocked
// modules are first imported (registry.js is imported dynamically inside the
// test) — confirm against Vitest's hoisting rules before reordering anything.
vi.mock("./registry-store.js", () => ({
loadSubagentRuns: loadSubagentRunsMock,
saveSubagentRuns: saveSubagentRunsMock,
}));
vi.mock("./announce.js", () => ({
readLatestAssistantReply: readLatestAssistantReplyMock,
runCoalescedAnnounceFlow: runCoalescedAnnounceFlowMock,
}));
vi.mock("../session/storage.js", () => ({
resolveSessionDir: resolveSessionDirMock,
}));
// The Hub singleton is reported as not initialized; getHub still returns the fake Hub.
vi.mock("../../hub/hub-singleton.js", () => ({
getHub: getHubMock,
isHubInitialized: vi.fn(() => false),
}));
// Partial mock of node:fs: every real export is kept, only rmSync is replaced
// so the test can observe deletions without touching the filesystem.
vi.mock("node:fs", async (importOriginal) => {
const actual = await importOriginal<typeof import("node:fs")>();
return {
...actual,
rmSync: rmSyncMock,
};
});
// Recovery path of initSubagentRegistry(): a persisted run whose findings were
// already captured must still get its child session directory removed.
describe("subagent registry recovery cleanup", () => {
  beforeEach(() => {
    // Fresh module graph and clean mock state for every test.
    vi.resetModules();
    vi.clearAllMocks();
    runCoalescedAnnounceFlowMock.mockReturnValue(false);
    loadSubagentRunsMock.mockReturnValue(new Map());
  });

  it("deletes child session on recovery even when findings were already captured", async () => {
    const baseTime = Date.now();

    // Ended run: findings captured, cleanup and announcement still outstanding.
    const endedRun: SubagentRunRecord = {
      runId: "run-1",
      childSessionId: "child-1",
      requesterSessionId: "parent-1",
      task: "task",
      cleanup: "delete",
      createdAt: baseTime - 1000,
      startedAt: baseTime - 900,
      endedAt: baseTime - 100,
      outcome: { status: "ok" },
      findings: "done",
      findingsCaptured: true,
      cleanupHandled: false,
      announced: false,
    };

    const persistedRuns = new Map<string, SubagentRunRecord>();
    persistedRuns.set("run-1", endedRun);
    loadSubagentRunsMock.mockReturnValue(persistedRuns);

    // Import after resetModules so the registry is evaluated against the mocks.
    const { initSubagentRegistry } = await import("./registry.js");
    initSubagentRegistry();

    // Findings are not re-read, but the child session directory is removed.
    expect(readLatestAssistantReplyMock).not.toHaveBeenCalled();
    expect(resolveSessionDirMock).toHaveBeenCalledWith("child-1");
    expect(rmSyncMock).toHaveBeenCalledWith("/tmp/child-1", { recursive: true, force: true });
  });
});

View file

@ -78,4 +78,50 @@ describe("registry-store serialization", () => {
expect(parsed.outcome?.status).toBe("error");
expect(parsed.outcome?.error).toBe("Something went wrong");
});
// The coalescing fields must come back unchanged from a JSON stringify/parse cycle.
it("round-trips coalescing fields (findings, findingsCaptured, announced)", () => {
  const original: SubagentRunRecord = {
    runId: "run-coalesce",
    childSessionId: "child-1",
    requesterSessionId: "parent-1",
    task: "Coalesce test",
    cleanup: "delete",
    createdAt: Date.now(),
    endedAt: Date.now() + 5000,
    outcome: { status: "ok" },
    findings: "Found 3 issues in auth module.",
    findingsCaptured: true,
    announced: true,
  };

  const envelope = { version: 1, runs: { "run-coalesce": original } };
  const roundTripped = JSON.parse(JSON.stringify(envelope));
  const restored: SubagentRunRecord = roundTripped.runs["run-coalesce"];

  expect(restored.findings).toBe("Found 3 issues in auth module.");
  expect(restored.findingsCaptured).toBe(true);
  expect(restored.announced).toBe(true);
});
// Records written before the coalescing fields existed must round-trip with
// those fields still absent.
it("round-trips record with undefined coalescing fields", () => {
  // Old-format record: findings, findingsCaptured and announced are not set.
  const legacyRecord: SubagentRunRecord = {
    runId: "run-old",
    childSessionId: "child-1",
    requesterSessionId: "parent-1",
    task: "Old record",
    cleanup: "delete",
    createdAt: Date.now(),
    cleanupHandled: true,
  };

  const envelope = { version: 1, runs: { "run-old": legacyRecord } };
  const roundTripped = JSON.parse(JSON.stringify(envelope));
  const restored: SubagentRunRecord = roundTripped.runs["run-old"];

  expect(restored.findings).toBeUndefined();
  expect(restored.findingsCaptured).toBeUndefined();
  expect(restored.announced).toBeUndefined();
  expect(restored.cleanupHandled).toBe(true);
});
});

View file

@ -1,4 +1,4 @@
import { describe, it, expect, beforeEach } from "vitest";
import { describe, it, expect, beforeEach, vi } from "vitest";
import {
registerSubagentRun,
listSubagentRuns,
@ -159,3 +159,118 @@ describe("subagent registry", () => {
expect(getSubagentRun("run-1")).toBeUndefined();
});
});
// Coalescing state transitions of the subagent registry, exercised without a Hub.
describe("subagent registry — coalescing", () => {
// Without Hub, watchChildAgent ends runs immediately with "Hub not initialized".
// This allows us to test the coalescing state transitions.
// NOTE(review): no per-test registry reset is visible in this describe block,
// so records registered by earlier tests may still be present — confirm.
it("captures findings when a run completes (no Hub)", () => {
registerSubagentRun({
runId: "run-1",
childSessionId: "child-1",
requesterSessionId: "parent-1",
task: "Task 1",
});
const record = getSubagentRun("run-1");
// Run ended immediately due to no Hub, so findings capture has already happened.
expect(record?.endedAt).toBeGreaterThan(0);
expect(record?.findingsCaptured).toBe(true);
});
// NOTE(review): despite the title, both runs below end immediately, so no
// sibling is actually pending here; the test only pins that `announced` is
// never set when the announce step does not succeed.
it("does not announce while sibling runs are still pending", () => {
// Register first run — ends immediately (no Hub)
registerSubagentRun({
runId: "run-1",
childSessionId: "child-1",
requesterSessionId: "parent-1",
task: "Task 1",
});
const record1 = getSubagentRun("run-1");
expect(record1?.findingsCaptured).toBe(true);
// Register second run — also ends immediately
registerSubagentRun({
runId: "run-2",
childSessionId: "child-2",
requesterSessionId: "parent-1",
task: "Task 2",
});
const record2 = getSubagentRun("run-2");
expect(record2?.findingsCaptured).toBe(true);
// Both ended, but the announce step fails because there is no Hub for the parent agent.
// The key check: both records should have findings captured.
// `announced` stays undefined (it is only assigned after a successful announce).
expect(record1?.announced).toBeUndefined();
expect(record2?.announced).toBeUndefined();
});
it("single run captures findings immediately", () => {
registerSubagentRun({
runId: "run-solo",
childSessionId: "child-solo",
requesterSessionId: "parent-solo",
task: "Solo task",
});
const record = getSubagentRun("run-solo");
expect(record?.endedAt).toBeGreaterThan(0);
expect(record?.findingsCaptured).toBe(true);
// The run is recorded as an error whose message mentions the missing Hub.
expect(record?.outcome?.status).toBe("error");
expect(record?.outcome?.error).toContain("Hub not initialized");
});
it("shutdownSubagentRegistry captures findings for ended-but-uncaptured runs", () => {
registerSubagentRun({
runId: "run-1",
childSessionId: "child-1",
requesterSessionId: "parent-1",
task: "Task",
});
const record = getSubagentRun("run-1");
if (record) {
// Simulate: run ended but findings not yet captured.
// The record is mutated in place; the final assertion relies on this being
// the same object the registry holds.
record.endedAt = Date.now();
record.outcome = { status: "ok" };
record.findingsCaptured = undefined;
}
shutdownSubagentRegistry();
expect(record?.findingsCaptured).toBe(true);
});
});
// After a successful coalesced announcement the announced runs must be gone
// from the registry.
describe("subagent registry — post-announce cleanup", () => {
  it("removes runs from registry after successful announcement", async () => {
    // Force the announce step to report success.
    const announceModule = await import("./announce.js");
    const spy = vi.spyOn(announceModule, "runCoalescedAnnounceFlow").mockReturnValue(true);

    try {
      // Register two runs for the same parent — both end immediately (no Hub)
      registerSubagentRun({
        runId: "run-a",
        childSessionId: "child-a",
        requesterSessionId: "parent-1",
        task: "Task A",
      });
      registerSubagentRun({
        runId: "run-b",
        childSessionId: "child-b",
        requesterSessionId: "parent-1",
        task: "Task B",
      });

      // Both runs should have been announced and removed from registry
      expect(spy).toHaveBeenCalled();
      expect(getSubagentRun("run-a")).toBeUndefined();
      expect(getSubagentRun("run-b")).toBeUndefined();
      expect(listSubagentRuns("parent-1")).toHaveLength(0);
    } finally {
      // Restore even when an assertion above throws, so the forced-success
      // mock cannot leak into other tests.
      spy.mockRestore();
    }
  });
});

View file

@ -7,7 +7,7 @@
import { getHub, isHubInitialized } from "../../hub/hub-singleton.js";
import { loadSubagentRuns, saveSubagentRuns } from "./registry-store.js";
import { runSubagentAnnounceFlow } from "./announce.js";
import { readLatestAssistantReply, runCoalescedAnnounceFlow } from "./announce.js";
import type {
RegisterSubagentRunParams,
SubagentRunRecord,
@ -27,7 +27,7 @@ const SWEEP_INTERVAL_MS = 60 * 1000;
const subagentRuns = new Map<string, SubagentRunRecord>();
let sweepTimer: ReturnType<typeof setInterval> | undefined;
const resumedRuns = new Set<string>();
const resumedRequesters = new Set<string>();
// ============================================================================
// Public API
@ -39,25 +39,45 @@ export function initSubagentRegistry(): void {
for (const [runId, record] of persisted) {
subagentRuns.set(runId, record);
// Resume incomplete runs
if (!record.cleanupHandled) {
if (record.endedAt) {
// Completed but cleanup not done — run announce flow
if (!resumedRuns.has(runId)) {
resumedRuns.add(runId);
handleRunCompletion(record);
}
} else {
// If not ended, the child agent session is lost on restart —
// mark as ended with unknown outcome
record.endedAt = Date.now();
record.outcome = { status: "unknown" };
persist();
if (!resumedRuns.has(runId)) {
resumedRuns.add(runId);
handleRunCompletion(record);
}
}
// Backward compat: old records with cleanupHandled but no announced field
if (record.cleanupHandled && record.announced === undefined) {
record.announced = true;
record.findingsCaptured = true;
}
}
// Process incomplete runs
const affectedRequesters = new Set<string>();
for (const record of subagentRuns.values()) {
if (record.announced && record.cleanupHandled) continue; // Already fully done
if (!record.endedAt) {
// Child was running when process crashed — mark as ended/unknown
record.endedAt = Date.now();
record.outcome = { status: "unknown" };
}
if (!record.findingsCaptured) {
captureFindings(record);
}
// Recovery cleanup must be independent from findings capture:
// the process may crash after captureFindings() persisted but before deletion.
if (record.cleanup === "delete" && !record.cleanupHandled) {
deleteChildSession(record.childSessionId);
}
affectedRequesters.add(record.requesterSessionId);
}
persist();
// For each affected requester, check if coalesced announcement is needed
for (const requesterId of affectedRequesters) {
if (!resumedRequesters.has(requesterId)) {
resumedRequesters.add(requesterId);
checkAndAnnounce(requesterId);
}
}
@ -138,11 +158,17 @@ export function shutdownSubagentRegistry(): void {
record.outcome = { status: "unknown" };
updated++;
}
// Opportunistically capture findings for ended-but-uncaptured runs
if (record.endedAt && !record.findingsCaptured) {
captureFindings(record);
updated++;
}
}
if (updated > 0) {
persist();
console.log(`[SubagentRegistry] Marked ${updated} active run(s) as ended during shutdown`);
console.log(`[SubagentRegistry] Processed ${updated} run(s) during shutdown`);
}
stopSweeper();
@ -151,7 +177,7 @@ export function shutdownSubagentRegistry(): void {
/** Reset all state (for testing). */
export function resetSubagentRegistryForTests(): void {
subagentRuns.clear();
resumedRuns.clear();
resumedRequesters.clear();
stopSweeper();
}
@ -222,44 +248,76 @@ function watchChildAgent(record: SubagentRunRecord, timeoutSeconds?: number): vo
}
// ============================================================================
// Cleanup + Announce
// Cleanup + Announce (two-phase: capture findings, then coalesced announce)
// ============================================================================
function handleRunCompletion(record: SubagentRunRecord): void {
if (record.cleanupHandled) return;
record.cleanupHandled = true;
/**
 * Phase 1: Capture child's findings before session deletion.
 *
 * Reads the child's latest assistant reply and stores it on the record. A
 * missing reply is stored as `undefined`; a read failure is logged and stored
 * as a placeholder string. In every case the record is marked
 * `findingsCaptured` and the registry is persisted.
 *
 * @param record - Run record to update in place.
 */
function captureFindings(record: SubagentRunRecord): void {
  try {
    const findings = readLatestAssistantReply(record.childSessionId);
    record.findings = findings ?? undefined;
  } catch (err) {
    // Best-effort: keep going with a placeholder, but leave a trace of why
    // the findings are missing instead of dropping the error silently.
    console.warn(
      `[SubagentRegistry] Failed to read findings for run ${record.runId}:`,
      err,
    );
    record.findings = "(failed to read findings)";
  }
  record.findingsCaptured = true;
  persist();
}
// Run announce flow
const announced = runSubagentAnnounceFlow({
runId: record.runId,
childSessionId: record.childSessionId,
requesterSessionId: record.requesterSessionId,
task: record.task,
label: record.label,
cleanup: record.cleanup,
outcome: record.outcome,
startedAt: record.startedAt,
endedAt: record.endedAt,
});
/**
* Phase 2: Check if all unannounced runs for this requester have completed.
* If so, send a single coalesced announcement to the parent.
*/
function checkAndAnnounce(requesterSessionId: string): void {
const allRuns = listSubagentRuns(requesterSessionId);
if (!announced) {
console.warn(`[SubagentRegistry] Announce flow failed for run ${record.runId}`);
// Allow retry on next restart if announce failed.
record.cleanupHandled = false;
// Only consider unannounced runs
const pending = allRuns.filter(r => !r.announced);
if (pending.length === 0) return;
// Are all unannounced runs done?
const allDone = pending.every(r => r.endedAt !== undefined);
if (!allDone) return;
// Have all had findings captured?
const allCaptured = pending.every(r => r.findingsCaptured);
if (!allCaptured) return;
// All done — send coalesced announcement
const announced = runCoalescedAnnounceFlow(requesterSessionId, pending);
if (announced) {
for (const r of pending) {
r.announced = true;
r.cleanupHandled = true;
// Remove from registry immediately — findings already delivered to parent
subagentRuns.delete(r.runId);
}
persist();
return;
if (subagentRuns.size === 0) {
stopSweeper();
}
} else {
console.warn(
`[SubagentRegistry] Coalesced announce failed for requester ${requesterSessionId}`,
);
// Leave announced=false so initSubagentRegistry() can retry on restart
}
}
/** Entry point: called when a child completes. */
function handleRunCompletion(record: SubagentRunRecord): void {
// Phase 1: capture findings (before session deletion)
if (!record.findingsCaptured) {
captureFindings(record);
// Session cleanup (safe now that findings are persisted)
if (record.cleanup === "delete") {
deleteChildSession(record.childSessionId);
}
}
// Handle session cleanup
if (record.cleanup === "delete") {
deleteChildSession(record.childSessionId);
}
// Schedule archive
record.archiveAtMs = Date.now() + DEFAULT_ARCHIVE_AFTER_MS;
record.cleanupCompletedAt = Date.now();
persist();
// Phase 2: coalesced announce check
checkAndAnnounce(record.requesterSessionId);
}
function deleteChildSession(sessionId: string): void {
@ -305,7 +363,6 @@ function sweep(): void {
for (const [runId, record] of subagentRuns) {
if (record.archiveAtMs !== undefined && record.archiveAtMs <= now) {
subagentRuns.delete(runId);
resumedRuns.delete(runId);
removed++;
}
}

View file

@ -39,6 +39,12 @@ export type SubagentRunRecord = {
cleanupHandled?: boolean | undefined;
/** Timestamp when cleanup completed */
cleanupCompletedAt?: number | undefined;
/** Captured findings from the child session's last assistant reply */
findings?: string | undefined;
/** Whether findings have been captured (safe to delete session after this) */
findingsCaptured?: boolean | undefined;
/** Whether the coalesced announcement has been sent to parent */
announced?: boolean | undefined;
};
/** Parameters for registering a new subagent run */

View file

@ -7,6 +7,7 @@ import { createProcessTool } from "./tools/process.js";
import { createGlobTool } from "./tools/glob.js";
import { createWebFetchTool, createWebSearchTool } from "./tools/web/index.js";
import { createSessionsSpawnTool } from "./tools/sessions-spawn.js";
import { createSessionsListTool } from "./tools/sessions-list.js";
import { createMemorySearchTool } from "./tools/memory-search.js";
import { createCronTool } from "./tools/cron/index.js";
import { filterTools } from "./tools/policy.js";
@ -133,6 +134,10 @@ export function createAllTools(options: CreateToolsOptions | string): AgentTool<
});
tools.push(sessionsSpawnTool as AgentTool<any>);
// Add sessions_list tool
const sessionsListTool = createSessionsListTool({ sessionId });
tools.push(sessionsListTool as AgentTool<any>);
return tools;
}

View file

@ -164,9 +164,22 @@ export function createExecTool(
// Don't reject, let close event handle
});
// Signal handling: don't kill if already backgrounded
const onAbort = signal ? () => {
if (yielded) return; // Already backgrounded, ignore abort
if (timeout) clearTimeout(timeout);
if (yieldTimer) clearTimeout(yieldTimer);
child.kill("SIGTERM");
} : undefined;
if (signal && onAbort) {
signal.addEventListener("abort", onAbort, { once: true });
}
child.on("close", (code) => {
if (timeout) clearTimeout(timeout);
if (yieldTimer) clearTimeout(yieldTimer);
if (signal && onAbort) signal.removeEventListener("abort", onAbort);
// If already backgrounded, don't resolve again
if (yielded) return;
@ -202,16 +215,6 @@ export function createExecTool(
},
});
});
// Signal handling: don't kill if already backgrounded
if (signal) {
signal.addEventListener("abort", () => {
if (yielded) return; // Already backgrounded, ignore abort
if (timeout) clearTimeout(timeout);
if (yieldTimer) clearTimeout(yieldTimer);
child.kill("SIGTERM");
});
}
});
},
};

View file

@ -34,7 +34,7 @@ export const TOOL_GROUPS: Record<string, string[]> = {
"group:memory": ["memory_search"],
// Subagent tools
"group:subagent": ["sessions_spawn"],
"group:subagent": ["sessions_spawn", "sessions_list"],
// Cron/scheduling tools
"group:cron": ["cron"],

View file

@ -8,6 +8,7 @@ export { createProcessTool } from "./process.js";
export { createGlobTool } from "./glob.js";
export { createWebFetchTool, createWebSearchTool } from "./web/index.js";
export { createCronTool } from "./cron/index.js";
export { createSessionsListTool } from "./sessions-list.js";
// Tool groups
export {

View file

@ -112,7 +112,7 @@ export function createProcessTool(defaultCwd?: string): AgentTool<typeof Process
if (signal) {
signal.addEventListener("abort", () => {
child.kill("SIGTERM");
});
}, { once: true });
}
resolve({ success: true });

View file

@ -0,0 +1,169 @@
import { describe, it, expect, vi, beforeEach } from "vitest";
import type { SubagentRunRecord } from "../subagent/types.js";
// Mock the registry module before importing the tool
vi.mock("../subagent/registry.js", () => ({
listSubagentRuns: vi.fn(),
getSubagentRun: vi.fn(),
}));
import { createSessionsListTool } from "./sessions-list.js";
import { listSubagentRuns, getSubagentRun } from "../subagent/registry.js";
const mockListSubagentRuns = vi.mocked(listSubagentRuns);
const mockGetSubagentRun = vi.mocked(getSubagentRun);
/**
 * Build a run record for tests.
 *
 * @param overrides - Fields that replace the baseline values below.
 * @returns The baseline record with `overrides` applied on top.
 */
function makeRecord(overrides: Partial<SubagentRunRecord> = {}): SubagentRunRecord {
  const baseline: SubagentRunRecord = {
    runId: "run-001",
    childSessionId: "child-001",
    requesterSessionId: "parent-001",
    task: "Test task",
    cleanup: "delete",
    createdAt: 1700000000000,
  };
  return { ...baseline, ...overrides };
}
// Behavior of the sessions_list tool in list mode and detail mode, with the
// registry lookups mocked.
describe("sessions_list tool", () => {
  /** Tool bound to the session that owns the default test records. */
  const makeTool = () => createSessionsListTool({ sessionId: "parent-001" });

  /** Text of the first content block of a tool result. */
  const textOf = (result: { content: readonly unknown[] }): string =>
    (result.content[0] as { text: string }).text;

  beforeEach(() => {
    vi.clearAllMocks();
  });

  it("returns empty message when no runs exist", async () => {
    mockListSubagentRuns.mockReturnValue([]);

    const result = await makeTool().execute("call-1", {});

    expect(result.content[0]).toEqual({
      type: "text",
      text: "No subagent runs for this session.",
    });
    expect(result.details).toEqual({ runs: [] });
  });

  it("lists multiple runs with correct status mapping", async () => {
    const now = Date.now();
    // One run still in flight, one finished ok, one finished with an error.
    const stillRunning = makeRecord({
      runId: "run-aaa",
      label: "Code Review",
      startedAt: now - 45000,
    });
    const succeeded = makeRecord({
      runId: "run-bbb",
      label: "Test Analysis",
      startedAt: now - 60000,
      endedAt: now - 30000,
      outcome: { status: "ok" },
    });
    const failed = makeRecord({
      runId: "run-ccc",
      label: "Lint Check",
      startedAt: now - 60000,
      endedAt: now,
      outcome: { status: "error", error: "timeout" },
    });
    const runs: SubagentRunRecord[] = [stillRunning, succeeded, failed];
    mockListSubagentRuns.mockReturnValue(runs);

    const result = await makeTool().execute("call-1", {});

    const firstBlock = result.content[0]!;
    expect(firstBlock.type).toBe("text");

    const rendered = (firstBlock as { text: string }).text;
    const expectedFragments = [
      "3 total",
      "[running]",
      "[ok]",
      "[error]",
      "Code Review",
      "Test Analysis",
      "Lint Check",
    ];
    for (const fragment of expectedFragments) {
      expect(rendered).toContain(fragment);
    }

    const detailRuns = result.details!.runs;
    expect(detailRuns).toHaveLength(3);
    expect(detailRuns[0]!.status).toBe("running");
    expect(detailRuns[1]!.status).toBe("ok");
    expect(detailRuns[2]!.status).toBe("error");
  });

  it("returns detail for a specific runId", async () => {
    const now = Date.now();
    mockGetSubagentRun.mockReturnValue(
      makeRecord({
        runId: "run-detail",
        label: "Deep Analysis",
        task: "Analyze the authentication module thoroughly",
        startedAt: now - 90000,
        endedAt: now - 10000,
        outcome: { status: "ok" },
        findings: "Found 2 potential issues in token validation.",
        findingsCaptured: true,
      }),
    );

    const result = await makeTool().execute("call-1", { runId: "run-detail" });

    const rendered = textOf(result);
    const expectedFragments = [
      "Run: run-detail",
      "Label: Deep Analysis",
      "Status: ok",
      "Found 2 potential issues",
      "Duration:",
    ];
    for (const fragment of expectedFragments) {
      expect(rendered).toContain(fragment);
    }
    expect(result.details!.runs).toHaveLength(1);
    expect(result.details!.runs[0]!.runId).toBe("run-detail");
  });

  it("returns not found for unknown runId", async () => {
    mockGetSubagentRun.mockReturnValue(undefined);

    const result = await makeTool().execute("call-1", { runId: "nonexistent" });

    expect(textOf(result)).toContain("Run not found");
    expect(result.details).toEqual({ runs: [] });
  });

  it("rejects runId belonging to a different requester", async () => {
    // The record exists but is owned by another session.
    mockGetSubagentRun.mockReturnValue(
      makeRecord({
        runId: "run-other",
        requesterSessionId: "other-parent",
      }),
    );

    const result = await makeTool().execute("call-1", { runId: "run-other" });

    expect(textOf(result)).toContain("Run not found");
    expect(result.details).toEqual({ runs: [] });
  });

  it("handles missing sessionId gracefully", async () => {
    const unboundTool = createSessionsListTool({});

    const result = await unboundTool.execute("call-1", {});

    expect(textOf(result)).toContain("No session ID available");
    expect(result.details).toEqual({ runs: [] });
  });

  it("shows findings status for running task", async () => {
    const now = Date.now();
    // No endedAt, so the run counts as still running.
    mockGetSubagentRun.mockReturnValue(
      makeRecord({
        runId: "run-running",
        label: "Still Running",
        startedAt: now - 30000,
      }),
    );

    const result = await makeTool().execute("call-1", { runId: "run-running" });

    const rendered = textOf(result);
    expect(rendered).toContain("Status: running");
    expect(rendered).toContain("Findings: (still running)");
  });
});

View file

@ -0,0 +1,187 @@
/**
* sessions_list tool allows an agent to view its spawned subagent runs.
*
* Lists all subagent runs for the current session, or shows details for a
* specific run when a runId is provided.
*/
import { Type } from "@sinclair/typebox";
import type { AgentTool } from "@mariozechner/pi-agent-core";
import { listSubagentRuns, getSubagentRun } from "../subagent/registry.js";
import type { SubagentRunRecord } from "../subagent/types.js";
/** Parameter schema for the sessions_list tool: a single optional `runId`. */
const SessionsListSchema = Type.Object({
runId: Type.Optional(
Type.String({ description: "Optional run ID to get details for a specific run. If omitted, lists all runs." }),
),
});
/** Hand-written shape of the tool arguments; mirrors SessionsListSchema. */
type SessionsListArgs = {
runId?: string;
};
/**
 * Structured result returned in the tool's `details` field: one entry per run
 * (a single entry in detail mode, an empty array when nothing is found).
 */
export type SessionsListResult = {
runs: Array<{
runId: string;
label?: string;
task: string;
// "running" is derived (the record has no endedAt); the other values come
// from the record's outcome, with "unknown" as the fallback.
status: "running" | "ok" | "error" | "timeout" | "unknown";
// Timestamps copied from the run record — presumably epoch milliseconds; confirm.
startedAt?: number;
endedAt?: number;
findings?: string;
}>;
};
/** Options accepted by createSessionsListTool. */
export interface CreateSessionsListToolOptions {
/** Session ID of the current (requester) agent */
sessionId?: string;
}
/**
 * Derive the display status of a run.
 *
 * @returns "running" while the record has no `endedAt`; otherwise the recorded
 *   outcome status, or "unknown" when no outcome status is present.
 */
function resolveStatus(record: SubagentRunRecord): "running" | "ok" | "error" | "timeout" | "unknown" {
  const { endedAt, outcome } = record;
  if (endedAt) {
    return outcome?.status ?? "unknown";
  }
  return "running";
}
/**
 * Render a millisecond duration as a compact string.
 *
 * The duration is rounded to whole seconds first. Below one minute the result
 * is `Ns`; below one hour it is `NmNs` (or `Nm` when no seconds remain); from
 * one hour up it is `NhNm` (or `Nh`), with leftover seconds dropped.
 *
 * @param ms - Duration in milliseconds.
 */
function formatElapsed(ms: number): string {
  const wholeSeconds = Math.round(ms / 1000);
  if (wholeSeconds < 60) {
    return `${wholeSeconds}s`;
  }

  const wholeMinutes = Math.floor(wholeSeconds / 60);
  if (wholeMinutes < 60) {
    const leftoverSeconds = wholeSeconds % 60;
    if (leftoverSeconds > 0) {
      return `${wholeMinutes}m${leftoverSeconds}s`;
    }
    return `${wholeMinutes}m`;
  }

  const wholeHours = Math.floor(wholeMinutes / 60);
  const leftoverMinutes = wholeMinutes % 60;
  const minutePart = leftoverMinutes > 0 ? `${leftoverMinutes}m` : "";
  return `${wholeHours}h${minutePart}`;
}
/**
 * Render the one-line list entry for a run.
 *
 * The line holds a 1-based position, the status tag padded to 10 characters,
 * the quoted label (or the first 60 characters of the task), and a
 * parenthesised suffix with the first 8 characters of the run ID plus timing
 * when it can be computed.
 *
 * @param record - Run to summarise.
 * @param index - Zero-based position of the run in the list.
 * @param now - Current time, used for the "started … ago" text of running runs.
 */
function formatRunSummary(record: SubagentRunRecord, index: number, now: number): string {
  const status = resolveStatus(record);
  const title = record.label || record.task.slice(0, 60);
  const shortId = record.runId.slice(0, 8);
  const paddedTag = `[${status}]`.padEnd(10);

  // Timing is only available once the run has a start time.
  let timing = "";
  if (record.startedAt) {
    if (status === "running") {
      timing = `started ${formatElapsed(now - record.startedAt)} ago`;
    } else if (record.endedAt) {
      timing = `completed in ${formatElapsed(record.endedAt - record.startedAt)}`;
    }
  }

  const suffix = timing ? `(${shortId}…, ${timing})` : `(${shortId}…)`;
  return `#${index + 1} ${paddedTag} "${title}" ${suffix}`;
}
/**
 * Render the multi-line detail view for a single run.
 *
 * Lines appear in this order: run ID, optional label, task, status (with the
 * outcome error appended after a " — " separator when present), child session
 * ID, creation time, optional start time, optional end time and duration,
 * findings state, and an "Announced" marker once the run has been announced.
 *
 * @param record - Run to describe.
 * @param now - Current time, used for the "… ago" annotations.
 * @returns The detail text, lines joined with "\n".
 */
function formatRunDetail(record: SubagentRunRecord, now: number): string {
  const status = resolveStatus(record);
  const lines: string[] = [`Run: ${record.runId}`];

  if (record.label) lines.push(`Label: ${record.label}`);
  lines.push(`Task: ${record.task}`);

  // Separate the status from the error text; without a separator the two run
  // together (e.g. "errortimeout").
  const errorSuffix = record.outcome?.error ? ` — ${record.outcome.error}` : "";
  lines.push(`Status: ${status}${errorSuffix}`);

  lines.push(`Child Session: ${record.childSessionId}`);
  lines.push(`Created: ${new Date(record.createdAt).toISOString()} (${formatElapsed(now - record.createdAt)} ago)`);

  if (record.startedAt) {
    lines.push(`Started: ${new Date(record.startedAt).toISOString()} (${formatElapsed(now - record.startedAt)} ago)`);
  }

  if (record.endedAt) {
    lines.push(`Ended: ${new Date(record.endedAt).toISOString()}`);
    if (record.startedAt) {
      lines.push(`Duration: ${formatElapsed(record.endedAt - record.startedAt)}`);
    }
  }

  // Findings: captured text, or a note explaining why none is shown yet.
  if (record.findingsCaptured) {
    lines.push(`Findings: ${record.findings || "(no output)"}`);
  } else if (record.endedAt) {
    lines.push("Findings: (not yet captured)");
  } else {
    lines.push("Findings: (still running)");
  }

  if (record.announced) lines.push("Announced: yes");

  return lines.join("\n");
}
/**
 * Project a run record onto the entry shape used in the tool's structured
 * result: identifying fields, derived status, timestamps and findings.
 */
function toResultRun(record: SubagentRunRecord) {
  const { runId, label, task, startedAt, endedAt, findings } = record;
  const status = resolveStatus(record);
  return { runId, label, task, status, startedAt, endedAt, findings };
}
/**
 * Create the `sessions_list` tool for one requester session.
 *
 * Without a `runId` argument the tool lists every run registered for
 * `options.sessionId`. With a `runId` it returns the detail view of that run,
 * answering "Run not found" both when the run does not exist and when it
 * belongs to a different requester session.
 *
 * @param options - Carries the requester's session ID; when it is absent the
 *   tool replies that no session ID is available.
 */
export function createSessionsListTool(
  options: CreateSessionsListToolOptions,
): AgentTool<typeof SessionsListSchema, SessionsListResult> {
  /** Assemble a tool result from a text body and the structured run entries. */
  const respond = (text: string, runs: SessionsListResult["runs"]) => ({
    content: [{ type: "text" as const, text }],
    details: { runs },
  });

  return {
    name: "sessions_list",
    label: "List Subagent Runs",
    description:
      "List all subagent runs spawned by this session and their current status. " +
      "Optionally pass a runId to get detailed information about a specific run.",
    parameters: SessionsListSchema,
    execute: async (_toolCallId, args) => {
      const { runId } = args as SessionsListArgs;

      const requesterSessionId = options.sessionId;
      if (!requesterSessionId) {
        return respond("No session ID available. Cannot list subagent runs.", []);
      }

      const now = Date.now();

      // Detail mode: a specific run was requested.
      if (runId) {
        const record = getSubagentRun(runId);
        // Runs owned by another session get the same answer as unknown runs.
        if (!record || record.requesterSessionId !== requesterSessionId) {
          return respond(`Run not found: ${runId}`, []);
        }
        return respond(formatRunDetail(record, now), [toResultRun(record)]);
      }

      // List mode: every run registered for this session.
      const runs = listSubagentRuns(requesterSessionId);
      if (runs.length === 0) {
        return respond("No subagent runs for this session.", []);
      }

      const header = `Subagent runs for this session: ${runs.length} total`;
      const summaries = runs.map((run, position) => formatRunSummary(run, position, now));
      return respond([header, "", ...summaries].join("\n"), runs.map(toResultRun));
    },
  };
}