Merge pull request #139 from multica-ai/feat/subagent-group-next
feat(subagent): add group/next continuation for multi-agent workflows
This commit is contained in:
commit
b0f3e1e38a
9 changed files with 375 additions and 66 deletions
|
|
@ -188,7 +188,7 @@ describe("formatCoalescedAnnouncementMessage", () => {
|
|||
|
||||
const msg = formatCoalescedAnnouncementMessage(records);
|
||||
|
||||
expect(msg).toContain("All 2 background tasks have completed");
|
||||
expect(msg).toContain("All 2 background task(s) have completed");
|
||||
expect(msg).toContain('Task 1: "Task A"');
|
||||
expect(msg).toContain("Found issue A");
|
||||
expect(msg).toContain('Task 2: "Task B"');
|
||||
|
|
@ -251,4 +251,44 @@ describe("formatCoalescedAnnouncementMessage", () => {
|
|||
expect(msg).toContain("上海:多云,9°C");
|
||||
expect(msg).toContain("MUST include findings from every task item above");
|
||||
});
|
||||
|
||||
it("includes continuation prompt when next is provided", () => {
|
||||
const records = [
|
||||
makeRecord({ runId: "run-1", label: "AAPL data", findings: "AAPL revenue: $100B" }),
|
||||
makeRecord({ runId: "run-2", label: "MSFT data", findings: "MSFT revenue: $200B" }),
|
||||
];
|
||||
|
||||
const msg = formatCoalescedAnnouncementMessage(records, "Summarize all data and write a PDF investment report");
|
||||
|
||||
expect(msg).toContain("CONTINUATION TASK");
|
||||
expect(msg).toContain("Summarize all data and write a PDF investment report");
|
||||
expect(msg).toContain("AAPL revenue: $100B");
|
||||
expect(msg).toContain("MSFT revenue: $200B");
|
||||
// Should NOT contain the default summarize instruction
|
||||
expect(msg).not.toContain("Summarize these results naturally for the user");
|
||||
});
|
||||
|
||||
it("uses continuation prompt even for single record when next is provided", () => {
|
||||
const records = [
|
||||
makeRecord({ runId: "run-1", label: "Data collection", findings: "All data collected" }),
|
||||
];
|
||||
|
||||
const msg = formatCoalescedAnnouncementMessage(records, "Generate the final report");
|
||||
|
||||
expect(msg).toContain("CONTINUATION TASK");
|
||||
expect(msg).toContain("Generate the final report");
|
||||
expect(msg).toContain("All data collected");
|
||||
});
|
||||
|
||||
it("uses default summarize instruction when next is not provided", () => {
|
||||
const records = [
|
||||
makeRecord({ runId: "run-1" }),
|
||||
makeRecord({ runId: "run-2" }),
|
||||
];
|
||||
|
||||
const msg = formatCoalescedAnnouncementMessage(records);
|
||||
|
||||
expect(msg).not.toContain("CONTINUATION TASK");
|
||||
expect(msg).toContain("Summarize these results naturally for the user");
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -193,12 +193,17 @@ export function formatAnnouncementMessage(params: FormatAnnouncementParams): str
|
|||
/**
|
||||
* Format a coalesced announcement message from multiple completed subagent runs.
|
||||
* When only one record is provided, delegates to formatAnnouncementMessage.
|
||||
*
|
||||
* @param next — Optional continuation prompt from a SubagentGroup. When present,
|
||||
* the parent agent is instructed to execute the continuation using the combined
|
||||
* findings, rather than just summarizing.
|
||||
*/
|
||||
export function formatCoalescedAnnouncementMessage(
|
||||
records: SubagentRunRecord[],
|
||||
next?: string,
|
||||
): string {
|
||||
// Single record: delegate to existing format for backward-compatible behavior
|
||||
if (records.length === 1) {
|
||||
// Single record without continuation: delegate to existing format
|
||||
if (records.length === 1 && !next) {
|
||||
const r = records[0]!;
|
||||
return formatAnnouncementMessage({
|
||||
runId: r.runId,
|
||||
|
|
@ -214,10 +219,9 @@ export function formatCoalescedAnnouncementMessage(
|
|||
});
|
||||
}
|
||||
|
||||
// Multiple records: build combined message.
|
||||
// Include a strict raw-findings section so parent can reliably cover every task result.
|
||||
// Multiple records (or single with continuation): build combined message.
|
||||
const parts: string[] = [
|
||||
`All ${records.length} background tasks have completed. Here are the combined results:`,
|
||||
`All ${records.length} background task(s) have completed. Here are the combined results:`,
|
||||
"",
|
||||
];
|
||||
|
||||
|
|
@ -262,14 +266,30 @@ export function formatCoalescedAnnouncementMessage(
|
|||
);
|
||||
}
|
||||
|
||||
parts.push(
|
||||
"",
|
||||
"Summarize these results naturally for the user.",
|
||||
"You MUST include findings from every task item above, without omission.",
|
||||
"Keep it concise, but preserve concrete findings from each task.",
|
||||
"Do not mention technical details like session IDs or that these were background tasks.",
|
||||
"You can respond with NO_REPLY if no announcement is needed.",
|
||||
);
|
||||
// Continuation vs. summarization
|
||||
if (next) {
|
||||
parts.push(
|
||||
"",
|
||||
"---",
|
||||
"",
|
||||
"CONTINUATION TASK: The user's original request requires further work using the findings above.",
|
||||
"Execute the following task now, using ALL the collected data:",
|
||||
"",
|
||||
next,
|
||||
"",
|
||||
"Use the raw findings above as your data source. Call tools as needed to complete this task.",
|
||||
"Do not mention technical details like session IDs or that these were background tasks.",
|
||||
);
|
||||
} else {
|
||||
parts.push(
|
||||
"",
|
||||
"Summarize these results naturally for the user.",
|
||||
"You MUST include findings from every task item above, without omission.",
|
||||
"Keep it concise, but preserve concrete findings from each task.",
|
||||
"Do not mention technical details like session IDs or that these were background tasks.",
|
||||
"You can respond with NO_REPLY if no announcement is needed.",
|
||||
);
|
||||
}
|
||||
|
||||
return parts.join("\n");
|
||||
}
|
||||
|
|
@ -289,8 +309,9 @@ export function formatCoalescedAnnouncementMessage(
|
|||
export function runCoalescedAnnounceFlow(
|
||||
requesterSessionId: string,
|
||||
records: SubagentRunRecord[],
|
||||
next?: string,
|
||||
): boolean {
|
||||
const message = formatCoalescedAnnouncementMessage(records);
|
||||
const message = formatCoalescedAnnouncementMessage(records, next);
|
||||
|
||||
try {
|
||||
const hub = getHub();
|
||||
|
|
|
|||
|
|
@ -12,6 +12,7 @@ const rmSyncMock = vi.fn();
|
|||
|
||||
vi.mock("./registry-store.js", () => ({
|
||||
loadSubagentRuns: loadSubagentRunsMock,
|
||||
loadSubagentGroups: vi.fn(() => new Map()),
|
||||
saveSubagentRuns: saveSubagentRunsMock,
|
||||
}));
|
||||
|
||||
|
|
|
|||
|
|
@ -7,7 +7,7 @@
|
|||
import { existsSync, mkdirSync, readFileSync, writeFileSync } from "node:fs";
|
||||
import { join } from "node:path";
|
||||
import { DATA_DIR } from "@multica/utils";
|
||||
import type { SubagentRunRecord } from "./types.js";
|
||||
import type { SubagentRunRecord, SubagentGroup } from "./types.js";
|
||||
|
||||
const SUBAGENTS_DIR = join(DATA_DIR, "subagents");
|
||||
const RUNS_FILE = join(SUBAGENTS_DIR, "runs.json");
|
||||
|
|
@ -15,6 +15,7 @@ const RUNS_FILE = join(SUBAGENTS_DIR, "runs.json");
|
|||
interface SubagentRunsStore {
|
||||
version: 1;
|
||||
runs: Record<string, SubagentRunRecord>;
|
||||
groups?: Record<string, SubagentGroup> | undefined;
|
||||
}
|
||||
|
||||
function ensureDir(): void {
|
||||
|
|
@ -48,13 +49,31 @@ export function loadSubagentRuns(): Map<string, SubagentRunRecord> {
|
|||
}
|
||||
}
|
||||
|
||||
/** Save all subagent runs to disk */
|
||||
export function saveSubagentRuns(runs: Map<string, SubagentRunRecord>): void {
|
||||
/** Load all persisted subagent groups */
|
||||
export function loadSubagentGroups(): Map<string, SubagentGroup> {
|
||||
if (!existsSync(RUNS_FILE)) return new Map();
|
||||
|
||||
try {
|
||||
const content = readFileSync(RUNS_FILE, "utf-8");
|
||||
const store = JSON.parse(content) as SubagentRunsStore;
|
||||
if (store.version !== 1 || !store.groups) return new Map();
|
||||
return new Map(Object.entries(store.groups));
|
||||
} catch {
|
||||
return new Map();
|
||||
}
|
||||
}
|
||||
|
||||
/** Save all subagent runs and groups to disk */
|
||||
export function saveSubagentRuns(
|
||||
runs: Map<string, SubagentRunRecord>,
|
||||
groups?: Map<string, SubagentGroup>,
|
||||
): void {
|
||||
ensureDir();
|
||||
|
||||
const store: SubagentRunsStore = {
|
||||
version: 1,
|
||||
runs: Object.fromEntries(runs),
|
||||
groups: groups && groups.size > 0 ? Object.fromEntries(groups) : undefined,
|
||||
};
|
||||
|
||||
writeFileSync(RUNS_FILE, JSON.stringify(store, null, 2), "utf-8");
|
||||
|
|
|
|||
|
|
@ -6,11 +6,12 @@
|
|||
*/
|
||||
|
||||
import { getHub, isHubInitialized } from "../../hub/hub-singleton.js";
|
||||
import { loadSubagentRuns, saveSubagentRuns } from "./registry-store.js";
|
||||
import { loadSubagentRuns, saveSubagentRuns, loadSubagentGroups } from "./registry-store.js";
|
||||
import { readLatestAssistantReply, runCoalescedAnnounceFlow } from "./announce.js";
|
||||
import type {
|
||||
RegisterSubagentRunParams,
|
||||
SubagentRunRecord,
|
||||
SubagentGroup,
|
||||
} from "./types.js";
|
||||
import { resolveSessionDir } from "../session/storage.js";
|
||||
import { rmSync } from "node:fs";
|
||||
|
|
@ -28,6 +29,7 @@ const SWEEP_INTERVAL_MS = 60 * 1000;
|
|||
// ============================================================================
|
||||
|
||||
const subagentRuns = new Map<string, SubagentRunRecord>();
|
||||
const subagentGroups = new Map<string, SubagentGroup>();
|
||||
let sweepTimer: ReturnType<typeof setInterval> | undefined;
|
||||
const resumedRequesters = new Set<string>();
|
||||
|
||||
|
|
@ -50,6 +52,12 @@ export function initSubagentRegistry(): void {
|
|||
}
|
||||
}
|
||||
|
||||
// Restore groups
|
||||
const persistedGroups = loadSubagentGroups();
|
||||
for (const [groupId, group] of persistedGroups) {
|
||||
subagentGroups.set(groupId, group);
|
||||
}
|
||||
|
||||
// Process incomplete runs
|
||||
const affectedRequesters = new Set<string>();
|
||||
|
||||
|
|
@ -91,6 +99,45 @@ export function initSubagentRegistry(): void {
|
|||
}
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Group management
|
||||
// ============================================================================
|
||||
|
||||
/** Create a new subagent group. Returns the group record. */
|
||||
export function createSubagentGroup(params: {
|
||||
groupId: string;
|
||||
requesterSessionId: string;
|
||||
label?: string;
|
||||
next?: string;
|
||||
}): SubagentGroup {
|
||||
const group: SubagentGroup = {
|
||||
groupId: params.groupId,
|
||||
requesterSessionId: params.requesterSessionId,
|
||||
label: params.label,
|
||||
next: params.next,
|
||||
createdAt: Date.now(),
|
||||
};
|
||||
subagentGroups.set(params.groupId, group);
|
||||
persist();
|
||||
return group;
|
||||
}
|
||||
|
||||
/** Get a group by ID. */
|
||||
export function getSubagentGroup(groupId: string): SubagentGroup | undefined {
|
||||
return subagentGroups.get(groupId);
|
||||
}
|
||||
|
||||
/** List all runs belonging to a group. */
|
||||
export function listGroupRuns(groupId: string): SubagentRunRecord[] {
|
||||
const result: SubagentRunRecord[] = [];
|
||||
for (const record of subagentRuns.values()) {
|
||||
if (record.groupId === groupId) {
|
||||
result.push(record);
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/** Register a new subagent run and start tracking its lifecycle. */
|
||||
export function registerSubagentRun(params: RegisterSubagentRunParams): SubagentRunRecord {
|
||||
const {
|
||||
|
|
@ -102,6 +149,7 @@ export function registerSubagentRun(params: RegisterSubagentRunParams): Subagent
|
|||
cleanup = "delete",
|
||||
timeoutSeconds,
|
||||
announce,
|
||||
groupId,
|
||||
start,
|
||||
} = params;
|
||||
|
||||
|
|
@ -113,6 +161,7 @@ export function registerSubagentRun(params: RegisterSubagentRunParams): Subagent
|
|||
label,
|
||||
cleanup,
|
||||
announce,
|
||||
groupId,
|
||||
createdAt: Date.now(),
|
||||
};
|
||||
|
||||
|
|
@ -190,6 +239,7 @@ export function shutdownSubagentRegistry(): void {
|
|||
/** Reset all state (for testing). */
|
||||
export function resetSubagentRegistryForTests(): void {
|
||||
subagentRuns.clear();
|
||||
subagentGroups.clear();
|
||||
resumedRequesters.clear();
|
||||
stopSweeper();
|
||||
}
|
||||
|
|
@ -300,37 +350,59 @@ function captureFindings(record: SubagentRunRecord): void {
|
|||
/**
|
||||
* Phase 2: Announce completed-but-unannounced runs.
|
||||
*
|
||||
* Runs with announce="silent" are held back until ALL silent runs from the
|
||||
* same requester have completed. All other runs (immediate / undefined) are
|
||||
* announced per-completion as before.
|
||||
* Three announcement paths:
|
||||
* 1. Grouped runs — wait for all runs in the group to complete, then announce
|
||||
* together with the group's `next` continuation prompt (if any).
|
||||
* 2. Ungrouped silent runs — legacy behavior: wait for ALL silent runs from
|
||||
* the same requester to complete, then announce together.
|
||||
* 3. Ungrouped immediate runs — announce per-completion (default).
|
||||
*/
|
||||
function checkAndAnnounce(requesterSessionId: string): void {
|
||||
const allRuns = listSubagentRuns(requesterSessionId);
|
||||
|
||||
// ── Immediate runs: announce per-completion (default behavior) ──
|
||||
const immediateReady = allRuns.filter(
|
||||
// ── 1. Grouped runs: announce by group when all members complete ──
|
||||
const groupIds = new Set<string>();
|
||||
for (const r of allRuns) {
|
||||
if (r.groupId && !r.announced) groupIds.add(r.groupId);
|
||||
}
|
||||
|
||||
for (const groupId of groupIds) {
|
||||
const groupRuns = allRuns.filter(r => r.groupId === groupId);
|
||||
const unannounced = groupRuns.filter(r => !r.announced);
|
||||
const ready = unannounced.filter(r => r.endedAt !== undefined && r.findingsCaptured);
|
||||
|
||||
if (ready.length > 0 && ready.length === unannounced.length) {
|
||||
const group = subagentGroups.get(groupId);
|
||||
announceRuns(requesterSessionId, ready, group?.next);
|
||||
}
|
||||
}
|
||||
|
||||
// ── 2. Ungrouped runs: original immediate/silent logic ──
|
||||
const ungrouped = allRuns.filter(r => !r.groupId);
|
||||
|
||||
// Immediate: announce per-completion
|
||||
const immediateReady = ungrouped.filter(
|
||||
r => !r.announced && r.endedAt !== undefined && r.findingsCaptured && r.announce !== "silent",
|
||||
);
|
||||
if (immediateReady.length > 0) {
|
||||
announceGroup(requesterSessionId, immediateReady);
|
||||
announceRuns(requesterSessionId, immediateReady);
|
||||
}
|
||||
|
||||
// ── Silent runs: announce only when ALL silent runs are done ──
|
||||
const silentRuns = allRuns.filter(r => r.announce === "silent");
|
||||
// Silent: announce only when ALL ungrouped silent runs are done
|
||||
const silentRuns = ungrouped.filter(r => r.announce === "silent");
|
||||
const unannouncedSilent = silentRuns.filter(r => !r.announced);
|
||||
const silentReady = unannouncedSilent.filter(
|
||||
r => r.endedAt !== undefined && r.findingsCaptured,
|
||||
);
|
||||
|
||||
// All unannounced silent runs must be ready (ended + findings captured)
|
||||
if (silentReady.length > 0 && silentReady.length === unannouncedSilent.length) {
|
||||
announceGroup(requesterSessionId, silentReady);
|
||||
announceRuns(requesterSessionId, silentReady);
|
||||
}
|
||||
}
|
||||
|
||||
/** Announce a group of runs and mark them as announced. */
|
||||
function announceGroup(requesterSessionId: string, runs: SubagentRunRecord[]): void {
|
||||
const announced = runCoalescedAnnounceFlow(requesterSessionId, runs);
|
||||
/** Announce a batch of completed runs and mark them as announced. */
|
||||
function announceRuns(requesterSessionId: string, runs: SubagentRunRecord[], next?: string): void {
|
||||
const announced = runCoalescedAnnounceFlow(requesterSessionId, runs, next);
|
||||
|
||||
if (announced) {
|
||||
for (const r of runs) {
|
||||
|
|
@ -415,9 +487,18 @@ function sweep(): void {
|
|||
}
|
||||
}
|
||||
|
||||
// Clean up groups whose runs have all been archived
|
||||
for (const [groupId] of subagentGroups) {
|
||||
const hasActiveRuns = [...subagentRuns.values()].some(r => r.groupId === groupId);
|
||||
if (!hasActiveRuns) {
|
||||
subagentGroups.delete(groupId);
|
||||
removed++;
|
||||
}
|
||||
}
|
||||
|
||||
if (removed > 0) {
|
||||
persist();
|
||||
console.log(`[SubagentRegistry] Archived ${removed} completed run(s)`);
|
||||
console.log(`[SubagentRegistry] Archived ${removed} completed run(s)/group(s)`);
|
||||
}
|
||||
|
||||
if (subagentRuns.size === 0) {
|
||||
|
|
@ -431,7 +512,7 @@ function sweep(): void {
|
|||
|
||||
function persist(): void {
|
||||
try {
|
||||
saveSubagentRuns(subagentRuns);
|
||||
saveSubagentRuns(subagentRuns, subagentGroups);
|
||||
} catch (err) {
|
||||
console.error(`[SubagentRegistry] Failed to persist runs:`, err);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -11,6 +11,26 @@ export type SubagentRunOutcome = {
|
|||
error?: string | undefined;
|
||||
};
|
||||
|
||||
/**
|
||||
* A logical group of subagent runs that are tracked together.
|
||||
* Groups enable "collect all, then act" workflows:
|
||||
* all runs in a group must complete before the combined results
|
||||
* (plus an optional `next` continuation) are announced to the parent.
|
||||
*/
|
||||
export type SubagentGroup = {
|
||||
/** Unique group identifier (UUIDv7) */
|
||||
groupId: string;
|
||||
/** Session ID of the parent (requester) agent */
|
||||
requesterSessionId: string;
|
||||
/** Optional human-readable label for the group */
|
||||
label?: string | undefined;
|
||||
/** Continuation prompt executed after all runs in the group complete.
|
||||
* Injected into the announcement so the parent agent acts on the combined findings. */
|
||||
next?: string | undefined;
|
||||
/** Timestamp when the group was created */
|
||||
createdAt: number;
|
||||
};
|
||||
|
||||
/** Persistent record tracking a single subagent run */
|
||||
export type SubagentRunRecord = {
|
||||
/** Unique run identifier (UUIDv7) */
|
||||
|
|
@ -48,6 +68,9 @@ export type SubagentRunRecord = {
|
|||
/** Announcement mode: "immediate" (default) announces per-completion,
|
||||
* "silent" defers until all silent runs from the same requester complete. */
|
||||
announce?: "immediate" | "silent" | undefined;
|
||||
/** Group ID this run belongs to (if any). Runs in a group are announced
|
||||
* together when all complete, regardless of the `announce` field. */
|
||||
groupId?: string | undefined;
|
||||
};
|
||||
|
||||
/** Parameters for registering a new subagent run */
|
||||
|
|
@ -63,6 +86,12 @@ export type RegisterSubagentRunParams = {
|
|||
start?: (() => void) | undefined;
|
||||
/** Announcement mode: "immediate" (default) or "silent" (defer until all silent runs complete). */
|
||||
announce?: "immediate" | "silent" | undefined;
|
||||
/** Group ID to join. Runs in a group are announced together when all complete. */
|
||||
groupId?: string | undefined;
|
||||
/** Continuation prompt for the group. Only used on group creation (first spawn).
|
||||
* After all runs in the group complete, this prompt is included in the announcement
|
||||
* so the parent agent can act on the combined findings (e.g. summarize, write PDF). */
|
||||
next?: string | undefined;
|
||||
};
|
||||
|
||||
/** Parameters for the announce flow */
|
||||
|
|
|
|||
|
|
@ -262,23 +262,47 @@ export function buildConditionalToolSections(
|
|||
lines.push(
|
||||
"## Sub-Agents",
|
||||
"If a task is complex or long-running, spawn a sub-agent. It will do the work and report back when done.",
|
||||
"IMPORTANT: After spawning sub-agents, do NOT immediately check on them with sessions_list. " +
|
||||
"Results are delivered directly into your context automatically when the sub-agent finishes. " +
|
||||
"Continue with other tasks or finish your turn and wait for the results to arrive.",
|
||||
"You may use sessions_list to check on sub-agents only if a long time has passed or the user explicitly asks about their status.",
|
||||
"Sub-agents cannot spawn nested sub-agents.",
|
||||
"",
|
||||
"### Critical Rules",
|
||||
"- **NEVER fabricate, guess, or make up data that a sub-agent has not yet returned.** " +
|
||||
"This includes completion status — do NOT claim tasks are done until you receive actual results.",
|
||||
"- After spawning, do NOT proceed with work that depends on the sub-agent results. " +
|
||||
"You can still chat with the user, do unrelated tasks, or explain what the sub-agents are working on.",
|
||||
"- Sub-agents cannot spawn nested sub-agents.",
|
||||
"- You can use `sessions_list` to check sub-agent status if needed.",
|
||||
"",
|
||||
"### Groups and Continuation (`next`) — ALWAYS use for multi-agent tasks",
|
||||
"When spawning multiple sub-agents, **always** use `next` to define the follow-up work. " +
|
||||
"This is the standard pattern — do NOT use bare `announce: \"silent\"` for multi-agent collect-then-act workflows.",
|
||||
"",
|
||||
"```",
|
||||
"// First spawn — creates a group automatically, returns groupId",
|
||||
'sessions_spawn({ task: "Get AAPL financials", next: "Summarize all data and write a PDF report", label: "AAPL" })',
|
||||
"// → { groupId: \"grp-abc\", runId: \"...\" }",
|
||||
"",
|
||||
"// Subsequent spawns — join the same group",
|
||||
'sessions_spawn({ task: "Get MSFT financials", groupId: "grp-abc", label: "MSFT" })',
|
||||
'sessions_spawn({ task: "Get GOOG financials", groupId: "grp-abc", label: "GOOG" })',
|
||||
"```",
|
||||
"",
|
||||
"The system waits for ALL runs in the group to complete, then delivers the combined findings " +
|
||||
"plus the `next` continuation prompt back to you. You can then use tools (write files, call APIs, etc.) " +
|
||||
"to complete the follow-up work. The user is NOT blocked during this process — they can keep chatting.",
|
||||
"",
|
||||
"Use `next` whenever the user's request involves: collect data → then act on it (summarize, analyze, generate files).",
|
||||
"Without `next`, findings are summarized but no further action is taken.",
|
||||
"",
|
||||
"### Announce Modes (when not using groups)",
|
||||
"- `announce: \"immediate\"` (default): findings delivered per sub-agent as each completes.",
|
||||
"- `announce: \"silent\"`: all findings held until every silent sub-agent finishes, then delivered together.",
|
||||
"Groups always use silent collection internally — you don't need to set announce when using groupId.",
|
||||
"",
|
||||
"### Timeout Guidelines",
|
||||
"Set timeoutSeconds generously — a sub-agent that times out loses all its work.",
|
||||
"- Simple tasks (search, read, summarize): 600 (10 min, the default)",
|
||||
"- Moderate tasks (multi-step research, file downloads + analysis): 900–1200 (15–20 min)",
|
||||
"- Complex tasks (code generation, PDF creation, multi-file operations): 1200–1800 (20–30 min)",
|
||||
"When in doubt, use a longer timeout. It is always better to wait longer than to lose completed work.",
|
||||
"",
|
||||
"### Announce Modes",
|
||||
"- `announce: \"immediate\"` (default): Each sub-agent's findings are delivered to you as soon as it completes.",
|
||||
"- `announce: \"silent\"`: All findings are held back until every silent sub-agent finishes, then delivered as ONE combined report.",
|
||||
"Use \"silent\" when you want to collect data from multiple sub-agents first, then summarize everything at once.",
|
||||
"When in doubt, use a longer timeout.",
|
||||
"",
|
||||
);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -7,7 +7,7 @@
|
|||
|
||||
import { Type } from "@sinclair/typebox";
|
||||
import type { AgentTool } from "@mariozechner/pi-agent-core";
|
||||
import { listSubagentRuns, getSubagentRun } from "../subagent/registry.js";
|
||||
import { listSubagentRuns, getSubagentRun, getSubagentGroup } from "../subagent/registry.js";
|
||||
import type { SubagentRunRecord } from "../subagent/types.js";
|
||||
|
||||
const SessionsListSchema = Type.Object({
|
||||
|
|
@ -79,6 +79,11 @@ function formatRunDetail(record: SubagentRunRecord, now: number): string {
|
|||
];
|
||||
|
||||
if (record.label) lines.push(`Label: ${record.label}`);
|
||||
if (record.groupId) {
|
||||
const group = getSubagentGroup(record.groupId);
|
||||
lines.push(`Group: ${record.groupId}${group?.label ? ` (${group.label})` : ""}`);
|
||||
if (group?.next) lines.push(`Continuation: ${group.next.slice(0, 120)}${group.next.length > 120 ? "…" : ""}`);
|
||||
}
|
||||
lines.push(`Task: ${record.task}`);
|
||||
lines.push(`Status: ${status}${record.outcome?.error ? ` — ${record.outcome.error}` : ""}`);
|
||||
lines.push(`Child Session: ${record.childSessionId}`);
|
||||
|
|
@ -128,8 +133,7 @@ export function createSessionsListTool(
|
|||
description:
|
||||
"List all subagent runs spawned by this session and their current status. " +
|
||||
"Optionally pass a runId to get detailed information about a specific run. " +
|
||||
"NOTE: Do NOT call this immediately after spawning subagents — results arrive automatically in your context when subagents complete. " +
|
||||
"Only use this if a long time has passed or the user explicitly asks about subagent status.",
|
||||
"Use this to check subagent progress or when the user asks about status.",
|
||||
parameters: SessionsListSchema,
|
||||
execute: async (_toolCallId, args) => {
|
||||
const { runId } = args as SessionsListArgs;
|
||||
|
|
@ -177,21 +181,59 @@ export function createSessionsListTool(
|
|||
|
||||
const someRunning = runs.some((r) => !r.endedAt);
|
||||
|
||||
// Build status lines for each run
|
||||
// Build status lines, grouping runs by groupId
|
||||
const statusLines: string[] = [];
|
||||
for (let i = 0; i < runs.length; i++) {
|
||||
const r = runs[i]!;
|
||||
const groupedRuns = new Map<string, SubagentRunRecord[]>();
|
||||
const ungroupedRuns: SubagentRunRecord[] = [];
|
||||
|
||||
for (const r of runs) {
|
||||
if (r.groupId) {
|
||||
const list = groupedRuns.get(r.groupId) ?? [];
|
||||
list.push(r);
|
||||
groupedRuns.set(r.groupId, list);
|
||||
} else {
|
||||
ungroupedRuns.push(r);
|
||||
}
|
||||
}
|
||||
|
||||
let idx = 0;
|
||||
|
||||
// Grouped runs
|
||||
for (const [gId, gRuns] of groupedRuns) {
|
||||
const group = getSubagentGroup(gId);
|
||||
const groupLabel = group?.label || `Group ${gId.slice(0, 8)}…`;
|
||||
const done = gRuns.filter(r => r.endedAt).length;
|
||||
const nextSnippet = group?.next ? ` → next: "${group.next.slice(0, 60)}${group.next.length > 60 ? "…" : ""}"` : "";
|
||||
statusLines.push(`\n 📦 ${groupLabel} (${done}/${gRuns.length} done${nextSnippet})`);
|
||||
|
||||
for (const r of gRuns) {
|
||||
idx++;
|
||||
const displayName = r.label || r.task.slice(0, 60);
|
||||
const status = resolveStatus(r);
|
||||
if (status === "running") {
|
||||
const elapsed = r.startedAt ? formatElapsed(now - r.startedAt) : "just spawned";
|
||||
statusLines.push(` ${idx}. [RUNNING] "${displayName}" (${elapsed})`);
|
||||
} else {
|
||||
const elapsed = r.startedAt && r.endedAt ? formatElapsed(r.endedAt - r.startedAt) : "";
|
||||
statusLines.push(` ${idx}. [${status.toUpperCase()}] "${displayName}" (${elapsed})`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Ungrouped runs
|
||||
for (const r of ungroupedRuns) {
|
||||
idx++;
|
||||
const displayName = r.label || r.task.slice(0, 60);
|
||||
const status = resolveStatus(r);
|
||||
if (status === "running") {
|
||||
const elapsed = r.startedAt ? formatElapsed(now - r.startedAt) : "just spawned";
|
||||
statusLines.push(` ${i + 1}. [RUNNING] "${displayName}" (${elapsed})`);
|
||||
statusLines.push(` ${idx}. [RUNNING] "${displayName}" (${elapsed})`);
|
||||
} else {
|
||||
const elapsed = r.startedAt && r.endedAt ? formatElapsed(r.endedAt - r.startedAt) : "";
|
||||
const findings = r.findingsCaptured
|
||||
? (r.findings ? r.findings.slice(0, 200) + (r.findings.length > 200 ? "…" : "") : "(no output)")
|
||||
: "(findings not yet captured)";
|
||||
statusLines.push(` ${i + 1}. [${status.toUpperCase()}] "${displayName}" (${elapsed})\n Findings: ${findings}`);
|
||||
statusLines.push(` ${idx}. [${status.toUpperCase()}] "${displayName}" (${elapsed})\n Findings: ${findings}`);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ import { Type } from "@sinclair/typebox";
|
|||
import type { AgentTool } from "@mariozechner/pi-agent-core";
|
||||
import { getHub } from "../../hub/hub-singleton.js";
|
||||
import { buildSubagentSystemPrompt } from "../subagent/announce.js";
|
||||
import { registerSubagentRun } from "../subagent/registry.js";
|
||||
import { registerSubagentRun, createSubagentGroup, getSubagentGroup } from "../subagent/registry.js";
|
||||
import { resolveTools } from "../tools.js";
|
||||
|
||||
const SessionsSpawnSchema = Type.Object({
|
||||
|
|
@ -41,7 +41,26 @@ const SessionsSpawnSchema = Type.Object({
|
|||
"Announcement mode. 'immediate' (default): findings delivered as each subagent completes. " +
|
||||
"'silent': defer all announcements until every silent subagent from this session finishes, " +
|
||||
"then deliver one combined report. Use 'silent' when spawning multiple subagents to collect " +
|
||||
"data in parallel and you want to summarize everything at once.",
|
||||
"data in parallel and you want to summarize everything at once. " +
|
||||
"Ignored when groupId is provided (groups always collect all results before announcing).",
|
||||
}),
|
||||
),
|
||||
groupId: Type.Optional(
|
||||
Type.String({
|
||||
description:
|
||||
"Join an existing group. Pass the groupId returned by a previous sessions_spawn call " +
|
||||
"to add this subagent to the same group. All runs in a group are announced together " +
|
||||
"when the last one completes. If omitted AND 'next' is provided, a new group is created automatically.",
|
||||
}),
|
||||
),
|
||||
next: Type.Optional(
|
||||
Type.String({
|
||||
description:
|
||||
"Continuation task to execute after ALL subagents in the group complete. " +
|
||||
"Only used when creating a new group (first spawn without groupId). " +
|
||||
"When set, the combined findings from all subagents plus this 'next' prompt " +
|
||||
"are delivered to you so you can perform follow-up work (e.g. summarize, generate reports, write files). " +
|
||||
"Setting 'next' automatically creates a group and implies silent collection.",
|
||||
}),
|
||||
),
|
||||
});
|
||||
|
|
@ -53,12 +72,15 @@ type SessionsSpawnArgs = {
|
|||
cleanup?: "delete" | "keep";
|
||||
timeoutSeconds?: number;
|
||||
announce?: "immediate" | "silent";
|
||||
groupId?: string;
|
||||
next?: string;
|
||||
};
|
||||
|
||||
export type SessionsSpawnResult = {
|
||||
status: "accepted" | "error";
|
||||
childSessionId?: string;
|
||||
runId?: string;
|
||||
groupId?: string;
|
||||
error?: string;
|
||||
};
|
||||
|
||||
|
|
@ -79,13 +101,15 @@ export function createSessionsSpawnTool(
|
|||
label: "Spawn Subagent",
|
||||
description:
|
||||
"Spawn a background subagent to handle a specific task. The subagent runs in an isolated session with its own tool set. " +
|
||||
"When it completes, its findings are delivered directly into your context automatically — you do NOT need to poll or check. " +
|
||||
"IMPORTANT: After spawning subagents, continue with any other immediate tasks you have, or simply finish your turn and wait. " +
|
||||
"Do NOT call sessions_list to check on subagents you just spawned — results take time and will arrive on their own. " +
|
||||
"When it completes, its findings are delivered directly into your context automatically. " +
|
||||
"After spawning, do NOT proceed with work that depends on the results — but you can still chat or do unrelated tasks. " +
|
||||
"When spawning multiple subagents for a collect-then-act workflow, ALWAYS use the `next` parameter " +
|
||||
"on the first spawn to define follow-up work, then pass the returned groupId to subsequent spawns. " +
|
||||
"Use this for parallelizable work, long-running analysis, or tasks that benefit from isolation.",
|
||||
parameters: SessionsSpawnSchema,
|
||||
execute: async (_toolCallId, args) => {
|
||||
const { task, label, model, cleanup = "delete", timeoutSeconds, announce } = args as SessionsSpawnArgs;
|
||||
const { task, label, model, cleanup = "delete", timeoutSeconds, announce, next } = args as SessionsSpawnArgs;
|
||||
let { groupId } = args as SessionsSpawnArgs;
|
||||
|
||||
// Guard: subagents cannot spawn subagents
|
||||
if (options.isSubagent) {
|
||||
|
|
@ -102,6 +126,28 @@ export function createSessionsSpawnTool(
|
|||
const runId = uuidv7();
|
||||
const childSessionId = uuidv7();
|
||||
|
||||
// Validate groupId if provided
|
||||
if (groupId) {
|
||||
const existingGroup = getSubagentGroup(groupId);
|
||||
if (!existingGroup) {
|
||||
return {
|
||||
content: [{ type: "text", text: `Error: group not found: ${groupId}. Use the groupId returned by a previous sessions_spawn call.` }],
|
||||
details: { status: "error", error: `group not found: ${groupId}` },
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
// Auto-create group when `next` is provided without an existing groupId
|
||||
if (!groupId && next) {
|
||||
groupId = uuidv7();
|
||||
createSubagentGroup({
|
||||
groupId,
|
||||
requesterSessionId,
|
||||
label: label ? `Group: ${label}` : undefined,
|
||||
next,
|
||||
});
|
||||
}
|
||||
|
||||
// Resolve tools for the subagent (with isSubagent=true for policy filtering)
|
||||
const subagentTools = resolveTools({ isSubagent: true });
|
||||
const toolNames = subagentTools.map((t) => t.name);
|
||||
|
|
@ -135,21 +181,27 @@ export function createSessionsSpawnTool(
|
|||
label,
|
||||
cleanup,
|
||||
timeoutSeconds,
|
||||
announce,
|
||||
announce: groupId ? "silent" : announce,
|
||||
groupId,
|
||||
start: () => childAgent.write(task),
|
||||
});
|
||||
|
||||
// Build response text
|
||||
const groupInfo = groupId ? `\nGroup: ${groupId}` : "";
|
||||
const nextInfo = next ? `\nContinuation: "${next.slice(0, 100)}${next.length > 100 ? "…" : ""}"` : "";
|
||||
const responseText =
|
||||
`Subagent spawned: ${label || task.slice(0, 80)}\n` +
|
||||
`Run: ${runId}${groupInfo}${nextInfo}\n\n` +
|
||||
`⏳ WAITING FOR RESULTS — do NOT proceed with work that depends on these results.\n` +
|
||||
`Do NOT fabricate data or completion status. Results will arrive in your context automatically.`;
|
||||
|
||||
return {
|
||||
content: [
|
||||
{
|
||||
type: "text",
|
||||
text: `Subagent spawned successfully.\n\nRun ID: ${runId}\nSession: ${childSessionId}\nTask: ${label || task.slice(0, 80)}\n\nThe subagent is now working in the background. Its findings will be delivered directly into your context when it completes — do NOT poll or call sessions_list for it. Continue with other tasks or finish your turn.`,
|
||||
},
|
||||
],
|
||||
content: [{ type: "text", text: responseText }],
|
||||
details: {
|
||||
status: "accepted",
|
||||
childSessionId,
|
||||
runId,
|
||||
groupId,
|
||||
},
|
||||
};
|
||||
} catch (err) {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue