feat(subagent): add SubagentGroup type and registry support

Add SubagentGroup for "collect all, then act" workflows where multiple
subagents complete before a continuation task runs. Groups are persisted
alongside run records and support a `next` continuation prompt.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Jiang Bohan 2026-02-12 17:11:20 +08:00
parent c251d94c8a
commit 12fb12b895
4 changed files with 149 additions and 19 deletions

View file

@ -12,6 +12,7 @@ const rmSyncMock = vi.fn();
vi.mock("./registry-store.js", () => ({
loadSubagentRuns: loadSubagentRunsMock,
loadSubagentGroups: vi.fn(() => new Map()),
saveSubagentRuns: saveSubagentRunsMock,
}));

View file

@ -7,7 +7,7 @@
import { existsSync, mkdirSync, readFileSync, writeFileSync } from "node:fs";
import { join } from "node:path";
import { DATA_DIR } from "@multica/utils";
import type { SubagentRunRecord } from "./types.js";
import type { SubagentRunRecord, SubagentGroup } from "./types.js";
const SUBAGENTS_DIR = join(DATA_DIR, "subagents");
const RUNS_FILE = join(SUBAGENTS_DIR, "runs.json");
@ -15,6 +15,7 @@ const RUNS_FILE = join(SUBAGENTS_DIR, "runs.json");
interface SubagentRunsStore {
version: 1;
runs: Record<string, SubagentRunRecord>;
groups?: Record<string, SubagentGroup> | undefined;
}
function ensureDir(): void {
@ -48,13 +49,31 @@ export function loadSubagentRuns(): Map<string, SubagentRunRecord> {
}
}
/** Save all subagent runs to disk */
export function saveSubagentRuns(runs: Map<string, SubagentRunRecord>): void {
/** Load all persisted subagent groups */
export function loadSubagentGroups(): Map<string, SubagentGroup> {
if (!existsSync(RUNS_FILE)) return new Map();
try {
const content = readFileSync(RUNS_FILE, "utf-8");
const store = JSON.parse(content) as SubagentRunsStore;
if (store.version !== 1 || !store.groups) return new Map();
return new Map(Object.entries(store.groups));
} catch {
return new Map();
}
}
/** Save all subagent runs and groups to disk */
export function saveSubagentRuns(
runs: Map<string, SubagentRunRecord>,
groups?: Map<string, SubagentGroup>,
): void {
ensureDir();
const store: SubagentRunsStore = {
version: 1,
runs: Object.fromEntries(runs),
groups: groups && groups.size > 0 ? Object.fromEntries(groups) : undefined,
};
writeFileSync(RUNS_FILE, JSON.stringify(store, null, 2), "utf-8");

View file

@ -6,11 +6,12 @@
*/
import { getHub, isHubInitialized } from "../../hub/hub-singleton.js";
import { loadSubagentRuns, saveSubagentRuns } from "./registry-store.js";
import { loadSubagentRuns, saveSubagentRuns, loadSubagentGroups } from "./registry-store.js";
import { readLatestAssistantReply, runCoalescedAnnounceFlow } from "./announce.js";
import type {
RegisterSubagentRunParams,
SubagentRunRecord,
SubagentGroup,
} from "./types.js";
import { resolveSessionDir } from "../session/storage.js";
import { rmSync } from "node:fs";
@ -28,6 +29,7 @@ const SWEEP_INTERVAL_MS = 60 * 1000;
// ============================================================================
const subagentRuns = new Map<string, SubagentRunRecord>();
const subagentGroups = new Map<string, SubagentGroup>();
let sweepTimer: ReturnType<typeof setInterval> | undefined;
const resumedRequesters = new Set<string>();
@ -50,6 +52,12 @@ export function initSubagentRegistry(): void {
}
}
// Restore groups
const persistedGroups = loadSubagentGroups();
for (const [groupId, group] of persistedGroups) {
subagentGroups.set(groupId, group);
}
// Process incomplete runs
const affectedRequesters = new Set<string>();
@ -91,6 +99,45 @@ export function initSubagentRegistry(): void {
}
}
// ============================================================================
// Group management
// ============================================================================
/** Create a new subagent group. Returns the group record. */
export function createSubagentGroup(params: {
groupId: string;
requesterSessionId: string;
label?: string;
next?: string;
}): SubagentGroup {
const group: SubagentGroup = {
groupId: params.groupId,
requesterSessionId: params.requesterSessionId,
label: params.label,
next: params.next,
createdAt: Date.now(),
};
subagentGroups.set(params.groupId, group);
persist();
return group;
}
/** Get a group by ID. */
export function getSubagentGroup(groupId: string): SubagentGroup | undefined {
return subagentGroups.get(groupId);
}
/** List all runs belonging to a group. */
export function listGroupRuns(groupId: string): SubagentRunRecord[] {
const result: SubagentRunRecord[] = [];
for (const record of subagentRuns.values()) {
if (record.groupId === groupId) {
result.push(record);
}
}
return result;
}
/** Register a new subagent run and start tracking its lifecycle. */
export function registerSubagentRun(params: RegisterSubagentRunParams): SubagentRunRecord {
const {
@ -102,6 +149,7 @@ export function registerSubagentRun(params: RegisterSubagentRunParams): Subagent
cleanup = "delete",
timeoutSeconds,
announce,
groupId,
start,
} = params;
@ -113,6 +161,7 @@ export function registerSubagentRun(params: RegisterSubagentRunParams): Subagent
label,
cleanup,
announce,
groupId,
createdAt: Date.now(),
};
@ -190,6 +239,7 @@ export function shutdownSubagentRegistry(): void {
/** Reset all state (for testing). */
export function resetSubagentRegistryForTests(): void {
subagentRuns.clear();
subagentGroups.clear();
resumedRequesters.clear();
stopSweeper();
}
@ -300,37 +350,59 @@ function captureFindings(record: SubagentRunRecord): void {
/**
* Phase 2: Announce completed-but-unannounced runs.
*
* Runs with announce="silent" are held back until ALL silent runs from the
* same requester have completed. All other runs (immediate / undefined) are
* announced per-completion as before.
* Three announcement paths:
* 1. Grouped runs wait for all runs in the group to complete, then announce
* together with the group's `next` continuation prompt (if any).
* 2. Ungrouped silent runs legacy behavior: wait for ALL silent runs from
* the same requester to complete, then announce together.
* 3. Ungrouped immediate runs announce per-completion (default).
*/
function checkAndAnnounce(requesterSessionId: string): void {
const allRuns = listSubagentRuns(requesterSessionId);
// ── Immediate runs: announce per-completion (default behavior) ──
const immediateReady = allRuns.filter(
// ── 1. Grouped runs: announce by group when all members complete ──
const groupIds = new Set<string>();
for (const r of allRuns) {
if (r.groupId && !r.announced) groupIds.add(r.groupId);
}
for (const groupId of groupIds) {
const groupRuns = allRuns.filter(r => r.groupId === groupId);
const unannounced = groupRuns.filter(r => !r.announced);
const ready = unannounced.filter(r => r.endedAt !== undefined && r.findingsCaptured);
if (ready.length > 0 && ready.length === unannounced.length) {
const group = subagentGroups.get(groupId);
announceRuns(requesterSessionId, ready, group?.next);
}
}
// ── 2. Ungrouped runs: original immediate/silent logic ──
const ungrouped = allRuns.filter(r => !r.groupId);
// Immediate: announce per-completion
const immediateReady = ungrouped.filter(
r => !r.announced && r.endedAt !== undefined && r.findingsCaptured && r.announce !== "silent",
);
if (immediateReady.length > 0) {
announceGroup(requesterSessionId, immediateReady);
announceRuns(requesterSessionId, immediateReady);
}
// ── Silent runs: announce only when ALL silent runs are done ──
const silentRuns = allRuns.filter(r => r.announce === "silent");
// Silent: announce only when ALL ungrouped silent runs are done
const silentRuns = ungrouped.filter(r => r.announce === "silent");
const unannouncedSilent = silentRuns.filter(r => !r.announced);
const silentReady = unannouncedSilent.filter(
r => r.endedAt !== undefined && r.findingsCaptured,
);
// All unannounced silent runs must be ready (ended + findings captured)
if (silentReady.length > 0 && silentReady.length === unannouncedSilent.length) {
announceGroup(requesterSessionId, silentReady);
announceRuns(requesterSessionId, silentReady);
}
}
/** Announce a group of runs and mark them as announced. */
function announceGroup(requesterSessionId: string, runs: SubagentRunRecord[]): void {
const announced = runCoalescedAnnounceFlow(requesterSessionId, runs);
/** Announce a batch of completed runs and mark them as announced. */
function announceRuns(requesterSessionId: string, runs: SubagentRunRecord[], next?: string): void {
const announced = runCoalescedAnnounceFlow(requesterSessionId, runs, next);
if (announced) {
for (const r of runs) {
@ -415,9 +487,18 @@ function sweep(): void {
}
}
// Clean up groups whose runs have all been archived
for (const [groupId] of subagentGroups) {
const hasActiveRuns = [...subagentRuns.values()].some(r => r.groupId === groupId);
if (!hasActiveRuns) {
subagentGroups.delete(groupId);
removed++;
}
}
if (removed > 0) {
persist();
console.log(`[SubagentRegistry] Archived ${removed} completed run(s)`);
console.log(`[SubagentRegistry] Archived ${removed} completed run(s)/group(s)`);
}
if (subagentRuns.size === 0) {
@ -431,7 +512,7 @@ function sweep(): void {
function persist(): void {
try {
saveSubagentRuns(subagentRuns);
saveSubagentRuns(subagentRuns, subagentGroups);
} catch (err) {
console.error(`[SubagentRegistry] Failed to persist runs:`, err);
}

View file

@ -11,6 +11,26 @@ export type SubagentRunOutcome = {
error?: string | undefined;
};
/**
* A logical group of subagent runs that are tracked together.
* Groups enable "collect all, then act" workflows:
* all runs in a group must complete before the combined results
* (plus an optional `next` continuation) are announced to the parent.
*/
export type SubagentGroup = {
/** Unique group identifier (UUIDv7) */
groupId: string;
/** Session ID of the parent (requester) agent */
requesterSessionId: string;
/** Optional human-readable label for the group */
label?: string | undefined;
/** Continuation prompt executed after all runs in the group complete.
* Injected into the announcement so the parent agent acts on the combined findings. */
next?: string | undefined;
/** Timestamp when the group was created */
createdAt: number;
};
/** Persistent record tracking a single subagent run */
export type SubagentRunRecord = {
/** Unique run identifier (UUIDv7) */
@ -48,6 +68,9 @@ export type SubagentRunRecord = {
/** Announcement mode: "immediate" (default) announces per-completion,
* "silent" defers until all silent runs from the same requester complete. */
announce?: "immediate" | "silent" | undefined;
/** Group ID this run belongs to (if any). Runs in a group are announced
* together when all complete, regardless of the `announce` field. */
groupId?: string | undefined;
};
/** Parameters for registering a new subagent run */
@ -63,6 +86,12 @@ export type RegisterSubagentRunParams = {
start?: (() => void) | undefined;
/** Announcement mode: "immediate" (default) or "silent" (defer until all silent runs complete). */
announce?: "immediate" | "silent" | undefined;
/** Group ID to join. Runs in a group are announced together when all complete. */
groupId?: string | undefined;
/** Continuation prompt for the group. Only used on group creation (first spawn).
* After all runs in the group complete, this prompt is included in the announcement
* so the parent agent can act on the combined findings (e.g. summarize, write PDF). */
next?: string | undefined;
};
/** Parameters for the announce flow */