Merge pull request #209 from multica-ai/forrestchang/subagent-redesign
feat(agent): replace sessions_spawn with synchronous delegate tool
This commit is contained in:
commit
faa54fc671
19 changed files with 420 additions and 899 deletions
|
|
@ -7,7 +7,7 @@
|
|||
*/
|
||||
|
||||
import { join } from "node:path";
|
||||
import { Agent, Hub, listSubagentRuns } from "@multica/core";
|
||||
import { Agent, Hub } from "@multica/core";
|
||||
import type { AgentOptions } from "@multica/core";
|
||||
import type { ToolsConfig } from "@multica/core";
|
||||
import { DATA_DIR } from "@multica/utils";
|
||||
|
|
@ -238,52 +238,8 @@ export async function runCommand(args: string[]): Promise<void> {
|
|||
console.error(`Error: ${result.error}`);
|
||||
process.exitCode = 1;
|
||||
}
|
||||
|
||||
// Wait for sub-agents to complete and parent to process their results.
|
||||
// Without this, CLI exits before sub-agent announcements are delivered.
|
||||
await waitForSubagents(agent);
|
||||
} finally {
|
||||
hub.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for any running sub-agents to complete, then output their findings.
|
||||
*
|
||||
* In CLI mode, the parent Agent is not registered with the Hub, so the normal
|
||||
* announce flow (Hub → writeInternal) can't deliver results. Instead, we poll
|
||||
* the registry and print findings directly once all sub-agents finish.
|
||||
*
|
||||
* Max wait: 30 minutes (matches default sub-agent timeout).
|
||||
*/
|
||||
async function waitForSubagents(agent: Agent): Promise<void> {
|
||||
const MAX_WAIT_MS = 30 * 60 * 1000;
|
||||
const POLL_INTERVAL_MS = 2000;
|
||||
const start = Date.now();
|
||||
|
||||
const allRuns = listSubagentRuns(agent.sessionId);
|
||||
if (allRuns.length === 0) return;
|
||||
|
||||
// Phase 1: Wait for all sub-agent runs to finish
|
||||
while (Date.now() - start < MAX_WAIT_MS) {
|
||||
const runs = listSubagentRuns(agent.sessionId);
|
||||
const running = runs.filter((r) => !r.endedAt);
|
||||
if (running.length === 0) break;
|
||||
console.error(dim(`[waiting for ${running.length} sub-agent(s)...]`));
|
||||
await new Promise((r) => setTimeout(r, POLL_INTERVAL_MS));
|
||||
}
|
||||
|
||||
// Phase 2: Output sub-agent findings directly (bypasses Hub announce flow)
|
||||
const completedRuns = listSubagentRuns(agent.sessionId).filter((r) => r.endedAt);
|
||||
if (completedRuns.length === 0) return;
|
||||
|
||||
console.error(dim(`[${completedRuns.length} sub-agent(s) completed]`));
|
||||
|
||||
for (const run of completedRuns) {
|
||||
const displayName = run.label || run.task.slice(0, 60);
|
||||
const status = run.outcome?.status ?? "unknown";
|
||||
const findings = run.findings || "(no output)";
|
||||
console.log(`\n--- Sub-agent: ${displayName} [${status}] ---`);
|
||||
console.log(findings);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -13,7 +13,7 @@ const TOOL_GROUPS: Record<string, string[]> = {
|
|||
'group:runtime': ['exec', 'process'],
|
||||
'group:web': ['web_search', 'web_fetch'],
|
||||
'group:memory': ['memory_search'],
|
||||
'group:subagent': ['sessions_spawn'],
|
||||
'group:subagent': ['delegate'],
|
||||
'group:cron': ['cron'],
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -185,7 +185,7 @@ describe("Phase 2 E2E: Artifact-Aware Pruning", () => {
|
|||
|
||||
describe("Phase 2 E2E: Summary Fallback Artifact Extraction", () => {
|
||||
// UC4: summary fallback extracts artifact references
|
||||
it("UC4: summary fallback includes 'Saved Artifacts' section with all artifact refs", async () => {
|
||||
it("UC4: summary fallback includes 'Saved Artifacts' section with all artifact refs", { timeout: 15_000 }, async () => {
|
||||
const mod = await import("./summary-fallback.js");
|
||||
|
||||
const messages: AgentMessage[] = [
|
||||
|
|
|
|||
|
|
@ -13,6 +13,8 @@ export * from "./tools.js";
|
|||
export * from "./tools/policy.js";
|
||||
export * from "./tools/groups.js";
|
||||
export * from "./extract-text.js";
|
||||
// @deprecated — Old subagent registry. Use `delegate` tool instead.
|
||||
// Kept temporarily for desktop app compatibility.
|
||||
export {
|
||||
listSubagentRuns,
|
||||
getSubagentRun,
|
||||
|
|
@ -23,6 +25,7 @@ export type {
|
|||
SubagentRunOutcome,
|
||||
SubagentGroup,
|
||||
} from "./subagent/types.js";
|
||||
|
||||
export {
|
||||
readClaudeCliCredentials,
|
||||
readCodexCliCredentials,
|
||||
|
|
|
|||
|
|
@ -45,6 +45,16 @@
|
|||
* - `compaction_detail` — Detailed compaction breakdown.
|
||||
* Fields: pre_pruning_tokens, post_compaction_tokens, messages_removed, reason, pruning_applied
|
||||
*
|
||||
* ### Delegation (Sub-Agents)
|
||||
* - `delegate_start` — Delegate tool invoked.
|
||||
* Fields: task_count, timeout_seconds, labels[]
|
||||
* - `delegate_task_start` — Individual sub-agent task begins.
|
||||
* Fields: index, label, task (first 200 chars)
|
||||
* - `delegate_task_end` — Individual sub-agent task completes.
|
||||
* Fields: index, label, status (ok|error|timeout), duration_ms, findings_chars, error?
|
||||
* - `delegate_end` — All delegated tasks complete.
|
||||
* Fields: task_count, ok, errors, timeouts, total_duration_ms
|
||||
*
|
||||
* ### Error Recovery
|
||||
* - `context_overflow` — Context window overflow detected.
|
||||
* Fields: attempt, messages_before
|
||||
|
|
|
|||
|
|
@ -393,11 +393,10 @@ export class Agent {
|
|||
const mergedToolsConfig = mergeToolsConfig(profileToolsConfig, options.tools);
|
||||
const profileDir = this.profile?.getProfileDir();
|
||||
// Use this.sessionId (which may be auto-generated) instead of options.sessionId
|
||||
// (which may be undefined). Without this, sessions_list and sessions_spawn
|
||||
// can't find sub-agent runs because they have no session context.
|
||||
// (which may be undefined). Without this, delegate tool has no session context.
|
||||
this.toolsOptions = mergedToolsConfig
|
||||
? { ...options, sessionId: this.sessionId, cwd: effectiveCwd, tools: mergedToolsConfig, profileDir, provider: this.resolvedProvider }
|
||||
: { ...options, sessionId: this.sessionId, cwd: effectiveCwd, profileDir, provider: this.resolvedProvider };
|
||||
? { ...options, sessionId: this.sessionId, cwd: effectiveCwd, tools: mergedToolsConfig, profileDir, provider: this.resolvedProvider, runLog: this.runLog }
|
||||
: { ...options, sessionId: this.sessionId, cwd: effectiveCwd, profileDir, provider: this.resolvedProvider, runLog: this.runLog };
|
||||
|
||||
const tools = resolveTools(this.toolsOptions);
|
||||
if (this.debug) {
|
||||
|
|
@ -1242,10 +1241,10 @@ export class Agent {
|
|||
|
||||
// Update internal state
|
||||
this.resolvedProvider = providerId;
|
||||
// Keep toolsOptions.provider in sync so sessions_spawn inherits the current provider
|
||||
// Keep toolsOptions.provider in sync so delegate tool inherits the current provider
|
||||
this.toolsOptions = { ...this.toolsOptions, provider: providerId };
|
||||
|
||||
// Reload tools so sessions_spawn picks up the new provider in its closure.
|
||||
// Reload tools so delegate picks up the new provider in its closure.
|
||||
// Without this, the existing tool instance still captures the old provider.
|
||||
const tools = resolveTools(this.toolsOptions);
|
||||
this.agent.setTools(tools);
|
||||
|
|
|
|||
|
|
@ -11,7 +11,7 @@ const PROFILE = {
|
|||
config: { name: "TestAgent" },
|
||||
};
|
||||
|
||||
const TOOLS = ["read", "write", "edit", "glob", "exec", "sessions_spawn", "web_search"];
|
||||
const TOOLS = ["read", "write", "edit", "glob", "exec", "delegate", "web_search"];
|
||||
|
||||
describe("buildSystemPrompt", () => {
|
||||
// ── Full mode ─────────────────────────────────────────────────────────
|
||||
|
|
@ -43,9 +43,9 @@ describe("buildSystemPrompt", () => {
|
|||
expect(result).toContain("## Tool Call Style");
|
||||
});
|
||||
|
||||
it("full mode includes sub-agents section when sessions_spawn present", () => {
|
||||
const result = buildSystemPrompt({ mode: "full", tools: ["sessions_spawn"] });
|
||||
expect(result).toContain("## Sub-Agents");
|
||||
it("full mode includes delegation section when delegate present", () => {
|
||||
const result = buildSystemPrompt({ mode: "full", tools: ["delegate"] });
|
||||
expect(result).toContain("## Delegation");
|
||||
});
|
||||
|
||||
it("full mode includes web access section when web tools present", () => {
|
||||
|
|
@ -131,9 +131,9 @@ describe("buildSystemPrompt", () => {
|
|||
expect(result).not.toContain("## Skills");
|
||||
});
|
||||
|
||||
it("minimal mode excludes sub-agents section even with sessions_spawn", () => {
|
||||
const result = buildSystemPrompt({ mode: "minimal", tools: ["sessions_spawn"] });
|
||||
expect(result).not.toContain("## Sub-Agents");
|
||||
it("minimal mode excludes delegation section even with delegate", () => {
|
||||
const result = buildSystemPrompt({ mode: "minimal", tools: ["delegate"] });
|
||||
expect(result).not.toContain("## Delegation");
|
||||
});
|
||||
|
||||
// ── None mode ─────────────────────────────────────────────────────────
|
||||
|
|
|
|||
|
|
@ -167,14 +167,14 @@ describe("buildToolCallStyleSection", () => {
|
|||
});
|
||||
|
||||
describe("buildConditionalToolSections", () => {
|
||||
it("includes sub-agents section when sessions_spawn present in full mode", () => {
|
||||
const result = buildConditionalToolSections(["sessions_spawn"], "full");
|
||||
expect(result.join("\n")).toContain("## Sub-Agents");
|
||||
it("includes delegation section when delegate present in full mode", () => {
|
||||
const result = buildConditionalToolSections(["delegate"], "full");
|
||||
expect(result.join("\n")).toContain("## Delegation");
|
||||
});
|
||||
|
||||
it("excludes sub-agents section in minimal mode", () => {
|
||||
const result = buildConditionalToolSections(["sessions_spawn"], "minimal");
|
||||
expect(result.join("\n")).not.toContain("## Sub-Agents");
|
||||
it("excludes delegation section in minimal mode", () => {
|
||||
const result = buildConditionalToolSections(["delegate"], "minimal");
|
||||
expect(result.join("\n")).not.toContain("## Delegation");
|
||||
});
|
||||
|
||||
it("includes web access section when web tools present", () => {
|
||||
|
|
|
|||
|
|
@ -28,7 +28,7 @@ const CORE_TOOL_SUMMARIES: Record<string, string> = {
|
|||
web_search: "Search the web via Devv Search",
|
||||
web_fetch: "Fetch and extract readable content from a URL",
|
||||
memory_search: "Search memory files by keyword",
|
||||
sessions_spawn: "Spawn a sub-agent session",
|
||||
delegate: "Run tasks in parallel via sub-agents",
|
||||
data: "Query structured financial and market data",
|
||||
};
|
||||
|
||||
|
|
@ -43,7 +43,7 @@ const TOOL_ORDER = [
|
|||
"web_search",
|
||||
"web_fetch",
|
||||
"memory_search",
|
||||
"sessions_spawn",
|
||||
"delegate",
|
||||
"data",
|
||||
];
|
||||
|
||||
|
|
@ -313,52 +313,34 @@ export function buildConditionalToolSections(
|
|||
);
|
||||
}
|
||||
|
||||
// Subagent tools (full mode only — minimal agents cannot spawn)
|
||||
if (mode === "full" && toolSet.has("sessions_spawn")) {
|
||||
// Delegate tool (full mode only — sub-agents cannot delegate)
|
||||
if (mode === "full" && toolSet.has("delegate")) {
|
||||
lines.push(
|
||||
"## Sub-Agents",
|
||||
"If a task is complex or long-running, spawn a sub-agent. It will do the work and report back when done.",
|
||||
"## Delegation (Sub-Agents)",
|
||||
"Use `delegate` to run tasks in parallel via isolated sub-agents. " +
|
||||
"Each task gets its own agent with full tool access. Results are returned directly when all tasks complete.",
|
||||
"",
|
||||
"### Critical Rules",
|
||||
"- **NEVER fabricate, guess, or make up data that a sub-agent has not yet returned.** " +
|
||||
"This includes completion status — do NOT claim tasks are done until you receive actual results.",
|
||||
"- After spawning, do NOT proceed with work that depends on the sub-agent results. " +
|
||||
"You can still chat with the user, do unrelated tasks, or explain what the sub-agents are working on.",
|
||||
"- Sub-agents cannot spawn nested sub-agents.",
|
||||
"- You can use `sessions_list` to check sub-agent status if needed.",
|
||||
"",
|
||||
"### Groups and Continuation (`next`) — ALWAYS use for multi-agent tasks",
|
||||
"When spawning multiple sub-agents, **always** use `next` to define the follow-up work. " +
|
||||
"This is the standard pattern — do NOT use bare `announce: \"silent\"` for multi-agent collect-then-act workflows.",
|
||||
"### When to Use",
|
||||
"- Collecting data from multiple independent sources (e.g. research 3 stocks simultaneously)",
|
||||
"- Comparative analysis that can be parallelized (e.g. analyze 5 error logs in parallel)",
|
||||
"- Any task where independent sub-tasks would benefit from parallel execution",
|
||||
"",
|
||||
"### Example",
|
||||
"```",
|
||||
"// First spawn — creates a group automatically, returns groupId",
|
||||
'sessions_spawn({ task: "Get AAPL financials", next: "Summarize all data and write a PDF report", label: "AAPL" })',
|
||||
"// → { groupId: \"grp-abc\", runId: \"...\" }",
|
||||
"",
|
||||
"// Subsequent spawns — join the same group",
|
||||
'sessions_spawn({ task: "Get MSFT financials", groupId: "grp-abc", label: "MSFT" })',
|
||||
'sessions_spawn({ task: "Get GOOG financials", groupId: "grp-abc", label: "GOOG" })',
|
||||
"delegate({",
|
||||
' tasks: [',
|
||||
' { task: "Research AAPL financials: revenue, net income, stock price", label: "AAPL" },',
|
||||
' { task: "Research MSFT financials: revenue, net income, stock price", label: "MSFT" },',
|
||||
' { task: "Research GOOG financials: revenue, net income, stock price", label: "GOOG" }',
|
||||
" ]",
|
||||
"})",
|
||||
"```",
|
||||
"",
|
||||
"The system waits for ALL runs in the group to complete, then delivers the combined findings " +
|
||||
"plus the `next` continuation prompt back to you. You can then use tools (write files, call APIs, etc.) " +
|
||||
"to complete the follow-up work. The user is NOT blocked during this process — they can keep chatting.",
|
||||
"",
|
||||
"Use `next` whenever the user's request involves: collect data → then act on it (summarize, analyze, generate files).",
|
||||
"Without `next`, findings are summarized but no further action is taken.",
|
||||
"",
|
||||
"### Announce Modes (when not using groups)",
|
||||
"- `announce: \"immediate\"` (default): findings delivered per sub-agent as each completes.",
|
||||
"- `announce: \"silent\"`: all findings held until every silent sub-agent finishes, then delivered together.",
|
||||
"Groups always use silent collection internally — you don't need to set announce when using groupId.",
|
||||
"",
|
||||
"### Timeout Guidelines",
|
||||
"Set timeoutSeconds generously — a sub-agent that times out loses all its work.",
|
||||
"- Simple tasks (search, read, summarize): 1800 (30 min, the default)",
|
||||
"- Moderate tasks (multi-step research, file downloads + analysis): 1800–2400 (30–40 min)",
|
||||
"- Complex tasks (code generation, PDF creation, multi-file operations): 2400–3600 (40–60 min)",
|
||||
"When in doubt, use a longer timeout.",
|
||||
"### Rules",
|
||||
"- Each sub-agent task should be self-contained — include all context needed in the task description.",
|
||||
"- Sub-agents cannot delegate further (no nesting).",
|
||||
"- The tool blocks until all tasks complete — plan your workflow accordingly.",
|
||||
"- Set `timeoutSeconds` generously for complex tasks (default: 1800 = 30 min).",
|
||||
"",
|
||||
);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -6,8 +6,7 @@ import { createExecTool } from "./tools/exec.js";
|
|||
import { createProcessTool } from "./tools/process.js";
|
||||
import { createGlobTool } from "./tools/glob.js";
|
||||
import { createWebFetchTool, createWebSearchTool } from "./tools/web/index.js";
|
||||
import { createSessionsSpawnTool } from "./tools/sessions-spawn.js";
|
||||
import { createSessionsListTool } from "./tools/sessions-list.js";
|
||||
import { createDelegateTool } from "./tools/delegate.js";
|
||||
import { createMemorySearchTool } from "./tools/memory-search.js";
|
||||
import { createCronTool } from "./tools/cron/index.js";
|
||||
import { createDataTool } from "./tools/data/index.js";
|
||||
|
|
@ -26,12 +25,20 @@ export interface CreateToolsOptions {
|
|||
cwd: string;
|
||||
/** Profile directory for memory_search tool (optional) */
|
||||
profileDir?: string | undefined;
|
||||
/** Whether this agent is a subagent (passed to sessions_spawn tool) */
|
||||
/** Whether this agent is a subagent (passed to delegate tool) */
|
||||
isSubagent?: boolean | undefined;
|
||||
/** Session ID of the agent (passed to sessions_spawn tool) */
|
||||
/** Session ID of the agent (passed to delegate tool) */
|
||||
sessionId?: string | undefined;
|
||||
/** Resolved provider ID of the parent agent (passed to sessions_spawn for subagent inheritance) */
|
||||
/** Resolved provider ID of the parent agent (inherited by sub-agents) */
|
||||
provider?: string | undefined;
|
||||
/** Model name (inherited by sub-agents) */
|
||||
model?: string | undefined;
|
||||
/** API key (inherited by sub-agents) */
|
||||
apiKey?: string | undefined;
|
||||
/** Whether run-log is enabled (passed to child agents) */
|
||||
enableRunLog?: boolean | undefined;
|
||||
/** Run-log instance for delegate events */
|
||||
runLog?: import("./run-log.js").RunLog | undefined;
|
||||
/** Callback invoked when exec tool needs approval before running a command */
|
||||
onExecApprovalNeeded?: ExecApprovalCallback | undefined;
|
||||
/** Callback for sending files through messaging channels */
|
||||
|
|
@ -143,17 +150,18 @@ export function createAllTools(options: CreateToolsOptions | string): AgentTool<
|
|||
tools.push(sendFileTool as AgentTool<any>);
|
||||
}
|
||||
|
||||
// Add sessions_spawn tool (will be filtered by policy for subagents)
|
||||
const sessionsSpawnTool = createSessionsSpawnTool({
|
||||
// Add delegate tool (will be filtered by policy for subagents)
|
||||
const delegateTool = createDelegateTool({
|
||||
isSubagent: isSubagent ?? false,
|
||||
...(sessionId !== undefined ? { sessionId } : {}),
|
||||
...(opts.provider !== undefined ? { provider: opts.provider } : {}),
|
||||
sessionId,
|
||||
provider: opts.provider,
|
||||
model: opts.model,
|
||||
apiKey: opts.apiKey,
|
||||
cwd,
|
||||
runLog: opts.runLog,
|
||||
enableRunLog: opts.enableRunLog,
|
||||
});
|
||||
tools.push(sessionsSpawnTool as AgentTool<any>);
|
||||
|
||||
// Add sessions_list tool
|
||||
const sessionsListTool = createSessionsListTool({ ...(sessionId !== undefined ? { sessionId } : {}) });
|
||||
tools.push(sessionsListTool as AgentTool<any>);
|
||||
tools.push(delegateTool as AgentTool<any>);
|
||||
|
||||
return tools;
|
||||
}
|
||||
|
|
@ -162,6 +170,8 @@ export function createAllTools(options: CreateToolsOptions | string): AgentTool<
|
|||
export interface ResolveToolsOptions extends AgentOptions {
|
||||
/** Profile directory for memory_search tool (computed from profileId if not provided) */
|
||||
profileDir?: string | undefined;
|
||||
/** Run-log instance (forwarded to delegate tool) */
|
||||
runLog?: import("./run-log.js").RunLog | undefined;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -183,6 +193,10 @@ export function resolveTools(options: ResolveToolsOptions): AgentTool<any>[] {
|
|||
isSubagent: options.isSubagent,
|
||||
sessionId: options.sessionId,
|
||||
provider: options.provider,
|
||||
model: options.model,
|
||||
apiKey: options.apiKey,
|
||||
enableRunLog: options.enableRunLog,
|
||||
runLog: options.runLog,
|
||||
onExecApprovalNeeded: options.onExecApprovalNeeded,
|
||||
onChannelSendFile: options.onChannelSendFile,
|
||||
});
|
||||
|
|
|
|||
321
packages/core/src/agent/tools/delegate.ts
Normal file
321
packages/core/src/agent/tools/delegate.ts
Normal file
|
|
@ -0,0 +1,321 @@
|
|||
/**
|
||||
* delegate tool — run tasks in parallel via sub-agents.
|
||||
*
|
||||
* Synchronous from the LLM's perspective: the tool blocks until all
|
||||
* sub-agent tasks complete (or timeout), then returns combined results
|
||||
* directly in the tool response. No async infrastructure needed.
|
||||
*/
|
||||
|
||||
import { join } from "node:path";
|
||||
import { rmSync } from "node:fs";
|
||||
import { Writable } from "node:stream";
|
||||
import { Type } from "@sinclair/typebox";
|
||||
import type { AgentTool } from "@mariozechner/pi-agent-core";
|
||||
import { Agent } from "../runner.js";
|
||||
import type { RunLog } from "../run-log.js";
|
||||
import { DATA_DIR } from "@multica/utils";
|
||||
|
||||
const TaskItemSchema = Type.Object({
|
||||
task: Type.String({ description: "The task for the sub-agent to perform.", minLength: 1 }),
|
||||
label: Type.Optional(
|
||||
Type.String({ description: "Short human-readable label for this task (used in output headers)." }),
|
||||
),
|
||||
});
|
||||
|
||||
const DelegateSchema = Type.Object({
|
||||
tasks: Type.Array(TaskItemSchema, {
|
||||
description: "One or more tasks to run in parallel. Each spawns an isolated sub-agent.",
|
||||
minItems: 1,
|
||||
}),
|
||||
timeoutSeconds: Type.Optional(
|
||||
Type.Number({
|
||||
description:
|
||||
"Per-task timeout in seconds. Default: 1800 (30 min). " +
|
||||
"Set higher for complex tasks. The sub-agent is aborted if it exceeds this limit.",
|
||||
minimum: 0,
|
||||
}),
|
||||
),
|
||||
});
|
||||
|
||||
type DelegateArgs = {
|
||||
tasks: Array<{ task: string; label?: string }>;
|
||||
timeoutSeconds?: number;
|
||||
};
|
||||
|
||||
type TaskResult = {
|
||||
index: number;
|
||||
label: string;
|
||||
status: "ok" | "error" | "timeout";
|
||||
durationMs: number;
|
||||
findings: string;
|
||||
error?: string;
|
||||
};
|
||||
|
||||
export type DelegateResult = {
|
||||
taskCount: number;
|
||||
ok: number;
|
||||
errors: number;
|
||||
timeouts: number;
|
||||
totalDurationMs: number;
|
||||
tasks: TaskResult[];
|
||||
};
|
||||
|
||||
export interface CreateDelegateToolOptions {
|
||||
/** Whether the current agent is itself a subagent */
|
||||
isSubagent?: boolean;
|
||||
/** Session ID of the parent agent */
|
||||
sessionId?: string;
|
||||
/** Resolved provider ID (inherited by sub-agents) */
|
||||
provider?: string;
|
||||
/** Model override (inherited by sub-agents) */
|
||||
model?: string;
|
||||
/** API key (inherited by sub-agents) */
|
||||
apiKey?: string;
|
||||
/** Working directory (inherited by sub-agents) */
|
||||
cwd?: string;
|
||||
/** Run-log instance for emitting delegate events */
|
||||
runLog?: RunLog;
|
||||
/** Whether run-log is enabled (passed to child agents) */
|
||||
enableRunLog?: boolean;
|
||||
}
|
||||
|
||||
const DEFAULT_TIMEOUT_SECONDS = 1800; // 30 minutes
|
||||
|
||||
function formatElapsed(ms: number): string {
|
||||
const totalSeconds = Math.round(ms / 1000);
|
||||
if (totalSeconds < 60) return `${totalSeconds}s`;
|
||||
const minutes = Math.floor(totalSeconds / 60);
|
||||
const seconds = totalSeconds % 60;
|
||||
if (minutes < 60) return seconds > 0 ? `${minutes}m${seconds}s` : `${minutes}m`;
|
||||
const hours = Math.floor(minutes / 60);
|
||||
const remainingMinutes = minutes % 60;
|
||||
return remainingMinutes > 0 ? `${hours}h${remainingMinutes}m` : `${hours}h`;
|
||||
}
|
||||
|
||||
/**
|
||||
* Run a single sub-agent task with timeout.
|
||||
*/
|
||||
async function runSubagentTask(
|
||||
taskDef: { task: string; label?: string },
|
||||
index: number,
|
||||
timeoutMs: number,
|
||||
parentOptions: CreateDelegateToolOptions,
|
||||
runLog?: RunLog,
|
||||
): Promise<TaskResult> {
|
||||
const label = taskDef.label || `Task ${index + 1}`;
|
||||
const start = Date.now();
|
||||
|
||||
runLog?.log("delegate_task_start", {
|
||||
index,
|
||||
label,
|
||||
task: taskDef.task.slice(0, 200),
|
||||
});
|
||||
|
||||
const childAgent = new Agent({
|
||||
provider: parentOptions.provider,
|
||||
model: parentOptions.model,
|
||||
apiKey: parentOptions.apiKey,
|
||||
cwd: parentOptions.cwd,
|
||||
isSubagent: true,
|
||||
enableSkills: false,
|
||||
compactionMode: "tokens",
|
||||
enableRunLog: parentOptions.enableRunLog,
|
||||
// Suppress stdout/stderr output from child agents
|
||||
logger: {
|
||||
stdout: new NullStream(),
|
||||
stderr: new NullStream(),
|
||||
},
|
||||
});
|
||||
|
||||
try {
|
||||
let result: { text: string; error?: string };
|
||||
let timedOut = false;
|
||||
|
||||
if (timeoutMs > 0) {
|
||||
// Race agent.run against timeout
|
||||
let timer: ReturnType<typeof setTimeout>;
|
||||
result = await Promise.race([
|
||||
childAgent.run(taskDef.task),
|
||||
new Promise<never>((_, reject) => {
|
||||
timer = setTimeout(() => {
|
||||
timedOut = true;
|
||||
childAgent.abort();
|
||||
reject(new Error("timeout"));
|
||||
}, timeoutMs);
|
||||
}),
|
||||
]).catch((err) => {
|
||||
if (timedOut) {
|
||||
return { text: "", error: `Timed out after ${formatElapsed(timeoutMs)}` };
|
||||
}
|
||||
throw err;
|
||||
}).finally(() => {
|
||||
clearTimeout(timer);
|
||||
});
|
||||
} else {
|
||||
// No timeout
|
||||
result = await childAgent.run(taskDef.task);
|
||||
}
|
||||
|
||||
const durationMs = Date.now() - start;
|
||||
const status = timedOut ? "timeout" : result.error ? "error" : "ok";
|
||||
|
||||
const taskResult: TaskResult = {
|
||||
index,
|
||||
label,
|
||||
status,
|
||||
durationMs,
|
||||
findings: result.text || "(no output)",
|
||||
error: result.error || undefined,
|
||||
};
|
||||
|
||||
runLog?.log("delegate_task_end", {
|
||||
index,
|
||||
label,
|
||||
status,
|
||||
duration_ms: durationMs,
|
||||
findings_chars: taskResult.findings.length,
|
||||
error: taskResult.error,
|
||||
});
|
||||
|
||||
return taskResult;
|
||||
} catch (err) {
|
||||
const durationMs = Date.now() - start;
|
||||
const message = err instanceof Error ? err.message : String(err);
|
||||
|
||||
const taskResult: TaskResult = {
|
||||
index,
|
||||
label,
|
||||
status: "error",
|
||||
durationMs,
|
||||
findings: "",
|
||||
error: message,
|
||||
};
|
||||
|
||||
runLog?.log("delegate_task_end", {
|
||||
index,
|
||||
label,
|
||||
status: "error",
|
||||
duration_ms: durationMs,
|
||||
findings_chars: 0,
|
||||
error: message,
|
||||
});
|
||||
|
||||
return taskResult;
|
||||
} finally {
|
||||
// Flush session writes before cleanup
|
||||
await childAgent.flushSession();
|
||||
|
||||
// Clean up child session directory unless run-log is enabled
|
||||
if (!parentOptions.enableRunLog) {
|
||||
try {
|
||||
const sessionDir = join(DATA_DIR, "sessions", childAgent.sessionId);
|
||||
rmSync(sessionDir, { recursive: true, force: true });
|
||||
} catch {
|
||||
// Best-effort cleanup
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export function createDelegateTool(
|
||||
options: CreateDelegateToolOptions,
|
||||
): AgentTool<typeof DelegateSchema, DelegateResult> {
|
||||
return {
|
||||
name: "delegate",
|
||||
label: "Delegate Tasks",
|
||||
description:
|
||||
"Run one or more tasks in parallel via isolated sub-agents. " +
|
||||
"Each task gets its own agent with full tool access. " +
|
||||
"Results are returned directly when all tasks complete. " +
|
||||
"Use this for parallelizable work: multi-stock research, comparative analysis, " +
|
||||
"data collection from multiple sources, or any task that benefits from parallel execution.",
|
||||
parameters: DelegateSchema,
|
||||
execute: async (_toolCallId, args) => {
|
||||
const { tasks, timeoutSeconds } = args as DelegateArgs;
|
||||
const timeoutMs = (timeoutSeconds ?? DEFAULT_TIMEOUT_SECONDS) * 1000;
|
||||
|
||||
// Guard: sub-agents cannot delegate
|
||||
if (options.isSubagent) {
|
||||
return {
|
||||
content: [{ type: "text", text: "Error: delegate is not allowed from sub-agent sessions." }],
|
||||
details: {
|
||||
taskCount: 0,
|
||||
ok: 0,
|
||||
errors: 1,
|
||||
timeouts: 0,
|
||||
totalDurationMs: 0,
|
||||
tasks: [],
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
const labels = tasks.map((t, i) => t.label || `Task ${i + 1}`);
|
||||
|
||||
options.runLog?.log("delegate_start", {
|
||||
task_count: tasks.length,
|
||||
timeout_seconds: timeoutSeconds ?? DEFAULT_TIMEOUT_SECONDS,
|
||||
labels,
|
||||
});
|
||||
|
||||
const totalStart = Date.now();
|
||||
|
||||
// Run all tasks in parallel
|
||||
const results = await Promise.all(
|
||||
tasks.map((taskDef, index) =>
|
||||
runSubagentTask(taskDef, index, timeoutMs, options, options.runLog),
|
||||
),
|
||||
);
|
||||
|
||||
const totalDurationMs = Date.now() - totalStart;
|
||||
const ok = results.filter((r) => r.status === "ok").length;
|
||||
const errors = results.filter((r) => r.status === "error").length;
|
||||
const timeouts = results.filter((r) => r.status === "timeout").length;
|
||||
|
||||
options.runLog?.log("delegate_end", {
|
||||
task_count: tasks.length,
|
||||
ok,
|
||||
errors,
|
||||
timeouts,
|
||||
total_duration_ms: totalDurationMs,
|
||||
});
|
||||
|
||||
// Format combined response
|
||||
const statusLine =
|
||||
`All ${tasks.length} task(s) completed: ${ok} succeeded, ${errors} failed, ${timeouts} timed out.\n` +
|
||||
`Total wall time: ${formatElapsed(totalDurationMs)}`;
|
||||
|
||||
const taskSections = results.map((r) => {
|
||||
const statusTag = r.status === "ok" ? "OK" : r.status.toUpperCase();
|
||||
const header = `--- Task ${r.index + 1}: "${r.label}" [${statusTag}] (${formatElapsed(r.durationMs)}) ---`;
|
||||
const body = r.status === "error" && r.error
|
||||
? `Error: ${r.error}\n${r.findings || ""}`
|
||||
: r.findings;
|
||||
return `${header}\n${body}`;
|
||||
});
|
||||
|
||||
const responseText = `${statusLine}\n\n${taskSections.join("\n\n")}`;
|
||||
|
||||
return {
|
||||
content: [{ type: "text", text: responseText }],
|
||||
details: {
|
||||
taskCount: tasks.length,
|
||||
ok,
|
||||
errors,
|
||||
timeouts,
|
||||
totalDurationMs,
|
||||
tasks: results,
|
||||
},
|
||||
};
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Writable stream that discards all output.
|
||||
* Used to suppress child agent stdout/stderr.
|
||||
*/
|
||||
class NullStream extends Writable {
|
||||
_write(_chunk: any, _encoding: string, callback: () => void): void {
|
||||
callback();
|
||||
}
|
||||
}
|
||||
|
|
@ -34,7 +34,7 @@ export const TOOL_GROUPS: Record<string, string[]> = {
|
|||
"group:memory": ["memory_search"],
|
||||
|
||||
// Subagent tools
|
||||
"group:subagent": ["sessions_spawn", "sessions_list"],
|
||||
"group:subagent": ["delegate"],
|
||||
|
||||
// Cron/scheduling tools
|
||||
"group:cron": ["cron"],
|
||||
|
|
@ -61,8 +61,8 @@ export const TOOL_GROUPS: Record<string, string[]> = {
|
|||
* Subagents should not have access to session management or system tools.
|
||||
*/
|
||||
export const DEFAULT_SUBAGENT_TOOL_DENY: string[] = [
|
||||
// Subagents cannot spawn subagents (no nested spawning)
|
||||
"sessions_spawn",
|
||||
// Subagents cannot delegate (no nested delegation)
|
||||
"delegate",
|
||||
];
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -9,7 +9,7 @@ export { createGlobTool } from "./glob.js";
|
|||
export { createWebFetchTool, createWebSearchTool } from "./web/index.js";
|
||||
export { createCronTool } from "./cron/index.js";
|
||||
export { createDataTool } from "./data/index.js";
|
||||
export { createSessionsListTool } from "./sessions-list.js";
|
||||
export { createDelegateTool } from "./delegate.js";
|
||||
|
||||
// Tool groups
|
||||
export {
|
||||
|
|
|
|||
|
|
@ -1,211 +0,0 @@
|
|||
import { describe, it, expect, beforeEach } from "vitest";
|
||||
import type { SubagentRunRecord } from "../subagent/types.js";
|
||||
import { resetSubagentRegistryForTests, seedSubagentRunForTests } from "../subagent/registry.js";
|
||||
import { createSessionsListTool } from "./sessions-list.js";
|
||||
|
||||
function makeRecord(overrides: Partial<SubagentRunRecord> = {}): SubagentRunRecord {
|
||||
return {
|
||||
runId: "run-001",
|
||||
childSessionId: "child-001",
|
||||
requesterSessionId: "parent-001",
|
||||
task: "Test task",
|
||||
cleanup: "delete",
|
||||
createdAt: 1700000000000,
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
|
||||
describe("sessions_list tool", () => {
|
||||
beforeEach(() => {
|
||||
resetSubagentRegistryForTests();
|
||||
});
|
||||
|
||||
it("returns empty message when no runs exist", async () => {
|
||||
const tool = createSessionsListTool({ sessionId: "parent-001" });
|
||||
const result = await tool.execute("call-1", {});
|
||||
|
||||
expect(result.content[0]).toEqual({
|
||||
type: "text",
|
||||
text: "No subagent runs for this session.",
|
||||
});
|
||||
expect(result.details).toEqual({ runs: [] });
|
||||
});
|
||||
|
||||
it("lists multiple runs with correct status mapping", async () => {
|
||||
const now = Date.now();
|
||||
seedSubagentRunForTests(
|
||||
makeRecord({
|
||||
runId: "run-aaa",
|
||||
label: "Code Review",
|
||||
startedAt: now - 45000,
|
||||
}),
|
||||
);
|
||||
seedSubagentRunForTests(
|
||||
makeRecord({
|
||||
runId: "run-bbb",
|
||||
label: "Test Analysis",
|
||||
startedAt: now - 60000,
|
||||
endedAt: now - 30000,
|
||||
outcome: { status: "ok" },
|
||||
findings: "All tests passed successfully.",
|
||||
findingsCaptured: true,
|
||||
}),
|
||||
);
|
||||
seedSubagentRunForTests(
|
||||
makeRecord({
|
||||
runId: "run-ccc",
|
||||
label: "Lint Check",
|
||||
startedAt: now - 60000,
|
||||
endedAt: now,
|
||||
outcome: { status: "error", error: "timeout" },
|
||||
findings: "Lint check timed out.",
|
||||
findingsCaptured: true,
|
||||
}),
|
||||
);
|
||||
|
||||
const tool = createSessionsListTool({ sessionId: "parent-001" });
|
||||
const result = await tool.execute("call-1", {});
|
||||
|
||||
const text = result.content[0]!;
|
||||
expect(text.type).toBe("text");
|
||||
expect((text as { text: string }).text).toContain("3 total");
|
||||
expect((text as { text: string }).text).toContain("[RUNNING]");
|
||||
expect((text as { text: string }).text).toContain("[OK]");
|
||||
expect((text as { text: string }).text).toContain("[ERROR]");
|
||||
expect((text as { text: string }).text).toContain("Code Review");
|
||||
expect((text as { text: string }).text).toContain("Test Analysis");
|
||||
expect((text as { text: string }).text).toContain("Lint Check");
|
||||
// Verify full runId is shown for completed runs
|
||||
expect((text as { text: string }).text).toContain("id:run-aaa");
|
||||
expect((text as { text: string }).text).toContain("id:run-bbb");
|
||||
expect((text as { text: string }).text).toContain("id:run-ccc");
|
||||
// Verify findings are shown for completed runs
|
||||
expect((text as { text: string }).text).toContain("All tests passed successfully.");
|
||||
expect((text as { text: string }).text).toContain("Lint check timed out.");
|
||||
|
||||
expect(result.details!.runs).toHaveLength(3);
|
||||
expect(result.details!.runs[0]!.status).toBe("running");
|
||||
expect(result.details!.runs[1]!.status).toBe("ok");
|
||||
expect(result.details!.runs[2]!.status).toBe("error");
|
||||
});
|
||||
|
||||
it("returns detail for a specific runId", async () => {
|
||||
const now = Date.now();
|
||||
seedSubagentRunForTests(
|
||||
makeRecord({
|
||||
runId: "run-detail",
|
||||
label: "Deep Analysis",
|
||||
task: "Analyze the authentication module thoroughly",
|
||||
startedAt: now - 90000,
|
||||
endedAt: now - 10000,
|
||||
outcome: { status: "ok" },
|
||||
findings: "Found 2 potential issues in token validation.",
|
||||
findingsCaptured: true,
|
||||
}),
|
||||
);
|
||||
|
||||
const tool = createSessionsListTool({ sessionId: "parent-001" });
|
||||
const result = await tool.execute("call-1", { runId: "run-detail" });
|
||||
|
||||
const text = (result.content[0] as { text: string }).text;
|
||||
expect(text).toContain("Run: run-detail");
|
||||
expect(text).toContain("Label: Deep Analysis");
|
||||
expect(text).toContain("Status: ok");
|
||||
expect(text).toContain("Found 2 potential issues");
|
||||
expect(text).toContain("Duration:");
|
||||
|
||||
expect(result.details!.runs).toHaveLength(1);
|
||||
expect(result.details!.runs[0]!.runId).toBe("run-detail");
|
||||
});
|
||||
|
||||
it("returns not found for unknown runId", async () => {
|
||||
const tool = createSessionsListTool({ sessionId: "parent-001" });
|
||||
const result = await tool.execute("call-1", { runId: "nonexistent" });
|
||||
|
||||
const text = (result.content[0] as { text: string }).text;
|
||||
expect(text).toContain("Run not found");
|
||||
expect(result.details).toEqual({ runs: [] });
|
||||
});
|
||||
|
||||
it("rejects runId belonging to a different requester", async () => {
|
||||
seedSubagentRunForTests(
|
||||
makeRecord({
|
||||
runId: "run-other",
|
||||
requesterSessionId: "other-parent",
|
||||
}),
|
||||
);
|
||||
|
||||
const tool = createSessionsListTool({ sessionId: "parent-001" });
|
||||
const result = await tool.execute("call-1", { runId: "run-other" });
|
||||
|
||||
const text = (result.content[0] as { text: string }).text;
|
||||
expect(text).toContain("Run not found");
|
||||
expect(result.details).toEqual({ runs: [] });
|
||||
});
|
||||
|
||||
it("handles missing sessionId gracefully", async () => {
|
||||
const tool = createSessionsListTool({});
|
||||
const result = await tool.execute("call-1", {});
|
||||
|
||||
const text = (result.content[0] as { text: string }).text;
|
||||
expect(text).toContain("No session ID available");
|
||||
expect(result.details).toEqual({ runs: [] });
|
||||
});
|
||||
|
||||
it("shows findings for grouped completed runs", async () => {
|
||||
const now = Date.now();
|
||||
const groupId = "group-001";
|
||||
seedSubagentRunForTests(
|
||||
makeRecord({
|
||||
runId: "run-g1",
|
||||
label: "Bull Case Research",
|
||||
startedAt: now - 60000,
|
||||
endedAt: now - 10000,
|
||||
outcome: { status: "ok" },
|
||||
findings: "AI infrastructure capex growing 40% YoY.",
|
||||
findingsCaptured: true,
|
||||
groupId,
|
||||
}),
|
||||
);
|
||||
seedSubagentRunForTests(
|
||||
makeRecord({
|
||||
runId: "run-g2",
|
||||
label: "Bear Case Research",
|
||||
startedAt: now - 60000,
|
||||
endedAt: now - 5000,
|
||||
outcome: { status: "ok" },
|
||||
findings: "Valuation risk: forward P/E above historical average.",
|
||||
findingsCaptured: true,
|
||||
groupId,
|
||||
}),
|
||||
);
|
||||
|
||||
const tool = createSessionsListTool({ sessionId: "parent-001" });
|
||||
const result = await tool.execute("call-1", {});
|
||||
|
||||
const text = (result.content[0] as { text: string }).text;
|
||||
expect(text).toContain("id:run-g1");
|
||||
expect(text).toContain("id:run-g2");
|
||||
expect(text).toContain("AI infrastructure capex growing 40% YoY.");
|
||||
expect(text).toContain("Valuation risk: forward P/E above historical average.");
|
||||
});
|
||||
|
||||
it("shows findings status for running task", async () => {
|
||||
const now = Date.now();
|
||||
seedSubagentRunForTests(
|
||||
makeRecord({
|
||||
runId: "run-running",
|
||||
label: "Still Running",
|
||||
startedAt: now - 30000,
|
||||
// no endedAt
|
||||
}),
|
||||
);
|
||||
|
||||
const tool = createSessionsListTool({ sessionId: "parent-001" });
|
||||
const result = await tool.execute("call-1", { runId: "run-running" });
|
||||
|
||||
const text = (result.content[0] as { text: string }).text;
|
||||
expect(text).toContain("Status: running");
|
||||
expect(text).toContain("Findings: (still running)");
|
||||
});
|
||||
});
|
||||
|
|
@ -1,273 +0,0 @@
|
|||
/**
|
||||
* sessions_list tool — allows an agent to view its spawned subagent runs.
|
||||
*
|
||||
* Lists all subagent runs for the current session, or shows details for a
|
||||
* specific run when a runId is provided.
|
||||
*/
|
||||
|
||||
import { Type } from "@sinclair/typebox";
|
||||
import type { AgentTool } from "@mariozechner/pi-agent-core";
|
||||
import { listSubagentRuns, getSubagentRun, getSubagentGroup } from "../subagent/registry.js";
|
||||
import type { SubagentRunRecord } from "../subagent/types.js";
|
||||
|
||||
const SessionsListSchema = Type.Object({
|
||||
runId: Type.Optional(
|
||||
Type.String({ description: "Optional run ID to get details for a specific run. If omitted, lists all runs." }),
|
||||
),
|
||||
});
|
||||
|
||||
type SessionsListArgs = {
|
||||
runId?: string;
|
||||
};
|
||||
|
||||
export type SessionsListResult = {
|
||||
runs: Array<{
|
||||
runId: string;
|
||||
label?: string | undefined;
|
||||
task: string;
|
||||
status: "running" | "ok" | "error" | "timeout" | "unknown";
|
||||
startedAt?: number | undefined;
|
||||
endedAt?: number | undefined;
|
||||
findings?: string | undefined;
|
||||
}>;
|
||||
};
|
||||
|
||||
export interface CreateSessionsListToolOptions {
|
||||
/** Session ID of the current (requester) agent */
|
||||
sessionId?: string;
|
||||
}
|
||||
|
||||
function resolveStatus(record: SubagentRunRecord): "running" | "ok" | "error" | "timeout" | "unknown" {
|
||||
if (!record.endedAt) return "running";
|
||||
return record.outcome?.status ?? "unknown";
|
||||
}
|
||||
|
||||
function formatElapsed(ms: number): string {
|
||||
const totalSeconds = Math.round(ms / 1000);
|
||||
if (totalSeconds < 60) return `${totalSeconds}s`;
|
||||
const minutes = Math.floor(totalSeconds / 60);
|
||||
const seconds = totalSeconds % 60;
|
||||
if (minutes < 60) return seconds > 0 ? `${minutes}m${seconds}s` : `${minutes}m`;
|
||||
const hours = Math.floor(minutes / 60);
|
||||
const remainingMinutes = minutes % 60;
|
||||
return remainingMinutes > 0 ? `${hours}h${remainingMinutes}m` : `${hours}h`;
|
||||
}
|
||||
|
||||
function formatRunSummary(record: SubagentRunRecord, index: number, now: number): string {
|
||||
const status = resolveStatus(record);
|
||||
const displayName = record.label || record.task.slice(0, 60);
|
||||
const statusTag = `[${status}]`.padEnd(10);
|
||||
|
||||
let timing = "";
|
||||
if (status === "running" && record.startedAt) {
|
||||
timing = `started ${formatElapsed(now - record.startedAt)} ago`;
|
||||
} else if (record.startedAt && record.endedAt) {
|
||||
timing = `completed in ${formatElapsed(record.endedAt - record.startedAt)}`;
|
||||
}
|
||||
|
||||
const parts = [`#${index + 1} ${statusTag} "${displayName}"`];
|
||||
if (timing) parts.push(`(${record.runId.slice(0, 8)}…, ${timing})`);
|
||||
else parts.push(`(${record.runId.slice(0, 8)}…)`);
|
||||
|
||||
return parts.join(" ");
|
||||
}
|
||||
|
||||
function formatRunDetail(record: SubagentRunRecord, now: number): string {
|
||||
const status = resolveStatus(record);
|
||||
const lines: string[] = [
|
||||
`Run: ${record.runId}`,
|
||||
];
|
||||
|
||||
if (record.label) lines.push(`Label: ${record.label}`);
|
||||
if (record.groupId) {
|
||||
const group = getSubagentGroup(record.groupId);
|
||||
lines.push(`Group: ${record.groupId}${group?.label ? ` (${group.label})` : ""}`);
|
||||
if (group?.next) lines.push(`Continuation: ${group.next.slice(0, 120)}${group.next.length > 120 ? "…" : ""}`);
|
||||
}
|
||||
lines.push(`Task: ${record.task}`);
|
||||
lines.push(`Status: ${status}${record.outcome?.error ? ` — ${record.outcome.error}` : ""}`);
|
||||
lines.push(`Child Session: ${record.childSessionId}`);
|
||||
lines.push(`Created: ${new Date(record.createdAt).toISOString()} (${formatElapsed(now - record.createdAt)} ago)`);
|
||||
|
||||
if (record.startedAt) {
|
||||
lines.push(`Started: ${new Date(record.startedAt).toISOString()} (${formatElapsed(now - record.startedAt)} ago)`);
|
||||
}
|
||||
if (record.endedAt) {
|
||||
lines.push(`Ended: ${new Date(record.endedAt).toISOString()}`);
|
||||
if (record.startedAt) {
|
||||
lines.push(`Duration: ${formatElapsed(record.endedAt - record.startedAt)}`);
|
||||
}
|
||||
}
|
||||
|
||||
if (record.findingsCaptured) {
|
||||
lines.push(`Findings: ${record.findings || "(no output)"}`);
|
||||
} else if (record.endedAt) {
|
||||
lines.push("Findings: (not yet captured)");
|
||||
} else {
|
||||
lines.push("Findings: (still running)");
|
||||
}
|
||||
|
||||
if (record.announced) lines.push("Announced: yes");
|
||||
|
||||
return lines.join("\n");
|
||||
}
|
||||
|
||||
function toResultRun(record: SubagentRunRecord) {
|
||||
return {
|
||||
runId: record.runId,
|
||||
label: record.label,
|
||||
task: record.task,
|
||||
status: resolveStatus(record),
|
||||
startedAt: record.startedAt,
|
||||
endedAt: record.endedAt,
|
||||
findings: record.findings,
|
||||
};
|
||||
}
|
||||
|
||||
export function createSessionsListTool(
|
||||
options: CreateSessionsListToolOptions,
|
||||
): AgentTool<typeof SessionsListSchema, SessionsListResult> {
|
||||
return {
|
||||
name: "sessions_list",
|
||||
label: "List Subagent Runs",
|
||||
description:
|
||||
"List all subagent runs spawned by this session and their current status. " +
|
||||
"Optionally pass a runId to get detailed information about a specific run. " +
|
||||
"Use this to check subagent progress or when the user asks about status.",
|
||||
parameters: SessionsListSchema,
|
||||
execute: async (_toolCallId, args) => {
|
||||
const { runId } = args as SessionsListArgs;
|
||||
const requesterSessionId = options.sessionId;
|
||||
|
||||
if (!requesterSessionId) {
|
||||
return {
|
||||
content: [{ type: "text", text: "No session ID available. Cannot list subagent runs." }],
|
||||
details: { runs: [] },
|
||||
};
|
||||
}
|
||||
|
||||
const now = Date.now();
|
||||
|
||||
// Detail mode: specific run
|
||||
if (runId) {
|
||||
const record = getSubagentRun(runId);
|
||||
if (!record) {
|
||||
return {
|
||||
content: [{ type: "text", text: `Run not found: ${runId}` }],
|
||||
details: { runs: [] },
|
||||
};
|
||||
}
|
||||
if (record.requesterSessionId !== requesterSessionId) {
|
||||
return {
|
||||
content: [{ type: "text", text: `Run not found: ${runId}` }],
|
||||
details: { runs: [] },
|
||||
};
|
||||
}
|
||||
return {
|
||||
content: [{ type: "text", text: formatRunDetail(record, now) }],
|
||||
details: { runs: [toResultRun(record)] },
|
||||
};
|
||||
}
|
||||
|
||||
// List mode: all runs for this session
|
||||
const runs = listSubagentRuns(requesterSessionId);
|
||||
|
||||
if (runs.length === 0) {
|
||||
return {
|
||||
content: [{ type: "text", text: "No subagent runs for this session." }],
|
||||
details: { runs: [] },
|
||||
};
|
||||
}
|
||||
|
||||
const someRunning = runs.some((r) => !r.endedAt);
|
||||
|
||||
// Build status lines, grouping runs by groupId
|
||||
const statusLines: string[] = [];
|
||||
const groupedRuns = new Map<string, SubagentRunRecord[]>();
|
||||
const ungroupedRuns: SubagentRunRecord[] = [];
|
||||
|
||||
for (const r of runs) {
|
||||
if (r.groupId) {
|
||||
const list = groupedRuns.get(r.groupId) ?? [];
|
||||
list.push(r);
|
||||
groupedRuns.set(r.groupId, list);
|
||||
} else {
|
||||
ungroupedRuns.push(r);
|
||||
}
|
||||
}
|
||||
|
||||
let idx = 0;
|
||||
|
||||
// Grouped runs
|
||||
for (const [gId, gRuns] of groupedRuns) {
|
||||
const group = getSubagentGroup(gId);
|
||||
const groupLabel = group?.label || `Group ${gId.slice(0, 8)}…`;
|
||||
const done = gRuns.filter(r => r.endedAt).length;
|
||||
const nextSnippet = group?.next ? ` → next: "${group.next.slice(0, 60)}${group.next.length > 60 ? "…" : ""}"` : "";
|
||||
statusLines.push(`\n 📦 ${groupLabel} (${done}/${gRuns.length} done${nextSnippet})`);
|
||||
|
||||
for (const r of gRuns) {
|
||||
idx++;
|
||||
const displayName = r.label || r.task.slice(0, 60);
|
||||
const status = resolveStatus(r);
|
||||
if (status === "running") {
|
||||
const elapsed = r.startedAt ? formatElapsed(now - r.startedAt) : "just spawned";
|
||||
statusLines.push(` ${idx}. [RUNNING] "${displayName}" (${elapsed}) id:${r.runId}`);
|
||||
} else {
|
||||
const elapsed = r.startedAt && r.endedAt ? formatElapsed(r.endedAt - r.startedAt) : "";
|
||||
const findings = r.findingsCaptured
|
||||
? (r.findings ? r.findings.slice(0, 4000) + (r.findings.length > 4000 ? "…" : "") : "(no output)")
|
||||
: "(findings not yet captured)";
|
||||
statusLines.push(` ${idx}. [${status.toUpperCase()}] "${displayName}" (${elapsed}) id:${r.runId}\n Findings: ${findings}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Ungrouped runs
|
||||
for (const r of ungroupedRuns) {
|
||||
idx++;
|
||||
const displayName = r.label || r.task.slice(0, 60);
|
||||
const status = resolveStatus(r);
|
||||
if (status === "running") {
|
||||
const elapsed = r.startedAt ? formatElapsed(now - r.startedAt) : "just spawned";
|
||||
statusLines.push(` ${idx}. [RUNNING] "${displayName}" (${elapsed}) id:${r.runId}`);
|
||||
} else {
|
||||
const elapsed = r.startedAt && r.endedAt ? formatElapsed(r.endedAt - r.startedAt) : "";
|
||||
const findings = r.findingsCaptured
|
||||
? (r.findings ? r.findings.slice(0, 4000) + (r.findings.length > 4000 ? "…" : "") : "(no output)")
|
||||
: "(findings not yet captured)";
|
||||
statusLines.push(` ${idx}. [${status.toUpperCase()}] "${displayName}" (${elapsed}) id:${r.runId}\n Findings: ${findings}`);
|
||||
}
|
||||
}
|
||||
|
||||
const header = `Subagent runs for this session: ${runs.length} total`;
|
||||
const body = statusLines.join("\n");
|
||||
|
||||
// If any subagents are still running, return status with wait instruction.
|
||||
// We do NOT use steer() here — steer would cancel unrelated tool calls
|
||||
// that the LLM may be processing in the same batch.
|
||||
if (someRunning) {
|
||||
const runningCount = runs.filter((r) => !r.endedAt).length;
|
||||
return {
|
||||
content: [
|
||||
{
|
||||
type: "text",
|
||||
text:
|
||||
header + "\n" + body + "\n\n" +
|
||||
`STATUS: ${runningCount} subagent(s) still running. This is normal — they need time to complete.\n` +
|
||||
"ACTION REQUIRED: Do NOT call sessions_list again. Results will be delivered into your context automatically when they finish.\n" +
|
||||
"Do NOT attempt to do this work yourself — the subagents are handling it.",
|
||||
},
|
||||
],
|
||||
details: { runs: runs.map(toResultRun) },
|
||||
};
|
||||
}
|
||||
|
||||
// All completed — normal response
|
||||
return {
|
||||
content: [{ type: "text", text: header + "\n" + body }],
|
||||
details: { runs: runs.map(toResultRun) },
|
||||
};
|
||||
},
|
||||
};
|
||||
}
|
||||
|
|
@ -1,61 +0,0 @@
|
|||
import { describe, it, expect, beforeEach } from "vitest";
|
||||
import { createSessionsSpawnTool } from "./sessions-spawn.js";
|
||||
import { getSubagentGroup, resetSubagentRegistryForTests } from "../subagent/registry.js";
|
||||
|
||||
describe("sessions_spawn tool", () => {
|
||||
beforeEach(() => {
|
||||
resetSubagentRegistryForTests();
|
||||
});
|
||||
it("has correct name and description", () => {
|
||||
const tool = createSessionsSpawnTool({ isSubagent: false, sessionId: "test-session" });
|
||||
expect(tool.name).toBe("sessions_spawn");
|
||||
expect(tool.label).toBe("Spawn Subagent");
|
||||
expect(tool.description).toContain("Spawn a background subagent");
|
||||
});
|
||||
|
||||
it("rejects spawn from subagent sessions", async () => {
|
||||
const tool = createSessionsSpawnTool({ isSubagent: true, sessionId: "child-session" });
|
||||
|
||||
const result = await tool.execute(
|
||||
"call-1",
|
||||
{ task: "do something" } as any,
|
||||
new AbortController().signal,
|
||||
);
|
||||
|
||||
expect(result.details.status).toBe("error");
|
||||
expect(result.details.error).toContain("not allowed from sub-agent sessions");
|
||||
const firstContent = result.content[0] as { type: string; text: string };
|
||||
expect(firstContent.text).toContain("not allowed");
|
||||
});
|
||||
|
||||
it("auto-creates group when custom groupId is provided", async () => {
|
||||
const tool = createSessionsSpawnTool({ isSubagent: false, sessionId: "parent-session" });
|
||||
|
||||
// Should not error — the group is auto-created
|
||||
await tool.execute(
|
||||
"call-group",
|
||||
{ task: "research topic", label: "Research", groupId: "my-custom-group" } as any,
|
||||
new AbortController().signal,
|
||||
);
|
||||
|
||||
// Verify group was created in the registry
|
||||
const group = getSubagentGroup("my-custom-group");
|
||||
expect(group).toBeDefined();
|
||||
expect(group!.groupId).toBe("my-custom-group");
|
||||
expect(group!.label).toBe("Group: Research");
|
||||
});
|
||||
|
||||
it("fails gracefully when Hub is not initialized", async () => {
|
||||
const tool = createSessionsSpawnTool({ isSubagent: false, sessionId: "parent-session" });
|
||||
|
||||
const result = await tool.execute(
|
||||
"call-2",
|
||||
{ task: "analyze code", label: "Code Analysis" } as any,
|
||||
new AbortController().signal,
|
||||
);
|
||||
|
||||
// Should get an error because Hub singleton is not set up in test
|
||||
expect(result.details.status).toBe("error");
|
||||
expect(result.details.error).toContain("Hub");
|
||||
});
|
||||
});
|
||||
|
|
@ -1,220 +0,0 @@
|
|||
/**
|
||||
* sessions_spawn tool — allows a parent agent to spawn subagent runs.
|
||||
*
|
||||
* Subagents run in isolated sessions with restricted tools.
|
||||
* Results are announced back to the parent when the child completes.
|
||||
*/
|
||||
|
||||
import { v7 as uuidv7 } from "uuid";
|
||||
import { Type } from "@sinclair/typebox";
|
||||
import type { AgentTool } from "@mariozechner/pi-agent-core";
|
||||
import { getHub } from "../../hub/hub-singleton.js";
|
||||
import { buildSubagentSystemPrompt } from "../subagent/announce.js";
|
||||
import { registerSubagentRun, createSubagentGroup, getSubagentGroup } from "../subagent/registry.js";
|
||||
import { resolveTools } from "../tools.js";
|
||||
|
||||
const SessionsSpawnSchema = Type.Object({
|
||||
task: Type.String({ description: "The task for the subagent to perform.", minLength: 1 }),
|
||||
label: Type.Optional(
|
||||
Type.String({ description: "Human-readable label for this background task." }),
|
||||
),
|
||||
model: Type.Optional(
|
||||
Type.String({ description: "Override the LLM model for the subagent (e.g. 'gpt-4o', 'claude-sonnet')." }),
|
||||
),
|
||||
cleanup: Type.Optional(
|
||||
Type.Union([Type.Literal("delete"), Type.Literal("keep")], {
|
||||
description: "Session cleanup after completion. 'delete' removes session files, 'keep' preserves for audit. Default: 'delete'.",
|
||||
}),
|
||||
),
|
||||
timeoutSeconds: Type.Optional(
|
||||
Type.Number({
|
||||
description:
|
||||
"Execution timeout in seconds. Default: 1800 (30 min). " +
|
||||
"Set to 0 for no timeout (useful for complex, long-running tasks). " +
|
||||
"The subagent will be terminated if it exceeds this limit.",
|
||||
minimum: 0,
|
||||
}),
|
||||
),
|
||||
announce: Type.Optional(
|
||||
Type.Union([Type.Literal("immediate"), Type.Literal("silent")], {
|
||||
description:
|
||||
"Announcement mode. 'immediate' (default): findings delivered as each subagent completes. " +
|
||||
"'silent': defer all announcements until every silent subagent from this session finishes, " +
|
||||
"then deliver one combined report. Use 'silent' when spawning multiple subagents to collect " +
|
||||
"data in parallel and you want to summarize everything at once. " +
|
||||
"Ignored when groupId is provided (groups always collect all results before announcing).",
|
||||
}),
|
||||
),
|
||||
groupId: Type.Optional(
|
||||
Type.String({
|
||||
description:
|
||||
"Join an existing group. Pass the groupId returned by a previous sessions_spawn call " +
|
||||
"to add this subagent to the same group. All runs in a group are announced together " +
|
||||
"when the last one completes. If omitted AND 'next' is provided, a new group is created automatically.",
|
||||
}),
|
||||
),
|
||||
next: Type.Optional(
|
||||
Type.String({
|
||||
description:
|
||||
"Continuation task to execute after ALL subagents in the group complete. " +
|
||||
"Only used when creating a new group (first spawn without groupId). " +
|
||||
"When set, the combined findings from all subagents plus this 'next' prompt " +
|
||||
"are delivered to you so you can perform follow-up work (e.g. summarize, generate reports, write files). " +
|
||||
"Setting 'next' automatically creates a group and implies silent collection.",
|
||||
}),
|
||||
),
|
||||
});
|
||||
|
||||
type SessionsSpawnArgs = {
|
||||
task: string;
|
||||
label?: string;
|
||||
model?: string;
|
||||
cleanup?: "delete" | "keep";
|
||||
timeoutSeconds?: number;
|
||||
announce?: "immediate" | "silent";
|
||||
groupId?: string;
|
||||
next?: string;
|
||||
};
|
||||
|
||||
export type SessionsSpawnResult = {
|
||||
status: "accepted" | "error";
|
||||
childSessionId?: string;
|
||||
runId?: string;
|
||||
groupId?: string;
|
||||
error?: string;
|
||||
};
|
||||
|
||||
export interface CreateSessionsSpawnToolOptions {
|
||||
/** Whether the current agent is itself a subagent */
|
||||
isSubagent?: boolean;
|
||||
/** Session ID of the current (requester) agent */
|
||||
sessionId?: string;
|
||||
/** Resolved provider ID of the parent agent (inherited by subagents) */
|
||||
provider?: string;
|
||||
}
|
||||
|
||||
export function createSessionsSpawnTool(
|
||||
options: CreateSessionsSpawnToolOptions,
|
||||
): AgentTool<typeof SessionsSpawnSchema, SessionsSpawnResult> {
|
||||
return {
|
||||
name: "sessions_spawn",
|
||||
label: "Spawn Subagent",
|
||||
description:
|
||||
"Spawn a background subagent to handle a specific task. The subagent runs in an isolated session with its own tool set. " +
|
||||
"When it completes, its findings are delivered directly into your context automatically. " +
|
||||
"After spawning, do NOT proceed with work that depends on the results — but you can still chat or do unrelated tasks. " +
|
||||
"When spawning multiple subagents for a collect-then-act workflow, ALWAYS use the `next` parameter " +
|
||||
"on the first spawn to define follow-up work, then pass the returned groupId to subsequent spawns. " +
|
||||
"Use this for parallelizable work, long-running analysis, or tasks that benefit from isolation.",
|
||||
parameters: SessionsSpawnSchema,
|
||||
execute: async (_toolCallId, args) => {
|
||||
const { task, label, model, cleanup = "delete", timeoutSeconds, announce, next } = args as SessionsSpawnArgs;
|
||||
let { groupId } = args as SessionsSpawnArgs;
|
||||
|
||||
// Guard: subagents cannot spawn subagents
|
||||
if (options.isSubagent) {
|
||||
return {
|
||||
content: [{ type: "text", text: "Error: sessions_spawn is not allowed from sub-agent sessions." }],
|
||||
details: {
|
||||
status: "error",
|
||||
error: "sessions_spawn is not allowed from sub-agent sessions",
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
const requesterSessionId = options.sessionId ?? "unknown";
|
||||
const runId = uuidv7();
|
||||
const childSessionId = uuidv7();
|
||||
|
||||
// Auto-create group when groupId is provided but doesn't exist yet,
|
||||
// or when `next` is provided without a groupId.
|
||||
if (groupId) {
|
||||
const existingGroup = getSubagentGroup(groupId);
|
||||
if (!existingGroup) {
|
||||
// LLM provided a custom groupId — auto-create the group
|
||||
createSubagentGroup({
|
||||
groupId,
|
||||
requesterSessionId,
|
||||
label: label ? `Group: ${label}` : undefined,
|
||||
next,
|
||||
});
|
||||
}
|
||||
} else if (next) {
|
||||
groupId = uuidv7();
|
||||
createSubagentGroup({
|
||||
groupId,
|
||||
requesterSessionId,
|
||||
label: label ? `Group: ${label}` : undefined,
|
||||
next,
|
||||
});
|
||||
}
|
||||
|
||||
// Resolve tools for the subagent (with isSubagent=true for policy filtering)
|
||||
const subagentTools = resolveTools({ isSubagent: true });
|
||||
const toolNames = subagentTools.map((t) => t.name);
|
||||
|
||||
// Build system prompt for the child
|
||||
const systemPrompt = buildSubagentSystemPrompt({
|
||||
requesterSessionId,
|
||||
childSessionId,
|
||||
label,
|
||||
task,
|
||||
tools: toolNames,
|
||||
});
|
||||
|
||||
// Spawn child agent via Hub
|
||||
try {
|
||||
const hub = getHub();
|
||||
const childAgent = hub.createSubagent(childSessionId, {
|
||||
systemPrompt,
|
||||
model,
|
||||
provider: options.provider,
|
||||
});
|
||||
|
||||
// Register the run for lifecycle tracking.
|
||||
// The write is deferred via the start callback so the child only
|
||||
// begins work once a concurrency slot is available in the queue.
|
||||
registerSubagentRun({
|
||||
runId,
|
||||
childSessionId,
|
||||
requesterSessionId,
|
||||
task,
|
||||
label,
|
||||
cleanup,
|
||||
timeoutSeconds,
|
||||
announce: groupId ? "silent" : announce,
|
||||
groupId,
|
||||
start: () => childAgent.write(task),
|
||||
});
|
||||
|
||||
// Build response text
|
||||
const groupInfo = groupId ? `\nGroup: ${groupId}` : "";
|
||||
const nextInfo = next ? `\nContinuation: "${next.slice(0, 100)}${next.length > 100 ? "…" : ""}"` : "";
|
||||
const responseText =
|
||||
`Subagent spawned: ${label || task.slice(0, 80)}\n` +
|
||||
`Run: ${runId}${groupInfo}${nextInfo}\n\n` +
|
||||
`⏳ WAITING FOR RESULTS — do NOT proceed with work that depends on these results.\n` +
|
||||
`Do NOT fabricate data or completion status. Results will arrive in your context automatically.`;
|
||||
|
||||
return {
|
||||
content: [{ type: "text", text: responseText }],
|
||||
details: {
|
||||
status: "accepted",
|
||||
childSessionId,
|
||||
runId,
|
||||
groupId,
|
||||
},
|
||||
};
|
||||
} catch (err) {
|
||||
const message = err instanceof Error ? err.message : String(err);
|
||||
return {
|
||||
content: [{ type: "text", text: `Error spawning subagent: ${message}` }],
|
||||
details: {
|
||||
status: "error",
|
||||
error: message,
|
||||
},
|
||||
};
|
||||
}
|
||||
},
|
||||
};
|
||||
}
|
||||
|
|
@ -39,7 +39,7 @@ const TOOL_DISPLAY: Record<string, { label: string; icon: LucideIcon }> = {
|
|||
memory_set: { label: "MemorySet", icon: Database },
|
||||
memory_delete: { label: "MemoryDelete", icon: Database },
|
||||
memory_list: { label: "MemoryList", icon: Database },
|
||||
sessions_spawn: { label: "SpawnSession", icon: GitBranch },
|
||||
delegate: { label: "Delegate", icon: GitBranch },
|
||||
data: { label: "Data", icon: BarChart3 },
|
||||
}
|
||||
|
||||
|
|
@ -81,11 +81,12 @@ function getSubtitle(toolName: string, args?: Record<string, unknown>): string {
|
|||
const ticker = params?.ticker ? String(params.ticker).toUpperCase() : ""
|
||||
return ticker ? `${action} ${ticker}` : action
|
||||
}
|
||||
case "sessions_spawn": {
|
||||
const label = args.label ? String(args.label) : ""
|
||||
if (label) return label.length > 60 ? label.slice(0, 57) + "…" : label
|
||||
const task = String(args.task ?? "")
|
||||
return task.length > 60 ? task.slice(0, 57) + "…" : task
|
||||
case "delegate": {
|
||||
const tasks = args.tasks as Array<{ label?: string; task?: string }> | undefined
|
||||
if (!tasks?.length) return ""
|
||||
const labels = tasks.map((t, i) => t.label || `Task ${i + 1}`)
|
||||
const summary = labels.join(", ")
|
||||
return summary.length > 60 ? summary.slice(0, 57) + "…" : summary
|
||||
}
|
||||
default:
|
||||
return ""
|
||||
|
|
@ -106,7 +107,7 @@ const RUNNING_LABELS: Record<string, string> = {
|
|||
web_search: "searching…",
|
||||
web_fetch: "fetching…",
|
||||
data: "fetching…",
|
||||
sessions_spawn: "spawning…",
|
||||
delegate: "delegating…",
|
||||
}
|
||||
|
||||
/** Stats derived from tool result content */
|
||||
|
|
|
|||
|
|
@ -217,7 +217,7 @@ async function runTask(
|
|||
enableSkills: false,
|
||||
tools: {
|
||||
// Only allow coding tools — no web, no cron, no sessions
|
||||
deny: ["web_fetch", "web_search", "cron", "data", "sessions_spawn", "sessions_list", "memory_search", "send_file"],
|
||||
deny: ["web_fetch", "web_search", "cron", "data", "delegate", "memory_search", "send_file"],
|
||||
},
|
||||
};
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue