diff --git a/packages/core/src/agent/subagent/announce.test.ts b/packages/core/src/agent/subagent/announce.test.ts index f532dd5c..efba367b 100644 --- a/packages/core/src/agent/subagent/announce.test.ts +++ b/packages/core/src/agent/subagent/announce.test.ts @@ -188,7 +188,7 @@ describe("formatCoalescedAnnouncementMessage", () => { const msg = formatCoalescedAnnouncementMessage(records); - expect(msg).toContain("All 2 background tasks have completed"); + expect(msg).toContain("All 2 background task(s) have completed"); expect(msg).toContain('Task 1: "Task A"'); expect(msg).toContain("Found issue A"); expect(msg).toContain('Task 2: "Task B"'); @@ -251,4 +251,44 @@ describe("formatCoalescedAnnouncementMessage", () => { expect(msg).toContain("上海:多云,9°C"); expect(msg).toContain("MUST include findings from every task item above"); }); + + it("includes continuation prompt when next is provided", () => { + const records = [ + makeRecord({ runId: "run-1", label: "AAPL data", findings: "AAPL revenue: $100B" }), + makeRecord({ runId: "run-2", label: "MSFT data", findings: "MSFT revenue: $200B" }), + ]; + + const msg = formatCoalescedAnnouncementMessage(records, "Summarize all data and write a PDF investment report"); + + expect(msg).toContain("CONTINUATION TASK"); + expect(msg).toContain("Summarize all data and write a PDF investment report"); + expect(msg).toContain("AAPL revenue: $100B"); + expect(msg).toContain("MSFT revenue: $200B"); + // Should NOT contain the default summarize instruction + expect(msg).not.toContain("Summarize these results naturally for the user"); + }); + + it("uses continuation prompt even for single record when next is provided", () => { + const records = [ + makeRecord({ runId: "run-1", label: "Data collection", findings: "All data collected" }), + ]; + + const msg = formatCoalescedAnnouncementMessage(records, "Generate the final report"); + + expect(msg).toContain("CONTINUATION TASK"); + expect(msg).toContain("Generate the final report"); + expect(msg).toContain("All data collected"); + }); + + it("uses default summarize instruction when next is not provided", () => { + const records = [ + makeRecord({ runId: "run-1" }), + makeRecord({ runId: "run-2" }), + ]; + + const msg = formatCoalescedAnnouncementMessage(records); + + expect(msg).not.toContain("CONTINUATION TASK"); + expect(msg).toContain("Summarize these results naturally for the user"); + }); }); diff --git a/packages/core/src/agent/subagent/announce.ts b/packages/core/src/agent/subagent/announce.ts index c1cf3330..de6091f3 100644 --- a/packages/core/src/agent/subagent/announce.ts +++ b/packages/core/src/agent/subagent/announce.ts @@ -193,12 +193,17 @@ export function formatAnnouncementMessage(params: FormatAnnouncementParams): str /** * Format a coalesced announcement message from multiple completed subagent runs. * When only one record is provided, delegates to formatAnnouncementMessage. + * + * @param next — Optional continuation prompt from a SubagentGroup. When present, + * the parent agent is instructed to execute the continuation using the combined + * findings, rather than just summarizing. */ export function formatCoalescedAnnouncementMessage( records: SubagentRunRecord[], + next?: string, ): string { - // Single record: delegate to existing format for backward-compatible behavior - if (records.length === 1) { + // Single record without continuation: delegate to existing format + if (records.length === 1 && !next) { const r = records[0]!; return formatAnnouncementMessage({ runId: r.runId, @@ -214,10 +219,9 @@ export function formatCoalescedAnnouncementMessage( }); } - // Multiple records: build combined message. - // Include a strict raw-findings section so parent can reliably cover every task result. + // Multiple records (or single with continuation): build combined message. const parts: string[] = [ - `All ${records.length} background tasks have completed. Here are the combined results:`, + `All ${records.length} background task(s) have completed. Here are the combined results:`, "", ]; @@ -262,14 +266,30 @@ export function formatCoalescedAnnouncementMessage( ); } - parts.push( - "", - "Summarize these results naturally for the user.", - "You MUST include findings from every task item above, without omission.", - "Keep it concise, but preserve concrete findings from each task.", - "Do not mention technical details like session IDs or that these were background tasks.", - "You can respond with NO_REPLY if no announcement is needed.", - ); + // Continuation vs. summarization + if (next) { + parts.push( + "", + "---", + "", + "CONTINUATION TASK: The user's original request requires further work using the findings above.", + "Execute the following task now, using ALL the collected data:", + "", + next, + "", + "Use the raw findings above as your data source. Call tools as needed to complete this task.", + "Do not mention technical details like session IDs or that these were background tasks.", + ); + } else { + parts.push( + "", + "Summarize these results naturally for the user.", + "You MUST include findings from every task item above, without omission.", + "Keep it concise, but preserve concrete findings from each task.", + "Do not mention technical details like session IDs or that these were background tasks.", + "You can respond with NO_REPLY if no announcement is needed.", + ); + } return parts.join("\n"); } @@ -289,8 +309,9 @@ export function formatCoalescedAnnouncementMessage( export function runCoalescedAnnounceFlow( requesterSessionId: string, records: SubagentRunRecord[], + next?: string, ): boolean { - const message = formatCoalescedAnnouncementMessage(records); + const message = formatCoalescedAnnouncementMessage(records, next); try { const hub = getHub(); diff --git a/packages/core/src/agent/subagent/registry-recovery.test.ts b/packages/core/src/agent/subagent/registry-recovery.test.ts index 046db13f..da9bc14f 100644 --- a/packages/core/src/agent/subagent/registry-recovery.test.ts +++ b/packages/core/src/agent/subagent/registry-recovery.test.ts @@ -12,6 +12,7 @@ const rmSyncMock = vi.fn(); vi.mock("./registry-store.js", () => ({ loadSubagentRuns: loadSubagentRunsMock, + loadSubagentGroups: vi.fn(() => new Map()), saveSubagentRuns: saveSubagentRunsMock, })); diff --git a/packages/core/src/agent/subagent/registry-store.ts b/packages/core/src/agent/subagent/registry-store.ts index f33edd38..17fc8c8f 100644 --- a/packages/core/src/agent/subagent/registry-store.ts +++ b/packages/core/src/agent/subagent/registry-store.ts @@ -7,7 +7,7 @@ import { existsSync, mkdirSync, readFileSync, writeFileSync } from "node:fs"; import { join } from "node:path"; import { DATA_DIR } from "@multica/utils"; -import type { SubagentRunRecord } from "./types.js"; +import type { SubagentRunRecord, SubagentGroup } from "./types.js"; const SUBAGENTS_DIR = join(DATA_DIR, "subagents"); const RUNS_FILE = join(SUBAGENTS_DIR, "runs.json"); @@ -15,6 +15,7 @@ const RUNS_FILE = join(SUBAGENTS_DIR, "runs.json"); interface SubagentRunsStore { version: 1; runs: Record; + groups?: Record | undefined; } function ensureDir(): void { @@ -48,13 +49,31 @@ export function loadSubagentRuns(): Map { } } -/** Save all subagent runs to disk */ -export function saveSubagentRuns(runs: Map): void { +/** Load all persisted subagent groups */ +export function loadSubagentGroups(): Map { + if (!existsSync(RUNS_FILE)) return new Map(); + + try { + const content = readFileSync(RUNS_FILE, "utf-8"); + const store = JSON.parse(content) as SubagentRunsStore; + if (store.version !== 1 || !store.groups) return new Map(); + return new Map(Object.entries(store.groups)); + } catch { + return new Map(); + } +} + +/** Save all subagent runs and groups to disk */ +export function saveSubagentRuns( + runs: Map, + groups?: Map, +): void { ensureDir(); const store: SubagentRunsStore = { version: 1, runs: Object.fromEntries(runs), + groups: groups && groups.size > 0 ? Object.fromEntries(groups) : undefined, }; writeFileSync(RUNS_FILE, JSON.stringify(store, null, 2), "utf-8"); diff --git a/packages/core/src/agent/subagent/registry.ts b/packages/core/src/agent/subagent/registry.ts index c19a5529..8a311561 100644 --- a/packages/core/src/agent/subagent/registry.ts +++ b/packages/core/src/agent/subagent/registry.ts @@ -6,11 +6,12 @@ */ import { getHub, isHubInitialized } from "../../hub/hub-singleton.js"; -import { loadSubagentRuns, saveSubagentRuns } from "./registry-store.js"; +import { loadSubagentRuns, saveSubagentRuns, loadSubagentGroups } from "./registry-store.js"; import { readLatestAssistantReply, runCoalescedAnnounceFlow } from "./announce.js"; import type { RegisterSubagentRunParams, SubagentRunRecord, + SubagentGroup, } from "./types.js"; import { resolveSessionDir } from "../session/storage.js"; import { rmSync } from "node:fs"; @@ -28,6 +29,7 @@ const SWEEP_INTERVAL_MS = 60 * 1000; // ============================================================================ const subagentRuns = new Map(); +const subagentGroups = new Map(); let sweepTimer: ReturnType | undefined; const resumedRequesters = new Set(); @@ -50,6 +52,12 @@ export function initSubagentRegistry(): void { } } + // Restore groups + const persistedGroups = loadSubagentGroups(); + for (const [groupId, group] of persistedGroups) { + subagentGroups.set(groupId, group); + } + // Process incomplete runs const affectedRequesters = new Set(); @@ -91,6 +99,45 @@ export function initSubagentRegistry(): void { } } +// ============================================================================ +// Group management +// ============================================================================ + +/** Create a new subagent group. Returns the group record. */ +export function createSubagentGroup(params: { + groupId: string; + requesterSessionId: string; + label?: string; + next?: string; +}): SubagentGroup { + const group: SubagentGroup = { + groupId: params.groupId, + requesterSessionId: params.requesterSessionId, + label: params.label, + next: params.next, + createdAt: Date.now(), + }; + subagentGroups.set(params.groupId, group); + persist(); + return group; +} + +/** Get a group by ID. */ +export function getSubagentGroup(groupId: string): SubagentGroup | undefined { + return subagentGroups.get(groupId); +} + +/** List all runs belonging to a group. */ +export function listGroupRuns(groupId: string): SubagentRunRecord[] { + const result: SubagentRunRecord[] = []; + for (const record of subagentRuns.values()) { + if (record.groupId === groupId) { + result.push(record); + } + } + return result; +} + /** Register a new subagent run and start tracking its lifecycle. */ export function registerSubagentRun(params: RegisterSubagentRunParams): SubagentRunRecord { const { @@ -102,6 +149,7 @@ export function registerSubagentRun(params: RegisterSubagentRunParams): Subagent cleanup = "delete", timeoutSeconds, announce, + groupId, start, } = params; @@ -113,6 +161,7 @@ export function registerSubagentRun(params: RegisterSubagentRunParams): Subagent label, cleanup, announce, + groupId, createdAt: Date.now(), }; @@ -190,6 +239,7 @@ export function shutdownSubagentRegistry(): void { /** Reset all state (for testing). */ export function resetSubagentRegistryForTests(): void { subagentRuns.clear(); + subagentGroups.clear(); resumedRequesters.clear(); stopSweeper(); } @@ -300,37 +350,59 @@ function captureFindings(record: SubagentRunRecord): void { /** * Phase 2: Announce completed-but-unannounced runs. * - * Runs with announce="silent" are held back until ALL silent runs from the - * same requester have completed. All other runs (immediate / undefined) are - * announced per-completion as before. + * Three announcement paths: + * 1. Grouped runs — wait for all runs in the group to complete, then announce + * together with the group's `next` continuation prompt (if any). + * 2. Ungrouped silent runs — legacy behavior: wait for ALL silent runs from + * the same requester to complete, then announce together. + * 3. Ungrouped immediate runs — announce per-completion (default). */ function checkAndAnnounce(requesterSessionId: string): void { const allRuns = listSubagentRuns(requesterSessionId); - // ── Immediate runs: announce per-completion (default behavior) ── - const immediateReady = allRuns.filter( + // ── 1. Grouped runs: announce by group when all members complete ── + const groupIds = new Set(); + for (const r of allRuns) { + if (r.groupId && !r.announced) groupIds.add(r.groupId); + } + + for (const groupId of groupIds) { + const groupRuns = allRuns.filter(r => r.groupId === groupId); + const unannounced = groupRuns.filter(r => !r.announced); + const ready = unannounced.filter(r => r.endedAt !== undefined && r.findingsCaptured); + + if (ready.length > 0 && ready.length === unannounced.length) { + const group = subagentGroups.get(groupId); + announceRuns(requesterSessionId, ready, group?.next); + } + } + + // ── 2. Ungrouped runs: original immediate/silent logic ── + const ungrouped = allRuns.filter(r => !r.groupId); + + // Immediate: announce per-completion + const immediateReady = ungrouped.filter( r => !r.announced && r.endedAt !== undefined && r.findingsCaptured && r.announce !== "silent", ); if (immediateReady.length > 0) { - announceGroup(requesterSessionId, immediateReady); + announceRuns(requesterSessionId, immediateReady); } - // ── Silent runs: announce only when ALL silent runs are done ── - const silentRuns = allRuns.filter(r => r.announce === "silent"); + // Silent: announce only when ALL ungrouped silent runs are done + const silentRuns = ungrouped.filter(r => r.announce === "silent"); const unannouncedSilent = silentRuns.filter(r => !r.announced); const silentReady = unannouncedSilent.filter( r => r.endedAt !== undefined && r.findingsCaptured, ); - // All unannounced silent runs must be ready (ended + findings captured) if (silentReady.length > 0 && silentReady.length === unannouncedSilent.length) { - announceGroup(requesterSessionId, silentReady); + announceRuns(requesterSessionId, silentReady); } } -/** Announce a group of runs and mark them as announced. */ -function announceGroup(requesterSessionId: string, runs: SubagentRunRecord[]): void { - const announced = runCoalescedAnnounceFlow(requesterSessionId, runs); +/** Announce a batch of completed runs and mark them as announced. */ +function announceRuns(requesterSessionId: string, runs: SubagentRunRecord[], next?: string): void { + const announced = runCoalescedAnnounceFlow(requesterSessionId, runs, next); if (announced) { for (const r of runs) { @@ -415,9 +487,18 @@ function sweep(): void { } } + // Clean up groups whose runs have all been archived + for (const [groupId] of subagentGroups) { + const hasActiveRuns = [...subagentRuns.values()].some(r => r.groupId === groupId); + if (!hasActiveRuns) { + subagentGroups.delete(groupId); + removed++; + } + } + if (removed > 0) { persist(); - console.log(`[SubagentRegistry] Archived ${removed} completed run(s)`); + console.log(`[SubagentRegistry] Archived ${removed} completed run(s)/group(s)`); } if (subagentRuns.size === 0) { @@ -431,7 +512,7 @@ function sweep(): void { function persist(): void { try { - saveSubagentRuns(subagentRuns); + saveSubagentRuns(subagentRuns, subagentGroups); } catch (err) { console.error(`[SubagentRegistry] Failed to persist runs:`, err); } diff --git a/packages/core/src/agent/subagent/types.ts b/packages/core/src/agent/subagent/types.ts index 96277181..50b07ba6 100644 --- a/packages/core/src/agent/subagent/types.ts +++ b/packages/core/src/agent/subagent/types.ts @@ -11,6 +11,26 @@ export type SubagentRunOutcome = { error?: string | undefined; }; +/** + * A logical group of subagent runs that are tracked together. + * Groups enable "collect all, then act" workflows: + * all runs in a group must complete before the combined results + * (plus an optional `next` continuation) are announced to the parent. + */ +export type SubagentGroup = { + /** Unique group identifier (UUIDv7) */ + groupId: string; + /** Session ID of the parent (requester) agent */ + requesterSessionId: string; + /** Optional human-readable label for the group */ + label?: string | undefined; + /** Continuation prompt executed after all runs in the group complete. + * Injected into the announcement so the parent agent acts on the combined findings. */ + next?: string | undefined; + /** Timestamp when the group was created */ + createdAt: number; +}; + /** Persistent record tracking a single subagent run */ export type SubagentRunRecord = { /** Unique run identifier (UUIDv7) */ @@ -48,6 +68,9 @@ export type SubagentRunRecord = { /** Announcement mode: "immediate" (default) announces per-completion, * "silent" defers until all silent runs from the same requester complete. */ announce?: "immediate" | "silent" | undefined; + /** Group ID this run belongs to (if any). Runs in a group are announced + * together when all complete, regardless of the `announce` field. */ + groupId?: string | undefined; }; /** Parameters for registering a new subagent run */ @@ -63,6 +86,12 @@ export type RegisterSubagentRunParams = { start?: (() => void) | undefined; /** Announcement mode: "immediate" (default) or "silent" (defer until all silent runs complete). */ announce?: "immediate" | "silent" | undefined; + /** Group ID to join. Runs in a group are announced together when all complete. */ + groupId?: string | undefined; + /** Continuation prompt for the group. Only used on group creation (first spawn). + * After all runs in the group complete, this prompt is included in the announcement + * so the parent agent can act on the combined findings (e.g. summarize, write PDF). */ + next?: string | undefined; }; /** Parameters for the announce flow */ diff --git a/packages/core/src/agent/system-prompt/sections.ts b/packages/core/src/agent/system-prompt/sections.ts index 70d1b7f1..5236affa 100644 --- a/packages/core/src/agent/system-prompt/sections.ts +++ b/packages/core/src/agent/system-prompt/sections.ts @@ -262,23 +262,47 @@ export function buildConditionalToolSections( lines.push( "## Sub-Agents", "If a task is complex or long-running, spawn a sub-agent. It will do the work and report back when done.", - "IMPORTANT: After spawning sub-agents, do NOT immediately check on them with sessions_list. " + - "Results are delivered directly into your context automatically when the sub-agent finishes. " + - "Continue with other tasks or finish your turn and wait for the results to arrive.", - "You may use sessions_list to check on sub-agents only if a long time has passed or the user explicitly asks about their status.", - "Sub-agents cannot spawn nested sub-agents.", + "", + "### Critical Rules", + "- **NEVER fabricate, guess, or make up data that a sub-agent has not yet returned.** " + + "This includes completion status — do NOT claim tasks are done until you receive actual results.", + "- After spawning, do NOT proceed with work that depends on the sub-agent results. " + + "You can still chat with the user, do unrelated tasks, or explain what the sub-agents are working on.", + "- Sub-agents cannot spawn nested sub-agents.", + "- You can use `sessions_list` to check sub-agent status if needed.", + "", + "### Groups and Continuation (`next`) — ALWAYS use for multi-agent tasks", + "When spawning multiple sub-agents, **always** use `next` to define the follow-up work. " + + "This is the standard pattern — do NOT use bare `announce: \"silent\"` for multi-agent collect-then-act workflows.", + "", + "```", + "// First spawn — creates a group automatically, returns groupId", + 'sessions_spawn({ task: "Get AAPL financials", next: "Summarize all data and write a PDF report", label: "AAPL" })', + "// → { groupId: \"grp-abc\", runId: \"...\" }", + "", + "// Subsequent spawns — join the same group", + 'sessions_spawn({ task: "Get MSFT financials", groupId: "grp-abc", label: "MSFT" })', + 'sessions_spawn({ task: "Get GOOG financials", groupId: "grp-abc", label: "GOOG" })', + "```", + "", + "The system waits for ALL runs in the group to complete, then delivers the combined findings " + + "plus the `next` continuation prompt back to you. You can then use tools (write files, call APIs, etc.) " + + "to complete the follow-up work. The user is NOT blocked during this process — they can keep chatting.", + "", + "Use `next` whenever the user's request involves: collect data → then act on it (summarize, analyze, generate files).", + "Without `next`, findings are summarized but no further action is taken.", + "", + "### Announce Modes (when not using groups)", + "- `announce: \"immediate\"` (default): findings delivered per sub-agent as each completes.", + "- `announce: \"silent\"`: all findings held until every silent sub-agent finishes, then delivered together.", + "Groups always use silent collection internally — you don't need to set announce when using groupId.", "", "### Timeout Guidelines", "Set timeoutSeconds generously — a sub-agent that times out loses all its work.", "- Simple tasks (search, read, summarize): 600 (10 min, the default)", "- Moderate tasks (multi-step research, file downloads + analysis): 900–1200 (15–20 min)", "- Complex tasks (code generation, PDF creation, multi-file operations): 1200–1800 (20–30 min)", - "When in doubt, use a longer timeout. It is always better to wait longer than to lose completed work.", - "", - "### Announce Modes", - "- `announce: \"immediate\"` (default): Each sub-agent's findings are delivered to you as soon as it completes.", - "- `announce: \"silent\"`: All findings are held back until every silent sub-agent finishes, then delivered as ONE combined report.", - "Use \"silent\" when you want to collect data from multiple sub-agents first, then summarize everything at once.", + "When in doubt, use a longer timeout.", "", ); } diff --git a/packages/core/src/agent/tools/sessions-list.ts b/packages/core/src/agent/tools/sessions-list.ts index 988bfa1c..93ef896c 100644 --- a/packages/core/src/agent/tools/sessions-list.ts +++ b/packages/core/src/agent/tools/sessions-list.ts @@ -7,7 +7,7 @@ import { Type } from "@sinclair/typebox"; import type { AgentTool } from "@mariozechner/pi-agent-core"; -import { listSubagentRuns, getSubagentRun } from "../subagent/registry.js"; +import { listSubagentRuns, getSubagentRun, getSubagentGroup } from "../subagent/registry.js"; import type { SubagentRunRecord } from "../subagent/types.js"; const SessionsListSchema = Type.Object({ @@ -79,6 +79,11 @@ function formatRunDetail(record: SubagentRunRecord, now: number): string { ]; if (record.label) lines.push(`Label: ${record.label}`); + if (record.groupId) { + const group = getSubagentGroup(record.groupId); + lines.push(`Group: ${record.groupId}${group?.label ? ` (${group.label})` : ""}`); + if (group?.next) lines.push(`Continuation: ${group.next.slice(0, 120)}${group.next.length > 120 ? "…" : ""}`); + } lines.push(`Task: ${record.task}`); lines.push(`Status: ${status}${record.outcome?.error ? ` — ${record.outcome.error}` : ""}`); lines.push(`Child Session: ${record.childSessionId}`); @@ -128,8 +133,7 @@ export function createSessionsListTool( description: "List all subagent runs spawned by this session and their current status. " + "Optionally pass a runId to get detailed information about a specific run. " + - "NOTE: Do NOT call this immediately after spawning subagents — results arrive automatically in your context when subagents complete. " + - "Only use this if a long time has passed or the user explicitly asks about subagent status.", + "Use this to check subagent progress or when the user asks about status.", parameters: SessionsListSchema, execute: async (_toolCallId, args) => { const { runId } = args as SessionsListArgs; @@ -177,21 +181,59 @@ export function createSessionsListTool( const someRunning = runs.some((r) => !r.endedAt); - // Build status lines for each run + // Build status lines, grouping runs by groupId const statusLines: string[] = []; - for (let i = 0; i < runs.length; i++) { - const r = runs[i]!; + const groupedRuns = new Map(); + const ungroupedRuns: SubagentRunRecord[] = []; + + for (const r of runs) { + if (r.groupId) { + const list = groupedRuns.get(r.groupId) ?? []; + list.push(r); + groupedRuns.set(r.groupId, list); + } else { + ungroupedRuns.push(r); + } + } + + let idx = 0; + + // Grouped runs + for (const [gId, gRuns] of groupedRuns) { + const group = getSubagentGroup(gId); + const groupLabel = group?.label || `Group ${gId.slice(0, 8)}…`; + const done = gRuns.filter(r => r.endedAt).length; + const nextSnippet = group?.next ? ` → next: "${group.next.slice(0, 60)}${group.next.length > 60 ? "…" : ""}"` : ""; + statusLines.push(`\n 📦 ${groupLabel} (${done}/${gRuns.length} done${nextSnippet})`); + + for (const r of gRuns) { + idx++; + const displayName = r.label || r.task.slice(0, 60); + const status = resolveStatus(r); + if (status === "running") { + const elapsed = r.startedAt ? formatElapsed(now - r.startedAt) : "just spawned"; + statusLines.push(` ${idx}. [RUNNING] "${displayName}" (${elapsed})`); + } else { + const elapsed = r.startedAt && r.endedAt ? formatElapsed(r.endedAt - r.startedAt) : ""; + statusLines.push(` ${idx}. [${status.toUpperCase()}] "${displayName}" (${elapsed})`); + } + } + } + + // Ungrouped runs + for (const r of ungroupedRuns) { + idx++; const displayName = r.label || r.task.slice(0, 60); const status = resolveStatus(r); if (status === "running") { const elapsed = r.startedAt ? formatElapsed(now - r.startedAt) : "just spawned"; - statusLines.push(` ${i + 1}. [RUNNING] "${displayName}" (${elapsed})`); + statusLines.push(` ${idx}. [RUNNING] "${displayName}" (${elapsed})`); } else { const elapsed = r.startedAt && r.endedAt ? formatElapsed(r.endedAt - r.startedAt) : ""; const findings = r.findingsCaptured ? (r.findings ? r.findings.slice(0, 200) + (r.findings.length > 200 ? "…" : "") : "(no output)") : "(findings not yet captured)"; - statusLines.push(` ${i + 1}. [${status.toUpperCase()}] "${displayName}" (${elapsed})\n Findings: ${findings}`); + statusLines.push(` ${idx}. [${status.toUpperCase()}] "${displayName}" (${elapsed})\n Findings: ${findings}`); } } diff --git a/packages/core/src/agent/tools/sessions-spawn.ts b/packages/core/src/agent/tools/sessions-spawn.ts index 1df31e41..87ba89dc 100644 --- a/packages/core/src/agent/tools/sessions-spawn.ts +++ b/packages/core/src/agent/tools/sessions-spawn.ts @@ -10,7 +10,7 @@ import { Type } from "@sinclair/typebox"; import type { AgentTool } from "@mariozechner/pi-agent-core"; import { getHub } from "../../hub/hub-singleton.js"; import { buildSubagentSystemPrompt } from "../subagent/announce.js"; -import { registerSubagentRun } from "../subagent/registry.js"; +import { registerSubagentRun, createSubagentGroup, getSubagentGroup } from "../subagent/registry.js"; import { resolveTools } from "../tools.js"; const SessionsSpawnSchema = Type.Object({ @@ -41,7 +41,26 @@ const SessionsSpawnSchema = Type.Object({ "Announcement mode. 'immediate' (default): findings delivered as each subagent completes. " + "'silent': defer all announcements until every silent subagent from this session finishes, " + "then deliver one combined report. Use 'silent' when spawning multiple subagents to collect " + - "data in parallel and you want to summarize everything at once.", + "data in parallel and you want to summarize everything at once. " + + "Ignored when groupId is provided (groups always collect all results before announcing).", + }), + ), + groupId: Type.Optional( + Type.String({ + description: + "Join an existing group. Pass the groupId returned by a previous sessions_spawn call " + + "to add this subagent to the same group. All runs in a group are announced together " + + "when the last one completes. If omitted AND 'next' is provided, a new group is created automatically.", + }), + ), + next: Type.Optional( + Type.String({ + description: + "Continuation task to execute after ALL subagents in the group complete. " + + "Only used when creating a new group (first spawn without groupId). " + + "When set, the combined findings from all subagents plus this 'next' prompt " + + "are delivered to you so you can perform follow-up work (e.g. summarize, generate reports, write files). " + + "Setting 'next' automatically creates a group and implies silent collection.", }), ), }); @@ -53,12 +72,15 @@ type SessionsSpawnArgs = { cleanup?: "delete" | "keep"; timeoutSeconds?: number; announce?: "immediate" | "silent"; + groupId?: string; + next?: string; }; export type SessionsSpawnResult = { status: "accepted" | "error"; childSessionId?: string; runId?: string; + groupId?: string; error?: string; }; @@ -79,13 +101,15 @@ export function createSessionsSpawnTool( label: "Spawn Subagent", description: "Spawn a background subagent to handle a specific task. The subagent runs in an isolated session with its own tool set. " + - "When it completes, its findings are delivered directly into your context automatically — you do NOT need to poll or check. " + - "IMPORTANT: After spawning subagents, continue with any other immediate tasks you have, or simply finish your turn and wait. " + - "Do NOT call sessions_list to check on subagents you just spawned — results take time and will arrive on their own. " + + "When it completes, its findings are delivered directly into your context automatically. " + + "After spawning, do NOT proceed with work that depends on the results — but you can still chat or do unrelated tasks. " + + "When spawning multiple subagents for a collect-then-act workflow, ALWAYS use the `next` parameter " + + "on the first spawn to define follow-up work, then pass the returned groupId to subsequent spawns. " + "Use this for parallelizable work, long-running analysis, or tasks that benefit from isolation.", parameters: SessionsSpawnSchema, execute: async (_toolCallId, args) => { - const { task, label, model, cleanup = "delete", timeoutSeconds, announce } = args as SessionsSpawnArgs; + const { task, label, model, cleanup = "delete", timeoutSeconds, announce, next } = args as SessionsSpawnArgs; + let { groupId } = args as SessionsSpawnArgs; // Guard: subagents cannot spawn subagents if (options.isSubagent) { @@ -102,6 +126,28 @@ export function createSessionsSpawnTool( const runId = uuidv7(); const childSessionId = uuidv7(); + // Validate groupId if provided + if (groupId) { + const existingGroup = getSubagentGroup(groupId); + if (!existingGroup) { + return { + content: [{ type: "text", text: `Error: group not found: ${groupId}. Use the groupId returned by a previous sessions_spawn call.` }], + details: { status: "error", error: `group not found: ${groupId}` }, + }; + } + } + + // Auto-create group when `next` is provided without an existing groupId + if (!groupId && next) { + groupId = uuidv7(); + createSubagentGroup({ + groupId, + requesterSessionId, + label: label ? `Group: ${label}` : undefined, + next, + }); + } + // Resolve tools for the subagent (with isSubagent=true for policy filtering) const subagentTools = resolveTools({ isSubagent: true }); const toolNames = subagentTools.map((t) => t.name); @@ -135,21 +181,27 @@ export function createSessionsSpawnTool( label, cleanup, timeoutSeconds, - announce, + announce: groupId ? "silent" : announce, + groupId, start: () => childAgent.write(task), }); + // Build response text + const groupInfo = groupId ? `\nGroup: ${groupId}` : ""; + const nextInfo = next ? `\nContinuation: "${next.slice(0, 100)}${next.length > 100 ? "…" : ""}"` : ""; + const responseText = + `Subagent spawned: ${label || task.slice(0, 80)}\n` + + `Run: ${runId}${groupInfo}${nextInfo}\n\n` + + `⏳ WAITING FOR RESULTS — do NOT proceed with work that depends on these results.\n` + + `Do NOT fabricate data or completion status. Results will arrive in your context automatically.`; + return { - content: [ - { - type: "text", - text: `Subagent spawned successfully.\n\nRun ID: ${runId}\nSession: ${childSessionId}\nTask: ${label || task.slice(0, 80)}\n\nThe subagent is now working in the background. Its findings will be delivered directly into your context when it completes — do NOT poll or call sessions_list for it. Continue with other tasks or finish your turn.`, - }, - ], + content: [{ type: "text", text: responseText }], details: { status: "accepted", childSessionId, runId, + groupId, }, }; } catch (err) {