From 12fb12b89520dcbd49f0d5bea5dfd59ce86eef9c Mon Sep 17 00:00:00 2001 From: Jiang Bohan Date: Thu, 12 Feb 2026 17:11:20 +0800 Subject: [PATCH] feat(subagent): add SubagentGroup type and registry support Add SubagentGroup for "collect all, then act" workflows where multiple subagents complete before a continuation task runs. Groups are persisted alongside run records and support a `next` continuation prompt. Co-Authored-By: Claude Opus 4.6 --- .../agent/subagent/registry-recovery.test.ts | 1 + .../core/src/agent/subagent/registry-store.ts | 25 +++- packages/core/src/agent/subagent/registry.ts | 113 +++++++++++++++--- packages/core/src/agent/subagent/types.ts | 29 +++++ 4 files changed, 149 insertions(+), 19 deletions(-) diff --git a/packages/core/src/agent/subagent/registry-recovery.test.ts b/packages/core/src/agent/subagent/registry-recovery.test.ts index 046db13f..da9bc14f 100644 --- a/packages/core/src/agent/subagent/registry-recovery.test.ts +++ b/packages/core/src/agent/subagent/registry-recovery.test.ts @@ -12,6 +12,7 @@ const rmSyncMock = vi.fn(); vi.mock("./registry-store.js", () => ({ loadSubagentRuns: loadSubagentRunsMock, + loadSubagentGroups: vi.fn(() => new Map()), saveSubagentRuns: saveSubagentRunsMock, })); diff --git a/packages/core/src/agent/subagent/registry-store.ts b/packages/core/src/agent/subagent/registry-store.ts index f33edd38..17fc8c8f 100644 --- a/packages/core/src/agent/subagent/registry-store.ts +++ b/packages/core/src/agent/subagent/registry-store.ts @@ -7,7 +7,7 @@ import { existsSync, mkdirSync, readFileSync, writeFileSync } from "node:fs"; import { join } from "node:path"; import { DATA_DIR } from "@multica/utils"; -import type { SubagentRunRecord } from "./types.js"; +import type { SubagentRunRecord, SubagentGroup } from "./types.js"; const SUBAGENTS_DIR = join(DATA_DIR, "subagents"); const RUNS_FILE = join(SUBAGENTS_DIR, "runs.json"); @@ -15,6 +15,7 @@ const RUNS_FILE = join(SUBAGENTS_DIR, "runs.json"); interface SubagentRunsStore { version: 1; runs: Record; + groups?: Record | undefined; } function ensureDir(): void { @@ -48,13 +49,31 @@ export function loadSubagentRuns(): Map { } } -/** Save all subagent runs to disk */ -export function saveSubagentRuns(runs: Map): void { +/** Load all persisted subagent groups */ +export function loadSubagentGroups(): Map { + if (!existsSync(RUNS_FILE)) return new Map(); + + try { + const content = readFileSync(RUNS_FILE, "utf-8"); + const store = JSON.parse(content) as SubagentRunsStore; + if (store.version !== 1 || !store.groups) return new Map(); + return new Map(Object.entries(store.groups)); + } catch { + return new Map(); + } +} + +/** Save all subagent runs and groups to disk */ +export function saveSubagentRuns( + runs: Map, + groups?: Map, +): void { ensureDir(); const store: SubagentRunsStore = { version: 1, runs: Object.fromEntries(runs), + groups: groups && groups.size > 0 ? Object.fromEntries(groups) : undefined, }; writeFileSync(RUNS_FILE, JSON.stringify(store, null, 2), "utf-8"); diff --git a/packages/core/src/agent/subagent/registry.ts b/packages/core/src/agent/subagent/registry.ts index c19a5529..8a311561 100644 --- a/packages/core/src/agent/subagent/registry.ts +++ b/packages/core/src/agent/subagent/registry.ts @@ -6,11 +6,12 @@ */ import { getHub, isHubInitialized } from "../../hub/hub-singleton.js"; -import { loadSubagentRuns, saveSubagentRuns } from "./registry-store.js"; +import { loadSubagentRuns, saveSubagentRuns, loadSubagentGroups } from "./registry-store.js"; import { readLatestAssistantReply, runCoalescedAnnounceFlow } from "./announce.js"; import type { RegisterSubagentRunParams, SubagentRunRecord, + SubagentGroup, } from "./types.js"; import { resolveSessionDir } from "../session/storage.js"; import { rmSync } from "node:fs"; @@ -28,6 +29,7 @@ const SWEEP_INTERVAL_MS = 60 * 1000; // ============================================================================ const subagentRuns = new Map(); +const subagentGroups = new Map(); let sweepTimer: ReturnType | undefined; const resumedRequesters = new Set(); @@ -50,6 +52,12 @@ export function initSubagentRegistry(): void { } } + // Restore groups + const persistedGroups = loadSubagentGroups(); + for (const [groupId, group] of persistedGroups) { + subagentGroups.set(groupId, group); + } + // Process incomplete runs const affectedRequesters = new Set(); @@ -91,6 +99,45 @@ export function initSubagentRegistry(): void { } } +// ============================================================================ +// Group management +// ============================================================================ + +/** Create a new subagent group. Returns the group record. */ +export function createSubagentGroup(params: { + groupId: string; + requesterSessionId: string; + label?: string; + next?: string; +}): SubagentGroup { + const group: SubagentGroup = { + groupId: params.groupId, + requesterSessionId: params.requesterSessionId, + label: params.label, + next: params.next, + createdAt: Date.now(), + }; + subagentGroups.set(params.groupId, group); + persist(); + return group; +} + +/** Get a group by ID. */ +export function getSubagentGroup(groupId: string): SubagentGroup | undefined { + return subagentGroups.get(groupId); +} + +/** List all runs belonging to a group. */ +export function listGroupRuns(groupId: string): SubagentRunRecord[] { + const result: SubagentRunRecord[] = []; + for (const record of subagentRuns.values()) { + if (record.groupId === groupId) { + result.push(record); + } + } + return result; +} + /** Register a new subagent run and start tracking its lifecycle. */ export function registerSubagentRun(params: RegisterSubagentRunParams): SubagentRunRecord { const { @@ -102,6 +149,7 @@ export function registerSubagentRun(params: RegisterSubagentRunParams): Subagent cleanup = "delete", timeoutSeconds, announce, + groupId, start, } = params; @@ -113,6 +161,7 @@ export function registerSubagentRun(params: RegisterSubagentRunParams): Subagent label, cleanup, announce, + groupId, createdAt: Date.now(), }; @@ -190,6 +239,7 @@ export function shutdownSubagentRegistry(): void { /** Reset all state (for testing). */ export function resetSubagentRegistryForTests(): void { subagentRuns.clear(); + subagentGroups.clear(); resumedRequesters.clear(); stopSweeper(); } @@ -300,37 +350,59 @@ function captureFindings(record: SubagentRunRecord): void { /** * Phase 2: Announce completed-but-unannounced runs. * - * Runs with announce="silent" are held back until ALL silent runs from the - * same requester have completed. All other runs (immediate / undefined) are - * announced per-completion as before. + * Three announcement paths: + * 1. Grouped runs — wait for all runs in the group to complete, then announce + * together with the group's `next` continuation prompt (if any). + * 2. Ungrouped silent runs — legacy behavior: wait for ALL silent runs from + * the same requester to complete, then announce together. + * 3. Ungrouped immediate runs — announce per-completion (default). */ function checkAndAnnounce(requesterSessionId: string): void { const allRuns = listSubagentRuns(requesterSessionId); - // ── Immediate runs: announce per-completion (default behavior) ── - const immediateReady = allRuns.filter( + // ── 1. Grouped runs: announce by group when all members complete ── + const groupIds = new Set(); + for (const r of allRuns) { + if (r.groupId && !r.announced) groupIds.add(r.groupId); + } + + for (const groupId of groupIds) { + const groupRuns = allRuns.filter(r => r.groupId === groupId); + const unannounced = groupRuns.filter(r => !r.announced); + const ready = unannounced.filter(r => r.endedAt !== undefined && r.findingsCaptured); + + if (ready.length > 0 && ready.length === unannounced.length) { + const group = subagentGroups.get(groupId); + announceRuns(requesterSessionId, ready, group?.next); + } + } + + // ── 2. Ungrouped runs: original immediate/silent logic ── + const ungrouped = allRuns.filter(r => !r.groupId); + + // Immediate: announce per-completion + const immediateReady = ungrouped.filter( r => !r.announced && r.endedAt !== undefined && r.findingsCaptured && r.announce !== "silent", ); if (immediateReady.length > 0) { - announceGroup(requesterSessionId, immediateReady); + announceRuns(requesterSessionId, immediateReady); } - // ── Silent runs: announce only when ALL silent runs are done ── - const silentRuns = allRuns.filter(r => r.announce === "silent"); + // Silent: announce only when ALL ungrouped silent runs are done + const silentRuns = ungrouped.filter(r => r.announce === "silent"); const unannouncedSilent = silentRuns.filter(r => !r.announced); const silentReady = unannouncedSilent.filter( r => r.endedAt !== undefined && r.findingsCaptured, ); - // All unannounced silent runs must be ready (ended + findings captured) if (silentReady.length > 0 && silentReady.length === unannouncedSilent.length) { - announceGroup(requesterSessionId, silentReady); + announceRuns(requesterSessionId, silentReady); } } -/** Announce a group of runs and mark them as announced. */ -function announceGroup(requesterSessionId: string, runs: SubagentRunRecord[]): void { - const announced = runCoalescedAnnounceFlow(requesterSessionId, runs); +/** Announce a batch of completed runs and mark them as announced. */ +function announceRuns(requesterSessionId: string, runs: SubagentRunRecord[], next?: string): void { + const announced = runCoalescedAnnounceFlow(requesterSessionId, runs, next); if (announced) { for (const r of runs) { @@ -415,9 +487,18 @@ function sweep(): void { } } + // Clean up groups whose runs have all been archived + for (const [groupId] of subagentGroups) { + const hasActiveRuns = [...subagentRuns.values()].some(r => r.groupId === groupId); + if (!hasActiveRuns) { + subagentGroups.delete(groupId); + removed++; + } + } + if (removed > 0) { persist(); - console.log(`[SubagentRegistry] Archived ${removed} completed run(s)`); + console.log(`[SubagentRegistry] Archived ${removed} completed run(s)/group(s)`); } if (subagentRuns.size === 0) { @@ -431,7 +512,7 @@ function sweep(): void { function persist(): void { try { - saveSubagentRuns(subagentRuns); + saveSubagentRuns(subagentRuns, subagentGroups); } catch (err) { console.error(`[SubagentRegistry] Failed to persist runs:`, err); } diff --git a/packages/core/src/agent/subagent/types.ts b/packages/core/src/agent/subagent/types.ts index 96277181..50b07ba6 100644 --- a/packages/core/src/agent/subagent/types.ts +++ b/packages/core/src/agent/subagent/types.ts @@ -11,6 +11,26 @@ export type SubagentRunOutcome = { error?: string | undefined; }; +/** + * A logical group of subagent runs that are tracked together. + * Groups enable "collect all, then act" workflows: + * all runs in a group must complete before the combined results + * (plus an optional `next` continuation) are announced to the parent. + */ +export type SubagentGroup = { + /** Unique group identifier (UUIDv7) */ + groupId: string; + /** Session ID of the parent (requester) agent */ + requesterSessionId: string; + /** Optional human-readable label for the group */ + label?: string | undefined; + /** Continuation prompt executed after all runs in the group complete. + * Injected into the announcement so the parent agent acts on the combined findings. */ + next?: string | undefined; + /** Timestamp when the group was created */ + createdAt: number; +}; + /** Persistent record tracking a single subagent run */ export type SubagentRunRecord = { /** Unique run identifier (UUIDv7) */ @@ -48,6 +68,9 @@ export type SubagentRunRecord = { /** Announcement mode: "immediate" (default) announces per-completion, * "silent" defers until all silent runs from the same requester complete. */ announce?: "immediate" | "silent" | undefined; + /** Group ID this run belongs to (if any). Runs in a group are announced + * together when all complete, regardless of the `announce` field. */ + groupId?: string | undefined; }; /** Parameters for registering a new subagent run */ @@ -63,6 +86,12 @@ export type RegisterSubagentRunParams = { start?: (() => void) | undefined; /** Announcement mode: "immediate" (default) or "silent" (defer until all silent runs complete). */ announce?: "immediate" | "silent" | undefined; + /** Group ID to join. Runs in a group are announced together when all complete. */ + groupId?: string | undefined; + /** Continuation prompt for the group. Only used on group creation (first spawn). + * After all runs in the group complete, this prompt is included in the announcement + * so the parent agent can act on the combined findings (e.g. summarize, write PDF). */ + next?: string | undefined; }; /** Parameters for the announce flow */