Merge remote-tracking branch 'origin/main' into Bohan-J/desktop-auto-update
This commit is contained in:
commit
e9efb66fb2
29 changed files with 2079 additions and 50 deletions
|
|
@ -272,6 +272,18 @@ describe("AsyncAgent internal flow", () => {
|
|||
agent.close();
|
||||
});
|
||||
|
||||
it("does not persist assistant summary when result text is a NO_REPLY variant", async () => {
|
||||
runInternalMock.mockResolvedValueOnce({ text: "NO_REPLY.", thinking: undefined, error: undefined });
|
||||
const agent = new AsyncAgent();
|
||||
|
||||
agent.writeInternal("announce findings", { forwardAssistant: true, persistResponse: true });
|
||||
await agent.waitForIdle();
|
||||
|
||||
expect(persistAssistantSummaryMock).not.toHaveBeenCalled();
|
||||
|
||||
agent.close();
|
||||
});
|
||||
|
||||
it("does not persist assistant summary when result text is empty", async () => {
|
||||
runInternalMock.mockResolvedValueOnce({ text: " ", thinking: undefined, error: undefined });
|
||||
const agent = new AsyncAgent();
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ import { Channel } from "./channel.js";
|
|||
import type { AgentOptions, Message } from "./types.js";
|
||||
import type { MulticaEvent } from "./events.js";
|
||||
import { injectMessageTimestamp } from "./message-timestamp.js";
|
||||
import { isSilentReplyText } from "./tokens.js";
|
||||
|
||||
const devNull = { write: () => true } as unknown as NodeJS.WritableStream;
|
||||
|
||||
|
|
@ -31,6 +32,7 @@ export class AsyncAgent {
|
|||
private pendingWrites = 0;
|
||||
private closeCallbacks: Array<() => void> = [];
|
||||
private forwardInternalAssistant = false;
|
||||
private _lastRunError: string | undefined;
|
||||
readonly sessionId: string;
|
||||
|
||||
constructor(options?: AgentOptions) {
|
||||
|
|
@ -64,13 +66,19 @@ export class AsyncAgent {
|
|||
|
||||
this.queue = this.queue
|
||||
.then(async () => {
|
||||
if (this._closed) return;
|
||||
if (this._closed) {
|
||||
console.log(`[AsyncAgent:${this.sessionId.slice(0, 8)}] write() skipped — agent closed`);
|
||||
return;
|
||||
}
|
||||
console.log(`[AsyncAgent:${this.sessionId.slice(0, 8)}] run() starting for message: ${content.slice(0, 80)}`);
|
||||
const result = await this.agent.run(message, { displayPrompt: content });
|
||||
console.log(`[AsyncAgent:${this.sessionId.slice(0, 8)}] run() completed, error=${result.error ?? "none"}`);
|
||||
// Flush pending session writes so waitForIdle() callers
|
||||
// can safely read session data from disk.
|
||||
await this.agent.flushSession();
|
||||
// Normal text is delivered via message_end event; only handle errors here
|
||||
if (result.error) {
|
||||
this._lastRunError = result.error;
|
||||
console.error(`[AsyncAgent] Agent run error: ${result.error}`);
|
||||
this.channel.send({ id: uuidv7(), content: `[error] ${result.error}` });
|
||||
// Only emit agent_error for HTTP 401 from the LLM provider so the
|
||||
|
|
@ -83,6 +91,7 @@ export class AsyncAgent {
|
|||
})
|
||||
.catch((err) => {
|
||||
const message = err instanceof Error ? err.message : String(err);
|
||||
this._lastRunError = message;
|
||||
console.error(`[AsyncAgent] Agent run exception: ${message}`);
|
||||
this.channel.send({ id: uuidv7(), content: `[error] ${message}` });
|
||||
// Only emit agent_error for HTTP 401 from the LLM provider so the
|
||||
|
|
@ -120,8 +129,11 @@ export class AsyncAgent {
|
|||
// Internal run errors are for diagnostics only; do not leak to user stream.
|
||||
console.error(`[AsyncAgent] Internal run error: ${result.error}`);
|
||||
}
|
||||
// Stop forwarding BEFORE persist to avoid double-emitting the same
|
||||
// assistant message (once from runInternal streaming, once from appendMessage).
|
||||
this.forwardInternalAssistant = prevForward;
|
||||
// Persist the LLM summary so it remains in parent context for future turns
|
||||
if (persistResponse && result.text?.trim() && result.text.trim() !== "NO_REPLY") {
|
||||
if (persistResponse && result.text?.trim() && !isSilentReplyText(result.text)) {
|
||||
this.agent.persistAssistantSummary(result.text.trim());
|
||||
await this.agent.flushSession();
|
||||
}
|
||||
|
|
@ -164,6 +176,43 @@ export class AsyncAgent {
|
|||
return this.queue;
|
||||
}
|
||||
|
||||
/** Error message from the last run, if it failed. */
|
||||
get lastRunError(): string | undefined {
|
||||
return this._lastRunError;
|
||||
}
|
||||
|
||||
/** Whether the agent is currently executing a run (normal or internal). */
|
||||
get isRunning(): boolean {
|
||||
return this.agent.isRunning;
|
||||
}
|
||||
|
||||
/** Whether the underlying LLM is currently streaming a response. */
|
||||
get isStreaming(): boolean {
|
||||
return this.agent.isStreaming;
|
||||
}
|
||||
|
||||
/**
|
||||
* Steer the agent mid-run. Bypasses the serial queue and injects a message
|
||||
* directly into the PiAgentCore steering queue. The message is delivered
|
||||
* after the current tool execution completes, skipping remaining tool calls.
|
||||
*/
|
||||
steer(content: string): void {
|
||||
this.agent.steer(content);
|
||||
}
|
||||
|
||||
/**
|
||||
* Queue a follow-up message for after the current run finishes.
|
||||
* Delivered only when the agent has no more tool calls or steering messages.
|
||||
*/
|
||||
followUp(content: string): void {
|
||||
this.agent.followUp(content);
|
||||
}
|
||||
|
||||
/** Whether the underlying PiAgentCore has queued steer/followUp messages. */
|
||||
hasQueuedMessages(): boolean {
|
||||
return this.agent.hasQueuedMessages();
|
||||
}
|
||||
|
||||
private shouldForwardEvent(event: AgentEvent | MulticaEvent): boolean {
|
||||
if (!this.agent.isInternalRun) return true;
|
||||
if (!this.forwardInternalAssistant) return false;
|
||||
|
|
|
|||
|
|
@ -44,7 +44,8 @@ function buildCoreTemplate(): string {
|
|||
},
|
||||
tools: {
|
||||
// brave: { apiKey: "brv-..." },
|
||||
// perplexity: { apiKey: "pplx-...", baseUrl: "https://api.perplexity.ai", model: "perplexity/sonar-pro" }
|
||||
// perplexity: { apiKey: "pplx-...", baseUrl: "https://api.perplexity.ai", model: "perplexity/sonar-pro" },
|
||||
// data: { apiKey: "your-financial-datasets-api-key" }
|
||||
}
|
||||
}
|
||||
`;
|
||||
|
|
|
|||
|
|
@ -91,6 +91,7 @@ export class Agent {
|
|||
|
||||
// Internal run state
|
||||
private _internalRun = false;
|
||||
private _isRunning = false;
|
||||
private _runMutex: Promise<void> = Promise.resolve();
|
||||
private currentUserDisplayPrompt: string | undefined;
|
||||
|
||||
|
|
@ -304,8 +305,8 @@ export class Agent {
|
|||
const mergedToolsConfig = mergeToolsConfig(profileToolsConfig, options.tools);
|
||||
const profileDir = this.profile?.getProfileDir();
|
||||
this.toolsOptions = mergedToolsConfig
|
||||
? { ...options, tools: mergedToolsConfig, profileDir }
|
||||
: { ...options, profileDir };
|
||||
? { ...options, tools: mergedToolsConfig, profileDir, provider: this.resolvedProvider }
|
||||
: { ...options, profileDir, provider: this.resolvedProvider };
|
||||
|
||||
const tools = resolveTools(this.toolsOptions);
|
||||
if (this.debug) {
|
||||
|
|
@ -427,6 +428,7 @@ export class Agent {
|
|||
this.refreshAuthState();
|
||||
this.output.state.lastAssistantText = "";
|
||||
this.currentUserDisplayPrompt = options?.displayPrompt;
|
||||
this._isRunning = true;
|
||||
|
||||
try {
|
||||
// Early validation: check API key before calling PiAgentCore.prompt(),
|
||||
|
|
@ -489,6 +491,7 @@ export class Agent {
|
|||
: undefined;
|
||||
return { text: this.output.state.lastAssistantText, thinking, error: this.agent.state.error };
|
||||
} finally {
|
||||
this._isRunning = false;
|
||||
this.currentUserDisplayPrompt = undefined;
|
||||
}
|
||||
}
|
||||
|
|
@ -664,6 +667,40 @@ export class Agent {
|
|||
return this._internalRun;
|
||||
}
|
||||
|
||||
/** Whether a run (normal or internal) is currently executing inside _run(). */
|
||||
get isRunning(): boolean {
|
||||
return this._isRunning;
|
||||
}
|
||||
|
||||
/** Whether the underlying PiAgentCore is currently streaming an LLM response. */
|
||||
get isStreaming(): boolean {
|
||||
return this.agent.state.isStreaming;
|
||||
}
|
||||
|
||||
/**
|
||||
* Queue a steering message to interrupt the agent mid-run.
|
||||
* Delivered after current tool execution, skipping remaining tool calls.
|
||||
* Safe to call from any context (does not require the run mutex).
|
||||
*/
|
||||
steer(content: string): void {
|
||||
const msg: UserMessage = { role: "user", content, timestamp: Date.now() };
|
||||
this.agent.steer(msg);
|
||||
}
|
||||
|
||||
/**
|
||||
* Queue a follow-up message for after the current run finishes.
|
||||
* Delivered only when the agent has no more tool calls or steering messages.
|
||||
*/
|
||||
followUp(content: string): void {
|
||||
const msg: UserMessage = { role: "user", content, timestamp: Date.now() };
|
||||
this.agent.followUp(msg);
|
||||
}
|
||||
|
||||
/** Whether the underlying PiAgentCore has queued steer/followUp messages. */
|
||||
hasQueuedMessages(): boolean {
|
||||
return this.agent.hasQueuedMessages();
|
||||
}
|
||||
|
||||
/**
|
||||
* Persist a synthetic assistant message into both in-memory state and session JSONL.
|
||||
* Used after an internal run to keep the LLM summary visible in future turns
|
||||
|
|
@ -895,6 +932,13 @@ export class Agent {
|
|||
|
||||
// Update internal state
|
||||
this.resolvedProvider = providerId;
|
||||
// Keep toolsOptions.provider in sync so sessions_spawn inherits the current provider
|
||||
this.toolsOptions = { ...this.toolsOptions, provider: providerId };
|
||||
|
||||
// Reload tools so sessions_spawn picks up the new provider in its closure.
|
||||
// Without this, the existing tool instance still captures the old provider.
|
||||
const tools = resolveTools(this.toolsOptions);
|
||||
this.agent.setTools(tools);
|
||||
|
||||
// Update session metadata (save original providerId, not alias-resolved)
|
||||
this.session.saveMeta({
|
||||
|
|
@ -906,7 +950,7 @@ export class Agent {
|
|||
});
|
||||
|
||||
// Rebuild system prompt so runtime info reflects the new provider/model
|
||||
const toolNames = (this.agent.state.tools ?? []).map((t: { name: string }) => t.name);
|
||||
const toolNames = tools.map((t) => t.name);
|
||||
const systemPrompt = this.rebuildSystemPrompt(toolNames);
|
||||
if (systemPrompt) {
|
||||
this.agent.setSystemPrompt(systemPrompt);
|
||||
|
|
|
|||
172
packages/core/src/agent/subagent/README.md
Normal file
172
packages/core/src/agent/subagent/README.md
Normal file
|
|
@ -0,0 +1,172 @@
|
|||
# Subagent System
|
||||
|
||||
The subagent system allows a parent agent to spawn isolated child agents that run tasks in parallel and report results back automatically.
|
||||
|
||||
## Architecture Overview
|
||||
|
||||
```
|
||||
┌─────────────────────────────────────────────────────────────────────┐
|
||||
│ Parent Agent (runner.ts) │
|
||||
│ │
|
||||
│ tools: sessions_spawn, sessions_list │
|
||||
│ state: resolvedProvider, toolsOptions │
|
||||
└──────────┬──────────────────────────────────────────────────────────┘
|
||||
│
|
||||
│ sessions_spawn(task, label, timeoutSeconds)
|
||||
▼
|
||||
┌─────────────────────────────────────────────────────────────────────┐
|
||||
│ Spawn Flow (sessions-spawn.ts) │
|
||||
│ │
|
||||
│ 1. Build subagent system prompt (announce.ts) │
|
||||
│ 2. hub.createSubagent(childSessionId, { provider, model }) │
|
||||
│ 3. registerSubagentRun({ start: () => childAgent.write(task) }) │
|
||||
│ 4. Return { status: "accepted", runId, childSessionId } │
|
||||
└──────────┬──────────────────────────────────────────────────────────┘
|
||||
│
|
||||
▼
|
||||
┌─────────────────────────────────────────────────────────────────────┐
|
||||
│ Concurrency Queue (command-queue.ts) │
|
||||
│ │
|
||||
│ Lane: "subagent" — max 10 concurrent (configurable) │
|
||||
│ Queued runs wait for a slot before start() is called │
|
||||
└──────────┬──────────────────────────────────────────────────────────┘
|
||||
│ slot acquired
|
||||
▼
|
||||
┌─────────────────────────────────────────────────────────────────────┐
|
||||
│ Child Agent Execution │
|
||||
│ │
|
||||
│ ┌───────────────────────────────────────────────────────────────┐ │
|
||||
│ │ AsyncAgent (async-agent.ts) │ │
|
||||
│ │ - Isolated session with restricted tools (isSubagent=true) │ │
|
||||
│ │ - Inherits parent's LLM provider │ │
|
||||
│ │ - System prompt: task focus + error reporting rules │ │
|
||||
│ │ - Tracks lastRunError for error propagation │ │
|
||||
│ └───────────────────────────────────────────────────────────────┘ │
|
||||
│ │
|
||||
│ ┌───────────────────────────────────────────────────────────────┐ │
|
||||
│ │ watchChildAgent (registry.ts) │ │
|
||||
│ │ - Sets startedAt, starts timeout timer │ │
|
||||
│ │ - waitForIdle() — waits for child's task queue to drain │ │
|
||||
│ │ - onClose() — handles explicit close (timeout kill, etc.) │ │
|
||||
│ └───────────────────────────────────────────────────────────────┘ │
|
||||
└──────────┬──────────────────────────────────────────────────────────┘
|
||||
│
|
||||
│ child completes / errors / times out
|
||||
▼
|
||||
┌─────────────────────────────────────────────────────────────────────┐
|
||||
│ Completion Handling (registry.ts) │
|
||||
│ │
|
||||
│ handleRunCompletion(record) │
|
||||
│ │ │
|
||||
│ ├─ Phase 1: captureFindings() │
|
||||
│ │ - Read last assistant reply from child session JSONL │
|
||||
│ │ - Falls back to last toolResult if no assistant text │
|
||||
│ │ - Persists findings to record before session deletion │
|
||||
│ │ │
|
||||
│ ├─ Session Cleanup │
|
||||
│ │ - cleanup="delete": rm child session dir + hub.closeAgent() │
|
||||
│ │ - cleanup="keep": preserve for audit │
|
||||
│ │ │
|
||||
│ └─ Phase 2: checkAndAnnounce(requesterSessionId) │
|
||||
│ - Finds all unannounced, completed runs with findings │
|
||||
│ - Calls runCoalescedAnnounceFlow() │
|
||||
│ - Marks records: announced=true, archiveAtMs=now+60min │
|
||||
└──────────┬──────────────────────────────────────────────────────────┘
|
||||
│
|
||||
▼
|
||||
┌─────────────────────────────────────────────────────────────────────┐
|
||||
│ Announcement Delivery (announce.ts) │
|
||||
│ │
|
||||
│ runCoalescedAnnounceFlow(requesterSessionId, records) │
|
||||
│ │ │
|
||||
│ ├─ Format message: formatCoalescedAnnouncementMessage() │
|
||||
│ │ - Single record: task name, status, findings, stats │
|
||||
│ │ - Multiple records: combined report with all findings │
|
||||
│ │ │
|
||||
│ ├─ Two-tier delivery: │
|
||||
│ │ │
|
||||
│ │ Tier 1: BUSY (parent running or has pending writes) │
|
||||
│ │ └─ enqueueAnnounce() → announce-queue.ts │
|
||||
│ │ - Debounce 1s to batch nearby completions │
|
||||
│ │ - Drain via writeInternal() when parent finishes │
|
||||
│ │ │
|
||||
│ │ Tier 2: IDLE (parent not running) │
|
||||
│ │ └─ sendAnnounceDirect() │
|
||||
│ │ - writeInternal(msg, { forwardAssistant, persistResponse })│
|
||||
│ │ │
|
||||
│ └─ All delivery uses writeInternal() (marks as internal: true) │
|
||||
│ → Prevents announcement from showing as user bubble in UI │
|
||||
│ → LLM processes findings and responds naturally to user │
|
||||
└──────────┬──────────────────────────────────────────────────────────┘
|
||||
│
|
||||
▼
|
||||
┌─────────────────────────────────────────────────────────────────────┐
|
||||
│ Record Lifecycle (registry.ts) │
|
||||
│ │
|
||||
│ created → startedAt → endedAt → findingsCaptured → announced │
|
||||
│ │
|
||||
│ After announcement: │
|
||||
│ - Record kept with archiveAtMs = now + 60 min │
|
||||
│ - sessions_list can still query records during this window │
|
||||
│ - Sweeper runs every 60s, removes expired records │
|
||||
│ - When all records removed, sweeper stops │
|
||||
└─────────────────────────────────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
## Key Files
|
||||
|
||||
| File | Purpose |
|
||||
|------|---------|
|
||||
| `sessions-spawn.ts` | Tool: spawns a child agent with task, label, timeout, provider |
|
||||
| `sessions-list.ts` | Tool: lists subagent runs and their status |
|
||||
| `registry.ts` | Lifecycle management: register, watch, capture, announce, archive |
|
||||
| `announce.ts` | System prompt builder, findings reader, message formatter, delivery |
|
||||
| `announce-queue.ts` | Debounced queue for batching announcements when parent is busy |
|
||||
| `command-queue.ts` | Concurrency limiter for subagent lane slots |
|
||||
| `lanes.ts` | Lane config: max concurrency (10), default timeout (600s) |
|
||||
| `types.ts` | Shared types: SubagentRunRecord, SubagentRunOutcome, etc. |
|
||||
| `registry-store.ts` | Persistence: save/load runs to disk for crash recovery |
|
||||
|
||||
## Provider Inheritance
|
||||
|
||||
Subagents inherit the parent's resolved LLM provider:
|
||||
|
||||
```
|
||||
runner.ts (resolvedProvider)
|
||||
→ toolsOptions.provider
|
||||
→ tools.ts (CreateToolsOptions.provider)
|
||||
→ sessions-spawn.ts (options.provider)
|
||||
→ hub.createSubagent({ provider })
|
||||
```
|
||||
|
||||
When the user switches providers via UI (`setProvider()`), `toolsOptions.provider` is updated in sync so future spawns use the new provider.
|
||||
|
||||
## Error Propagation
|
||||
|
||||
```
|
||||
Child tool error (e.g., API 401)
|
||||
→ Subagent LLM sees error, includes in final message (system prompt rule)
|
||||
→ captureFindings() reads final message
|
||||
→ Announcement includes error in findings
|
||||
→ Parent LLM sees error and can inform user
|
||||
|
||||
Child run error (e.g., missing API key for provider)
|
||||
→ AsyncAgent._lastRunError set
|
||||
→ registry.ts checks childAgent.lastRunError after waitForIdle()
|
||||
→ outcome = { status: "error", error: "No API key configured..." }
|
||||
→ Announcement: "task failed: No API key configured..."
|
||||
```
|
||||
|
||||
## Timeout Behavior
|
||||
|
||||
Default: 600s (10 min). System prompt guides the parent LLM:
|
||||
- Simple tasks: 600s (default)
|
||||
- Moderate tasks: 900-1200s (15-20 min)
|
||||
- Complex tasks: 1200-1800s (20-30 min)
|
||||
|
||||
On timeout:
|
||||
1. Timeout timer fires in `watchChildAgent()`
|
||||
2. `cleanup({ status: "timeout" })` is called
|
||||
3. Child agent is closed via `hub.closeAgent()`
|
||||
4. Findings are captured from whatever the child wrote so far
|
||||
5. Announcement reports "timed out" with partial findings
|
||||
203
packages/core/src/agent/subagent/announce-queue.test.ts
Normal file
203
packages/core/src/agent/subagent/announce-queue.test.ts
Normal file
|
|
@ -0,0 +1,203 @@
|
|||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import {
|
||||
enqueueAnnounce,
|
||||
resetAnnounceQueuesForTests,
|
||||
getAnnounceQueueDepth,
|
||||
type AnnounceQueueItem,
|
||||
type AnnounceQueueSettings,
|
||||
} from "./announce-queue.js";
|
||||
|
||||
afterEach(() => {
|
||||
resetAnnounceQueuesForTests();
|
||||
});
|
||||
|
||||
function makeItem(overrides?: Partial<AnnounceQueueItem>): AnnounceQueueItem {
|
||||
return {
|
||||
prompt: "test prompt",
|
||||
summaryLine: "test summary",
|
||||
enqueuedAt: Date.now(),
|
||||
requesterSessionId: "session-1",
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
|
||||
const FAST_SETTINGS: AnnounceQueueSettings = {
|
||||
mode: "followup",
|
||||
debounceMs: 0,
|
||||
cap: 20,
|
||||
dropPolicy: "old",
|
||||
};
|
||||
|
||||
describe("announce queue", () => {
|
||||
it("enqueues an item and drains via send callback", async () => {
|
||||
const sent: AnnounceQueueItem[] = [];
|
||||
const send = async (item: AnnounceQueueItem) => { sent.push(item); };
|
||||
|
||||
enqueueAnnounce({
|
||||
key: "test",
|
||||
item: makeItem(),
|
||||
settings: FAST_SETTINGS,
|
||||
send,
|
||||
});
|
||||
|
||||
// Wait for async drain
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
|
||||
expect(sent).toHaveLength(1);
|
||||
expect(sent[0]!.prompt).toBe("test prompt");
|
||||
});
|
||||
|
||||
it("batches items in collect mode", async () => {
|
||||
const sent: AnnounceQueueItem[] = [];
|
||||
const send = async (item: AnnounceQueueItem) => { sent.push(item); };
|
||||
|
||||
const collectSettings: AnnounceQueueSettings = {
|
||||
mode: "collect",
|
||||
debounceMs: 0,
|
||||
cap: 20,
|
||||
dropPolicy: "old",
|
||||
};
|
||||
|
||||
enqueueAnnounce({
|
||||
key: "test",
|
||||
item: makeItem({ prompt: "prompt 1" }),
|
||||
settings: collectSettings,
|
||||
send,
|
||||
});
|
||||
enqueueAnnounce({
|
||||
key: "test",
|
||||
item: makeItem({ prompt: "prompt 2" }),
|
||||
settings: collectSettings,
|
||||
send,
|
||||
});
|
||||
enqueueAnnounce({
|
||||
key: "test",
|
||||
item: makeItem({ prompt: "prompt 3" }),
|
||||
settings: collectSettings,
|
||||
send,
|
||||
});
|
||||
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
|
||||
// Collect mode batches all into one send
|
||||
expect(sent).toHaveLength(1);
|
||||
expect(sent[0]!.prompt).toContain("prompt 1");
|
||||
expect(sent[0]!.prompt).toContain("prompt 2");
|
||||
expect(sent[0]!.prompt).toContain("prompt 3");
|
||||
expect(sent[0]!.prompt).toContain("3 queued announce(s)");
|
||||
});
|
||||
|
||||
it("sends items individually in followup mode", async () => {
|
||||
const sent: AnnounceQueueItem[] = [];
|
||||
const send = async (item: AnnounceQueueItem) => { sent.push(item); };
|
||||
|
||||
enqueueAnnounce({
|
||||
key: "test",
|
||||
item: makeItem({ prompt: "prompt A" }),
|
||||
settings: FAST_SETTINGS,
|
||||
send,
|
||||
});
|
||||
enqueueAnnounce({
|
||||
key: "test",
|
||||
item: makeItem({ prompt: "prompt B" }),
|
||||
settings: FAST_SETTINGS,
|
||||
send,
|
||||
});
|
||||
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
|
||||
expect(sent).toHaveLength(2);
|
||||
expect(sent[0]!.prompt).toBe("prompt A");
|
||||
expect(sent[1]!.prompt).toBe("prompt B");
|
||||
});
|
||||
|
||||
it("respects cap with 'new' drop policy (rejects new items)", async () => {
|
||||
const sent: AnnounceQueueItem[] = [];
|
||||
const send = async (item: AnnounceQueueItem) => {
|
||||
// Slow send to keep items in queue
|
||||
await new Promise((r) => setTimeout(r, 200));
|
||||
sent.push(item);
|
||||
};
|
||||
|
||||
const cappedSettings: AnnounceQueueSettings = {
|
||||
mode: "followup",
|
||||
debounceMs: 0,
|
||||
cap: 2,
|
||||
dropPolicy: "new",
|
||||
};
|
||||
|
||||
const r1 = enqueueAnnounce({ key: "test", item: makeItem({ prompt: "1" }), settings: cappedSettings, send });
|
||||
const r2 = enqueueAnnounce({ key: "test", item: makeItem({ prompt: "2" }), settings: cappedSettings, send });
|
||||
const r3 = enqueueAnnounce({ key: "test", item: makeItem({ prompt: "3" }), settings: cappedSettings, send });
|
||||
|
||||
expect(r1).toBe(true);
|
||||
expect(r2).toBe(true);
|
||||
expect(r3).toBe(false); // Rejected — cap reached
|
||||
});
|
||||
|
||||
it("respects cap with 'old' drop policy (drops oldest)", async () => {
|
||||
const sent: AnnounceQueueItem[] = [];
|
||||
const send = async (item: AnnounceQueueItem) => {
|
||||
await new Promise((r) => setTimeout(r, 200));
|
||||
sent.push(item);
|
||||
};
|
||||
|
||||
const cappedSettings: AnnounceQueueSettings = {
|
||||
mode: "followup",
|
||||
debounceMs: 0,
|
||||
cap: 2,
|
||||
dropPolicy: "old",
|
||||
};
|
||||
|
||||
enqueueAnnounce({ key: "test", item: makeItem({ prompt: "1" }), settings: cappedSettings, send });
|
||||
enqueueAnnounce({ key: "test", item: makeItem({ prompt: "2" }), settings: cappedSettings, send });
|
||||
enqueueAnnounce({ key: "test", item: makeItem({ prompt: "3" }), settings: cappedSettings, send });
|
||||
|
||||
// Queue should have items 2 and 3 (oldest was dropped)
|
||||
expect(getAnnounceQueueDepth("test")).toBeLessThanOrEqual(2);
|
||||
});
|
||||
|
||||
it("cleans up queue after drain completes", async () => {
|
||||
const sent: AnnounceQueueItem[] = [];
|
||||
const send = async (item: AnnounceQueueItem) => { sent.push(item); };
|
||||
|
||||
enqueueAnnounce({
|
||||
key: "test",
|
||||
item: makeItem(),
|
||||
settings: FAST_SETTINGS,
|
||||
send,
|
||||
});
|
||||
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
|
||||
expect(sent).toHaveLength(1);
|
||||
expect(getAnnounceQueueDepth("test")).toBe(0);
|
||||
});
|
||||
|
||||
it("debounces before draining", async () => {
|
||||
const sent: AnnounceQueueItem[] = [];
|
||||
const send = async (item: AnnounceQueueItem) => { sent.push(item); };
|
||||
|
||||
const debouncedSettings: AnnounceQueueSettings = {
|
||||
mode: "followup",
|
||||
debounceMs: 100,
|
||||
cap: 20,
|
||||
dropPolicy: "old",
|
||||
};
|
||||
|
||||
enqueueAnnounce({
|
||||
key: "test",
|
||||
item: makeItem(),
|
||||
settings: debouncedSettings,
|
||||
send,
|
||||
});
|
||||
|
||||
// Should not have sent yet (debounce)
|
||||
await new Promise((r) => setTimeout(r, 30));
|
||||
expect(sent).toHaveLength(0);
|
||||
|
||||
// Wait for debounce to complete
|
||||
await new Promise((r) => setTimeout(r, 150));
|
||||
expect(sent).toHaveLength(1);
|
||||
});
|
||||
});
|
||||
315
packages/core/src/agent/subagent/announce-queue.ts
Normal file
315
packages/core/src/agent/subagent/announce-queue.ts
Normal file
|
|
@ -0,0 +1,315 @@
|
|||
/**
|
||||
* Announce queue for subagent result delivery.
|
||||
*
|
||||
* Handles queuing and batching of subagent announcements when the parent
|
||||
* agent is busy. Supports debounce, cap, drop policy, and collect mode.
|
||||
*
|
||||
* Ported from OpenClaw (MIT license), adapted for Super Multica.
|
||||
*/
|
||||
|
||||
// ============================================================================
|
||||
// Types
|
||||
// ============================================================================
|
||||
|
||||
export type AnnounceQueueMode =
|
||||
/** Try steer, no queue fallback */
|
||||
| "steer"
|
||||
/** Try steer, fall back to queue */
|
||||
| "steer-backlog"
|
||||
/** Queue and send items individually */
|
||||
| "followup"
|
||||
/** Queue and batch all items into one combined prompt */
|
||||
| "collect";
|
||||
|
||||
export type AnnounceDropPolicy =
|
||||
/** Drop oldest items when cap reached */
|
||||
| "old"
|
||||
/** Drop newest items when cap reached */
|
||||
| "new"
|
||||
/** Summarize dropped items */
|
||||
| "summarize";
|
||||
|
||||
export type AnnounceQueueItem = {
|
||||
prompt: string;
|
||||
summaryLine?: string;
|
||||
enqueuedAt: number;
|
||||
requesterSessionId: string;
|
||||
};
|
||||
|
||||
export type AnnounceQueueSettings = {
|
||||
mode: AnnounceQueueMode;
|
||||
debounceMs?: number;
|
||||
cap?: number;
|
||||
dropPolicy?: AnnounceDropPolicy;
|
||||
};
|
||||
|
||||
type AnnounceQueueState = {
|
||||
items: AnnounceQueueItem[];
|
||||
draining: boolean;
|
||||
lastEnqueuedAt: number;
|
||||
mode: AnnounceQueueMode;
|
||||
debounceMs: number;
|
||||
cap: number;
|
||||
dropPolicy: AnnounceDropPolicy;
|
||||
droppedCount: number;
|
||||
summaryLines: string[];
|
||||
send: (item: AnnounceQueueItem) => Promise<void>;
|
||||
};
|
||||
|
||||
// ============================================================================
|
||||
// Defaults
|
||||
// ============================================================================
|
||||
|
||||
const DEFAULT_DEBOUNCE_MS = 1000;
|
||||
const DEFAULT_CAP = 20;
|
||||
const DEFAULT_DROP_POLICY: AnnounceDropPolicy = "summarize";
|
||||
|
||||
export const DEFAULT_ANNOUNCE_SETTINGS: AnnounceQueueSettings = {
|
||||
mode: "steer-backlog",
|
||||
debounceMs: DEFAULT_DEBOUNCE_MS,
|
||||
cap: DEFAULT_CAP,
|
||||
dropPolicy: DEFAULT_DROP_POLICY,
|
||||
};
|
||||
|
||||
// ============================================================================
|
||||
// Module state
|
||||
// ============================================================================
|
||||
|
||||
const ANNOUNCE_QUEUES = new Map<string, AnnounceQueueState>();
|
||||
|
||||
// ============================================================================
|
||||
// Public API
|
||||
// ============================================================================
|
||||
|
||||
/**
|
||||
* Enqueue an announcement for delivery. Returns true if enqueued,
|
||||
* false if dropped (cap + "new" drop policy).
|
||||
*/
|
||||
export function enqueueAnnounce(params: {
|
||||
key: string;
|
||||
item: AnnounceQueueItem;
|
||||
settings: AnnounceQueueSettings;
|
||||
send: (item: AnnounceQueueItem) => Promise<void>;
|
||||
}): boolean {
|
||||
const queue = getOrCreateQueue(params.key, params.settings, params.send);
|
||||
queue.lastEnqueuedAt = Date.now();
|
||||
|
||||
const shouldEnqueue = applyDropPolicy(queue, params.item);
|
||||
if (!shouldEnqueue) {
|
||||
if (queue.dropPolicy === "new") {
|
||||
scheduleAnnounceDrain(params.key);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
queue.items.push(params.item);
|
||||
scheduleAnnounceDrain(params.key);
|
||||
return true;
|
||||
}
|
||||
|
||||
/** Reset all queues (for testing). */
|
||||
export function resetAnnounceQueuesForTests(): void {
|
||||
ANNOUNCE_QUEUES.clear();
|
||||
}
|
||||
|
||||
/** Get the current queue depth for a key (for testing/diagnostics). */
|
||||
export function getAnnounceQueueDepth(key: string): number {
|
||||
return ANNOUNCE_QUEUES.get(key)?.items.length ?? 0;
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Queue management
|
||||
// ============================================================================
|
||||
|
||||
function getOrCreateQueue(
|
||||
key: string,
|
||||
settings: AnnounceQueueSettings,
|
||||
send: (item: AnnounceQueueItem) => Promise<void>,
|
||||
): AnnounceQueueState {
|
||||
const existing = ANNOUNCE_QUEUES.get(key);
|
||||
if (existing) {
|
||||
existing.mode = settings.mode;
|
||||
if (typeof settings.debounceMs === "number") {
|
||||
existing.debounceMs = Math.max(0, settings.debounceMs);
|
||||
}
|
||||
if (typeof settings.cap === "number" && settings.cap > 0) {
|
||||
existing.cap = Math.floor(settings.cap);
|
||||
}
|
||||
if (settings.dropPolicy) {
|
||||
existing.dropPolicy = settings.dropPolicy;
|
||||
}
|
||||
existing.send = send;
|
||||
return existing;
|
||||
}
|
||||
|
||||
const created: AnnounceQueueState = {
|
||||
items: [],
|
||||
draining: false,
|
||||
lastEnqueuedAt: 0,
|
||||
mode: settings.mode,
|
||||
debounceMs:
|
||||
typeof settings.debounceMs === "number"
|
||||
? Math.max(0, settings.debounceMs)
|
||||
: DEFAULT_DEBOUNCE_MS,
|
||||
cap:
|
||||
typeof settings.cap === "number" && settings.cap > 0
|
||||
? Math.floor(settings.cap)
|
||||
: DEFAULT_CAP,
|
||||
dropPolicy: settings.dropPolicy ?? DEFAULT_DROP_POLICY,
|
||||
droppedCount: 0,
|
||||
summaryLines: [],
|
||||
send,
|
||||
};
|
||||
ANNOUNCE_QUEUES.set(key, created);
|
||||
return created;
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Drop policy
|
||||
// ============================================================================
|
||||
|
||||
function applyDropPolicy(
|
||||
queue: AnnounceQueueState,
|
||||
item: AnnounceQueueItem,
|
||||
): boolean {
|
||||
if (queue.items.length < queue.cap) {
|
||||
return true;
|
||||
}
|
||||
|
||||
switch (queue.dropPolicy) {
|
||||
case "new":
|
||||
// Reject the incoming item
|
||||
return false;
|
||||
|
||||
case "old": {
|
||||
// Drop the oldest item to make room
|
||||
const dropped = queue.items.shift();
|
||||
if (dropped) {
|
||||
queue.droppedCount++;
|
||||
const summary = dropped.summaryLine?.trim() || dropped.prompt.slice(0, 80);
|
||||
queue.summaryLines.push(summary);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
case "summarize": {
|
||||
// Drop the oldest item but keep a summary
|
||||
const dropped = queue.items.shift();
|
||||
if (dropped) {
|
||||
queue.droppedCount++;
|
||||
const summary = dropped.summaryLine?.trim() || dropped.prompt.slice(0, 80);
|
||||
queue.summaryLines.push(summary);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
default:
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Drain scheduling
|
||||
// ============================================================================
|
||||
|
||||
function scheduleAnnounceDrain(key: string): void {
|
||||
const queue = ANNOUNCE_QUEUES.get(key);
|
||||
if (!queue || queue.draining) return;
|
||||
|
||||
queue.draining = true;
|
||||
void (async () => {
|
||||
try {
|
||||
while (queue.items.length > 0 || queue.droppedCount > 0) {
|
||||
await waitForDebounce(queue);
|
||||
|
||||
if (queue.mode === "collect") {
|
||||
// Batch all items into one combined prompt
|
||||
const items = queue.items.splice(0, queue.items.length);
|
||||
const summary = buildDropSummary(queue);
|
||||
const prompt = buildCollectPrompt(items, summary);
|
||||
const last = items.at(-1);
|
||||
if (!last) break;
|
||||
await queue.send({ ...last, prompt });
|
||||
continue;
|
||||
}
|
||||
|
||||
// followup / steer-backlog: send items individually
|
||||
const summary = buildDropSummary(queue);
|
||||
if (summary) {
|
||||
const next = queue.items.shift();
|
||||
if (!next) break;
|
||||
await queue.send({ ...next, prompt: summary });
|
||||
continue;
|
||||
}
|
||||
|
||||
const next = queue.items.shift();
|
||||
if (!next) break;
|
||||
await queue.send(next);
|
||||
}
|
||||
} catch (err) {
|
||||
console.error(`[AnnounceQueue] Drain failed for ${key}: ${String(err)}`);
|
||||
} finally {
|
||||
queue.draining = false;
|
||||
if (queue.items.length === 0 && queue.droppedCount === 0) {
|
||||
ANNOUNCE_QUEUES.delete(key);
|
||||
} else {
|
||||
scheduleAnnounceDrain(key);
|
||||
}
|
||||
}
|
||||
})();
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Helpers
|
||||
// ============================================================================
|
||||
|
||||
function waitForDebounce(queue: AnnounceQueueState): Promise<void> {
|
||||
const elapsed = Date.now() - queue.lastEnqueuedAt;
|
||||
const remaining = Math.max(0, queue.debounceMs - elapsed);
|
||||
if (remaining <= 0) return Promise.resolve();
|
||||
return new Promise((resolve) => setTimeout(resolve, remaining));
|
||||
}
|
||||
|
||||
function buildDropSummary(queue: AnnounceQueueState): string | undefined {
|
||||
if (queue.droppedCount === 0) return undefined;
|
||||
|
||||
const parts: string[] = [
|
||||
`[${queue.droppedCount} earlier announce(s) were summarized due to queue backlog]`,
|
||||
];
|
||||
if (queue.summaryLines.length > 0) {
|
||||
parts.push("");
|
||||
for (const line of queue.summaryLines) {
|
||||
parts.push(`- ${line}`);
|
||||
}
|
||||
}
|
||||
|
||||
// Reset counters
|
||||
queue.droppedCount = 0;
|
||||
queue.summaryLines = [];
|
||||
|
||||
return parts.join("\n");
|
||||
}
|
||||
|
||||
function buildCollectPrompt(
|
||||
items: AnnounceQueueItem[],
|
||||
dropSummary: string | undefined,
|
||||
): string {
|
||||
const parts: string[] = [
|
||||
`[${items.length} queued announce(s) while agent was busy]`,
|
||||
"",
|
||||
];
|
||||
|
||||
for (let i = 0; i < items.length; i++) {
|
||||
parts.push(`---`);
|
||||
parts.push(`Queued #${i + 1}`);
|
||||
parts.push(items[i]!.prompt);
|
||||
parts.push("");
|
||||
}
|
||||
|
||||
if (dropSummary) {
|
||||
parts.push(dropSummary);
|
||||
parts.push("");
|
||||
}
|
||||
|
||||
return parts.join("\n");
|
||||
}
|
||||
|
|
@ -16,6 +16,7 @@ import type {
|
|||
SubagentRunRecord,
|
||||
SubagentSystemPromptParams,
|
||||
} from "./types.js";
|
||||
import { enqueueAnnounce, DEFAULT_ANNOUNCE_SETTINGS } from "./announce-queue.js";
|
||||
|
||||
/**
|
||||
* Build the system prompt injected into a subagent session.
|
||||
|
|
@ -275,7 +276,15 @@ export function formatCoalescedAnnouncementMessage(
|
|||
|
||||
/**
|
||||
* Run the coalesced announcement flow for all completed runs of a requester.
|
||||
* Formats a single combined message and delivers it to the parent agent.
|
||||
* Uses two-tier delivery:
|
||||
* 1. Queue — if parent is busy (running or has pending writes), queue for
|
||||
* later drain via writeInternal (with debounce to batch nearby completions)
|
||||
* 2. Direct — if parent is idle, send immediately via writeInternal
|
||||
*
|
||||
* All delivery uses writeInternal() which marks the announcement prompt as
|
||||
* `internal: true` (hidden from UI). The assistant's summary response is
|
||||
* forwarded to the real-time stream (`forwardAssistant: true`) so the user
|
||||
* sees the result, and persisted to JSONL for future session loads.
|
||||
*/
|
||||
export function runCoalescedAnnounceFlow(
|
||||
requesterSessionId: string,
|
||||
|
|
@ -293,7 +302,28 @@ export function runCoalescedAnnounceFlow(
|
|||
return false;
|
||||
}
|
||||
|
||||
parentAgent.writeInternal(message, { forwardAssistant: true, persistResponse: true });
|
||||
// Tier 1: BUSY — parent is running or has pending writes
|
||||
// Queue the announcement for delivery via writeInternal() after the parent
|
||||
// finishes its current work. We do NOT use steer() (cancels unrelated tool
|
||||
// calls) or followUp() (doesn't mark entries as internal, polluting the UI).
|
||||
if (parentAgent.isRunning || parentAgent.getPendingWrites() > 0) {
|
||||
enqueueAnnounce({
|
||||
key: requesterSessionId,
|
||||
item: {
|
||||
prompt: message,
|
||||
summaryLine: `${records.length} subagent(s) completed`,
|
||||
enqueuedAt: Date.now(),
|
||||
requesterSessionId,
|
||||
},
|
||||
settings: DEFAULT_ANNOUNCE_SETTINGS,
|
||||
send: async (item) => sendAnnounceDirect(requesterSessionId, item.prompt),
|
||||
});
|
||||
console.log(`[SubagentAnnounce] Queued announcement for parent: ${requesterSessionId}`);
|
||||
return true;
|
||||
}
|
||||
|
||||
// Tier 2: IDLE — parent is idle, send directly via writeInternal
|
||||
sendAnnounceDirect(requesterSessionId, message);
|
||||
return true;
|
||||
} catch (err) {
|
||||
console.error(`[SubagentAnnounce] Failed to coalesced-announce to parent:`, err);
|
||||
|
|
@ -301,6 +331,26 @@ export function runCoalescedAnnounceFlow(
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Send announcement directly to parent via writeInternal.
|
||||
* Used as Tier 3 (idle) and as the queue drain callback.
|
||||
*/
|
||||
function sendAnnounceDirect(requesterSessionId: string, message: string): void {
|
||||
try {
|
||||
const hub = getHub();
|
||||
const parentAgent = hub.getAgent(requesterSessionId);
|
||||
if (!parentAgent || parentAgent.closed) {
|
||||
console.warn(
|
||||
`[SubagentAnnounce] Parent agent not found or closed for direct send: ${requesterSessionId}`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
parentAgent.writeInternal(message, { forwardAssistant: true, persistResponse: true });
|
||||
} catch (err) {
|
||||
console.error(`[SubagentAnnounce] Failed direct announce to parent:`, err);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Run the full subagent announcement flow:
|
||||
* 1. Read child's last assistant reply
|
||||
|
|
|
|||
|
|
@ -266,8 +266,105 @@ describe("subagent registry — coalescing", () => {
|
|||
});
|
||||
});
|
||||
|
||||
describe("subagent registry — silent announce mode", () => {
|
||||
// Note: In tests (no Hub), watchChildAgent completes synchronously within
|
||||
// registerSubagentRun(), so each run's lifecycle finishes before the next
|
||||
// registration call. Multi-run coalescing requires async child agents and
|
||||
// is validated in integration tests.
|
||||
|
||||
it("stores announce field on the record", () => {
|
||||
const record = registerSubagentRun({
|
||||
runId: "run-ann",
|
||||
childSessionId: "child-ann",
|
||||
requesterSessionId: "parent-1",
|
||||
task: "Task",
|
||||
announce: "silent",
|
||||
});
|
||||
expect(record.announce).toBe("silent");
|
||||
});
|
||||
|
||||
it("defaults announce to undefined (immediate behavior)", () => {
|
||||
const record = registerSubagentRun({
|
||||
runId: "run-def",
|
||||
childSessionId: "child-def",
|
||||
requesterSessionId: "parent-1",
|
||||
task: "Task",
|
||||
});
|
||||
expect(record.announce).toBeUndefined();
|
||||
});
|
||||
|
||||
it("silent runs are announced via runCoalescedAnnounceFlow", async () => {
|
||||
const announceModule = await import("./announce.js");
|
||||
const spy = vi.spyOn(announceModule, "runCoalescedAnnounceFlow").mockReturnValue(true);
|
||||
|
||||
registerSubagentRun({
|
||||
runId: "run-s1",
|
||||
childSessionId: "child-s1",
|
||||
requesterSessionId: "parent-1",
|
||||
task: "Silent A",
|
||||
announce: "silent",
|
||||
});
|
||||
|
||||
await flushQueue();
|
||||
|
||||
// Silent run announced (via runCoalescedAnnounceFlow mock)
|
||||
const silentCalls = spy.mock.calls.filter(
|
||||
([reqId, records]) =>
|
||||
reqId === "parent-1" &&
|
||||
records.some((r: { announce?: string }) => r.announce === "silent"),
|
||||
);
|
||||
expect(silentCalls.length).toBeGreaterThanOrEqual(1);
|
||||
|
||||
const runS1 = getSubagentRun("run-s1");
|
||||
expect(runS1?.announced).toBe(true);
|
||||
expect(runS1?.announce).toBe("silent");
|
||||
|
||||
spy.mockRestore();
|
||||
});
|
||||
|
||||
it("immediate and silent runs are never mixed in the same announce call", async () => {
|
||||
const announceModule = await import("./announce.js");
|
||||
const spy = vi.spyOn(announceModule, "runCoalescedAnnounceFlow").mockReturnValue(true);
|
||||
|
||||
// Register immediate run, then silent run
|
||||
registerSubagentRun({
|
||||
runId: "run-imm",
|
||||
childSessionId: "child-imm",
|
||||
requesterSessionId: "parent-1",
|
||||
task: "Immediate task",
|
||||
});
|
||||
registerSubagentRun({
|
||||
runId: "run-s1",
|
||||
childSessionId: "child-s1",
|
||||
requesterSessionId: "parent-1",
|
||||
task: "Silent task",
|
||||
announce: "silent",
|
||||
});
|
||||
|
||||
await flushQueue();
|
||||
|
||||
const calls = spy.mock.calls.filter(
|
||||
([reqId]) => reqId === "parent-1",
|
||||
);
|
||||
|
||||
// Immediate and silent should never be in the same announce call
|
||||
const mixedCalls = calls.filter(([, records]) => {
|
||||
const hasImm = records.some((r: { announce?: string }) => r.announce !== "silent");
|
||||
const hasSilent = records.some((r: { announce?: string }) => r.announce === "silent");
|
||||
return hasImm && hasSilent;
|
||||
});
|
||||
expect(mixedCalls).toHaveLength(0);
|
||||
|
||||
// Both should be announced (in separate calls)
|
||||
expect(getSubagentRun("run-imm")?.announced).toBe(true);
|
||||
expect(getSubagentRun("run-s1")?.announced).toBe(true);
|
||||
|
||||
spy.mockRestore();
|
||||
});
|
||||
});
|
||||
|
||||
describe("subagent registry — post-announce cleanup", () => {
|
||||
it("removes runs from registry after successful announcement", async () => {
|
||||
it("keeps runs in registry after successful announcement with archiveAtMs", async () => {
|
||||
// Mock runCoalescedAnnounceFlow to succeed
|
||||
const announceModule = await import("./announce.js");
|
||||
const spy = vi.spyOn(announceModule, "runCoalescedAnnounceFlow").mockReturnValue(true);
|
||||
|
|
@ -288,11 +385,20 @@ describe("subagent registry — post-announce cleanup", () => {
|
|||
|
||||
await flushQueue();
|
||||
|
||||
// Both runs should have been announced and removed from registry
|
||||
// Both runs should have been announced but kept in registry with archiveAtMs
|
||||
expect(spy).toHaveBeenCalled();
|
||||
expect(getSubagentRun("run-a")).toBeUndefined();
|
||||
expect(getSubagentRun("run-b")).toBeUndefined();
|
||||
expect(listSubagentRuns("parent-1")).toHaveLength(0);
|
||||
|
||||
const runA = getSubagentRun("run-a");
|
||||
const runB = getSubagentRun("run-b");
|
||||
expect(runA).toBeDefined();
|
||||
expect(runB).toBeDefined();
|
||||
expect(runA!.announced).toBe(true);
|
||||
expect(runB!.announced).toBe(true);
|
||||
expect(runA!.archiveAtMs).toBeGreaterThan(Date.now());
|
||||
expect(runB!.archiveAtMs).toBeGreaterThan(Date.now());
|
||||
|
||||
// Records are still queryable
|
||||
expect(listSubagentRuns("parent-1")).toHaveLength(2);
|
||||
|
||||
spy.mockRestore();
|
||||
});
|
||||
|
|
|
|||
|
|
@ -101,6 +101,7 @@ export function registerSubagentRun(params: RegisterSubagentRunParams): Subagent
|
|||
label,
|
||||
cleanup = "delete",
|
||||
timeoutSeconds,
|
||||
announce,
|
||||
start,
|
||||
} = params;
|
||||
|
||||
|
|
@ -111,6 +112,7 @@ export function registerSubagentRun(params: RegisterSubagentRunParams): Subagent
|
|||
task,
|
||||
label,
|
||||
cleanup,
|
||||
announce,
|
||||
createdAt: Date.now(),
|
||||
};
|
||||
|
||||
|
|
@ -121,7 +123,9 @@ export function registerSubagentRun(params: RegisterSubagentRunParams): Subagent
|
|||
// Enqueue in the subagent lane — the start callback and watchChildAgent
|
||||
// only execute once a concurrency slot is available.
|
||||
void enqueueInLane(SubagentLane.Subagent, async () => {
|
||||
console.log(`[SubagentRegistry] Lane slot acquired for ${runId}, calling start()`);
|
||||
start?.();
|
||||
console.log(`[SubagentRegistry] start() returned, entering watchChildAgent`);
|
||||
return watchChildAgent(record, timeoutSeconds);
|
||||
});
|
||||
|
||||
|
|
@ -248,12 +252,26 @@ function watchChildAgent(record: SubagentRunRecord, timeoutSeconds?: number): Pr
|
|||
// Wait for the child agent's task queue to drain (task completion),
|
||||
// then trigger announce flow. Uses waitForIdle() instead of consuming
|
||||
// the stream (which would conflict with Hub.consumeAgent).
|
||||
console.log(`[SubagentRegistry] waitForIdle() called for child ${childSessionId}, pendingWrites=${childAgent.getPendingWrites()}`);
|
||||
childAgent.waitForIdle().then(
|
||||
() => cleanup({ status: "ok" }),
|
||||
(err) => cleanup({
|
||||
status: "error",
|
||||
error: err instanceof Error ? err.message : String(err),
|
||||
}),
|
||||
() => {
|
||||
const runtime = Date.now() - (record.startedAt ?? 0);
|
||||
const runError = childAgent.lastRunError;
|
||||
if (runError) {
|
||||
console.log(`[SubagentRegistry] waitForIdle() resolved for child ${childSessionId} with error (runtime: ${runtime}ms): ${runError}`);
|
||||
cleanup({ status: "error", error: runError });
|
||||
} else {
|
||||
console.log(`[SubagentRegistry] waitForIdle() resolved OK for child ${childSessionId} (runtime: ${runtime}ms)`);
|
||||
cleanup({ status: "ok" });
|
||||
}
|
||||
},
|
||||
(err) => {
|
||||
console.error(`[SubagentRegistry] waitForIdle() rejected for child ${childSessionId}:`, err);
|
||||
cleanup({
|
||||
status: "error",
|
||||
error: err instanceof Error ? err.message : String(err),
|
||||
});
|
||||
},
|
||||
);
|
||||
|
||||
// Also handle explicit close (e.g., timeout kill, Hub shutdown)
|
||||
|
|
@ -280,43 +298,57 @@ function captureFindings(record: SubagentRunRecord): void {
|
|||
}
|
||||
|
||||
/**
|
||||
* Phase 2: Check if all unannounced runs for this requester have completed.
|
||||
* If so, send a single coalesced announcement to the parent.
|
||||
* Phase 2: Announce completed-but-unannounced runs.
|
||||
*
|
||||
* Runs with announce="silent" are held back until ALL silent runs from the
|
||||
* same requester have completed. All other runs (immediate / undefined) are
|
||||
* announced per-completion as before.
|
||||
*/
|
||||
function checkAndAnnounce(requesterSessionId: string): void {
|
||||
const allRuns = listSubagentRuns(requesterSessionId);
|
||||
|
||||
// Only consider unannounced runs
|
||||
const pending = allRuns.filter(r => !r.announced);
|
||||
if (pending.length === 0) return;
|
||||
// ── Immediate runs: announce per-completion (default behavior) ──
|
||||
const immediateReady = allRuns.filter(
|
||||
r => !r.announced && r.endedAt !== undefined && r.findingsCaptured && r.announce !== "silent",
|
||||
);
|
||||
if (immediateReady.length > 0) {
|
||||
announceGroup(requesterSessionId, immediateReady);
|
||||
}
|
||||
|
||||
// Are all unannounced runs done?
|
||||
const allDone = pending.every(r => r.endedAt !== undefined);
|
||||
if (!allDone) return;
|
||||
// ── Silent runs: announce only when ALL silent runs are done ──
|
||||
const silentRuns = allRuns.filter(r => r.announce === "silent");
|
||||
const unannouncedSilent = silentRuns.filter(r => !r.announced);
|
||||
const silentReady = unannouncedSilent.filter(
|
||||
r => r.endedAt !== undefined && r.findingsCaptured,
|
||||
);
|
||||
|
||||
// Have all had findings captured?
|
||||
const allCaptured = pending.every(r => r.findingsCaptured);
|
||||
if (!allCaptured) return;
|
||||
// All unannounced silent runs must be ready (ended + findings captured)
|
||||
if (silentReady.length > 0 && silentReady.length === unannouncedSilent.length) {
|
||||
announceGroup(requesterSessionId, silentReady);
|
||||
}
|
||||
}
|
||||
|
||||
// All done — send coalesced announcement
|
||||
const announced = runCoalescedAnnounceFlow(requesterSessionId, pending);
|
||||
/** Announce a group of runs and mark them as announced. */
|
||||
function announceGroup(requesterSessionId: string, runs: SubagentRunRecord[]): void {
|
||||
const announced = runCoalescedAnnounceFlow(requesterSessionId, runs);
|
||||
|
||||
if (announced) {
|
||||
for (const r of pending) {
|
||||
for (const r of runs) {
|
||||
r.announced = true;
|
||||
r.cleanupHandled = true;
|
||||
// Remove from registry immediately — findings already delivered to parent
|
||||
subagentRuns.delete(r.runId);
|
||||
// Keep records for querying via sessions_list; let sweeper archive later
|
||||
r.archiveAtMs = Date.now() + DEFAULT_ARCHIVE_AFTER_MS;
|
||||
}
|
||||
persist();
|
||||
if (subagentRuns.size === 0) {
|
||||
stopSweeper();
|
||||
}
|
||||
} else {
|
||||
// Allow retry — mark cleanupHandled false so initSubagentRegistry() retries
|
||||
for (const r of runs) {
|
||||
r.cleanupHandled = false;
|
||||
}
|
||||
persist();
|
||||
console.warn(
|
||||
`[SubagentRegistry] Coalesced announce failed for requester ${requesterSessionId}`,
|
||||
`[SubagentRegistry] Announce failed for requester ${requesterSessionId}`,
|
||||
);
|
||||
// Leave announced=false so initSubagentRegistry() can retry on restart
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -45,6 +45,9 @@ export type SubagentRunRecord = {
|
|||
findingsCaptured?: boolean | undefined;
|
||||
/** Whether the coalesced announcement has been sent to parent */
|
||||
announced?: boolean | undefined;
|
||||
/** Announcement mode: "immediate" (default) announces per-completion,
|
||||
* "silent" defers until all silent runs from the same requester complete. */
|
||||
announce?: "immediate" | "silent" | undefined;
|
||||
};
|
||||
|
||||
/** Parameters for registering a new subagent run */
|
||||
|
|
@ -58,6 +61,8 @@ export type RegisterSubagentRunParams = {
|
|||
timeoutSeconds?: number | undefined;
|
||||
/** Callback invoked when the queue slot is acquired (used to defer childAgent.write). */
|
||||
start?: (() => void) | undefined;
|
||||
/** Announcement mode: "immediate" (default) or "silent" (defer until all silent runs complete). */
|
||||
announce?: "immediate" | "silent" | undefined;
|
||||
};
|
||||
|
||||
/** Parameters for the announce flow */
|
||||
|
|
|
|||
|
|
@ -28,6 +28,7 @@ const CORE_TOOL_SUMMARIES: Record<string, string> = {
|
|||
web_fetch: "Fetch and extract readable content from a URL",
|
||||
memory_search: "Search memory files by keyword",
|
||||
sessions_spawn: "Spawn a sub-agent session",
|
||||
data: "Query structured financial and market data",
|
||||
};
|
||||
|
||||
/** Preferred display order for tools */
|
||||
|
|
@ -42,6 +43,7 @@ const TOOL_ORDER = [
|
|||
"web_fetch",
|
||||
"memory_search",
|
||||
"sessions_spawn",
|
||||
"data",
|
||||
];
|
||||
|
||||
// ─── Section builders ───────────────────────────────────────────────────────
|
||||
|
|
@ -260,9 +262,35 @@ export function buildConditionalToolSections(
|
|||
lines.push(
|
||||
"## Sub-Agents",
|
||||
"If a task is complex or long-running, spawn a sub-agent. It will do the work and report back when done.",
|
||||
"You can check on running sub-agents at any time.",
|
||||
"IMPORTANT: After spawning sub-agents, do NOT immediately check on them with sessions_list. " +
|
||||
"Results are delivered directly into your context automatically when the sub-agent finishes. " +
|
||||
"Continue with other tasks or finish your turn and wait for the results to arrive.",
|
||||
"You may use sessions_list to check on sub-agents only if a long time has passed or the user explicitly asks about their status.",
|
||||
"Sub-agents cannot spawn nested sub-agents.",
|
||||
"",
|
||||
"### Timeout Guidelines",
|
||||
"Set timeoutSeconds generously — a sub-agent that times out loses all its work.",
|
||||
"- Simple tasks (search, read, summarize): 600 (10 min, the default)",
|
||||
"- Moderate tasks (multi-step research, file downloads + analysis): 900–1200 (15–20 min)",
|
||||
"- Complex tasks (code generation, PDF creation, multi-file operations): 1200–1800 (20–30 min)",
|
||||
"When in doubt, use a longer timeout. It is always better to wait longer than to lose completed work.",
|
||||
"",
|
||||
"### Announce Modes",
|
||||
"- `announce: \"immediate\"` (default): Each sub-agent's findings are delivered to you as soon as it completes.",
|
||||
"- `announce: \"silent\"`: All findings are held back until every silent sub-agent finishes, then delivered as ONE combined report.",
|
||||
"Use \"silent\" when you want to collect data from multiple sub-agents first, then summarize everything at once.",
|
||||
"",
|
||||
);
|
||||
}
|
||||
|
||||
// Data tools
|
||||
if (toolSet.has("data")) {
|
||||
lines.push(
|
||||
"## Data Access",
|
||||
"You have access to structured financial and market data via the `data` tool.",
|
||||
'Use domain="finance" with specific actions to retrieve stock prices, financial statements, SEC filings, metrics, and more.',
|
||||
"Always specify dates in YYYY-MM-DD format. Use period='annual' or 'quarterly' or 'ttm' for financial statements.",
|
||||
"",
|
||||
);
|
||||
}
|
||||
|
||||
|
|
@ -364,6 +392,10 @@ export function buildSubagentSection(
|
|||
"## Subagent Rules",
|
||||
"- Stay focused on the assigned task below.",
|
||||
"- Complete the task thoroughly and report your findings.",
|
||||
"- If you encounter errors (missing API keys, permission denied, tool failures, etc.), " +
|
||||
"you MUST explicitly report them in your final message. " +
|
||||
"State exactly what failed and what is needed to fix it — " +
|
||||
"the parent agent relies on your final message to understand what happened.",
|
||||
"- Do NOT initiate side actions unrelated to the task.",
|
||||
"- Do NOT attempt to communicate with the user directly.",
|
||||
"- Do NOT spawn nested subagents.",
|
||||
|
|
|
|||
39
packages/core/src/agent/tokens.test.ts
Normal file
39
packages/core/src/agent/tokens.test.ts
Normal file
|
|
@ -0,0 +1,39 @@
|
|||
import { describe, it, expect } from "vitest";
|
||||
import { isSilentReplyText, SILENT_REPLY_TOKEN } from "./tokens.js";
|
||||
|
||||
describe("isSilentReplyText", () => {
|
||||
it("detects exact NO_REPLY", () => {
|
||||
expect(isSilentReplyText("NO_REPLY")).toBe(true);
|
||||
});
|
||||
|
||||
it("detects NO_REPLY with surrounding whitespace", () => {
|
||||
expect(isSilentReplyText(" NO_REPLY ")).toBe(true);
|
||||
expect(isSilentReplyText("\nNO_REPLY\n")).toBe(true);
|
||||
});
|
||||
|
||||
it("detects NO_REPLY with trailing punctuation", () => {
|
||||
expect(isSilentReplyText("NO_REPLY.")).toBe(true);
|
||||
expect(isSilentReplyText("NO_REPLY.\n")).toBe(true);
|
||||
});
|
||||
|
||||
it("detects NO_REPLY at end of text", () => {
|
||||
expect(isSilentReplyText("I have nothing to report. NO_REPLY")).toBe(true);
|
||||
});
|
||||
|
||||
it("returns false for undefined/empty", () => {
|
||||
expect(isSilentReplyText(undefined)).toBe(false);
|
||||
expect(isSilentReplyText("")).toBe(false);
|
||||
});
|
||||
|
||||
it("returns false for normal text", () => {
|
||||
expect(isSilentReplyText("Here are the findings")).toBe(false);
|
||||
});
|
||||
|
||||
it("returns false for NO_REPLY embedded in a word", () => {
|
||||
expect(isSilentReplyText("DONO_REPLYX")).toBe(false);
|
||||
});
|
||||
|
||||
it("exports SILENT_REPLY_TOKEN as NO_REPLY", () => {
|
||||
expect(SILENT_REPLY_TOKEN).toBe("NO_REPLY");
|
||||
});
|
||||
});
|
||||
17
packages/core/src/agent/tokens.ts
Normal file
17
packages/core/src/agent/tokens.ts
Normal file
|
|
@ -0,0 +1,17 @@
|
|||
export const SILENT_REPLY_TOKEN = "NO_REPLY";
|
||||
|
||||
function escapeRegExp(value: string): string {
|
||||
return value.replace(/[.*+?^${}()|[\]\\]/g, "\\$&");
|
||||
}
|
||||
|
||||
export function isSilentReplyText(
|
||||
text: string | undefined,
|
||||
token: string = SILENT_REPLY_TOKEN,
|
||||
): boolean {
|
||||
if (!text) return false;
|
||||
const escaped = escapeRegExp(token);
|
||||
const prefix = new RegExp(`^\\s*${escaped}(?=$|\\W)`);
|
||||
if (prefix.test(text)) return true;
|
||||
const suffix = new RegExp(`\\b${escaped}\\b\\W*$`);
|
||||
return suffix.test(text);
|
||||
}
|
||||
|
|
@ -10,6 +10,7 @@ import { createSessionsSpawnTool } from "./tools/sessions-spawn.js";
|
|||
import { createSessionsListTool } from "./tools/sessions-list.js";
|
||||
import { createMemorySearchTool } from "./tools/memory-search.js";
|
||||
import { createCronTool } from "./tools/cron/index.js";
|
||||
import { createDataTool } from "./tools/data/index.js";
|
||||
import { filterTools } from "./tools/policy.js";
|
||||
import { isMulticaError, isRetryableError } from "@multica/utils";
|
||||
import type { ExecApprovalCallback } from "./tools/exec-approval-types.js";
|
||||
|
|
@ -26,6 +27,8 @@ export interface CreateToolsOptions {
|
|||
isSubagent?: boolean | undefined;
|
||||
/** Session ID of the agent (passed to sessions_spawn tool) */
|
||||
sessionId?: string | undefined;
|
||||
/** Resolved provider ID of the parent agent (passed to sessions_spawn for subagent inheritance) */
|
||||
provider?: string | undefined;
|
||||
/** Callback invoked when exec tool needs approval before running a command */
|
||||
onExecApprovalNeeded?: ExecApprovalCallback | undefined;
|
||||
}
|
||||
|
|
@ -110,6 +113,7 @@ export function createAllTools(options: CreateToolsOptions | string): AgentTool<
|
|||
const webSearchTool = createWebSearchTool();
|
||||
|
||||
const cronTool = createCronTool();
|
||||
const dataTool = createDataTool();
|
||||
|
||||
const tools: AgentTool<any>[] = [
|
||||
...baseTools,
|
||||
|
|
@ -119,6 +123,7 @@ export function createAllTools(options: CreateToolsOptions | string): AgentTool<
|
|||
webFetchTool as AgentTool<any>,
|
||||
webSearchTool as AgentTool<any>,
|
||||
cronTool as AgentTool<any>,
|
||||
dataTool as AgentTool<any>,
|
||||
];
|
||||
|
||||
// Add memory_search tool if profileDir is provided
|
||||
|
|
@ -131,6 +136,7 @@ export function createAllTools(options: CreateToolsOptions | string): AgentTool<
|
|||
const sessionsSpawnTool = createSessionsSpawnTool({
|
||||
isSubagent: isSubagent ?? false,
|
||||
...(sessionId !== undefined ? { sessionId } : {}),
|
||||
...(opts.provider !== undefined ? { provider: opts.provider } : {}),
|
||||
});
|
||||
tools.push(sessionsSpawnTool as AgentTool<any>);
|
||||
|
||||
|
|
@ -165,6 +171,7 @@ export function resolveTools(options: ResolveToolsOptions): AgentTool<any>[] {
|
|||
profileDir: options.profileDir,
|
||||
isSubagent: options.isSubagent,
|
||||
sessionId: options.sessionId,
|
||||
provider: options.provider,
|
||||
onExecApprovalNeeded: options.onExecApprovalNeeded,
|
||||
});
|
||||
|
||||
|
|
|
|||
118
packages/core/src/agent/tools/data/data-tool.ts
Normal file
118
packages/core/src/agent/tools/data/data-tool.ts
Normal file
|
|
@ -0,0 +1,118 @@
|
|||
/**
|
||||
* Unified data tool — structured access to domain-specific data sources.
|
||||
*
|
||||
* Currently supports the "finance" domain (Financial Datasets API).
|
||||
* Designed as a stable interface: the backend can be swapped from
|
||||
* direct API calls to a Multica Data Service without changing the tool schema.
|
||||
*/
|
||||
|
||||
import { Type } from "@sinclair/typebox";
|
||||
import type { AgentTool } from "@mariozechner/pi-agent-core";
|
||||
import { executeFinanceAction } from "./finance/actions.js";
|
||||
|
||||
// ─── Schema ─────────────────────────────────────────────────────────────────
|
||||
|
||||
const DataToolSchema = Type.Object({
|
||||
domain: Type.String({
|
||||
description: 'Data domain. Currently supported: "finance".',
|
||||
}),
|
||||
action: Type.String({
|
||||
description:
|
||||
"Action to perform within the domain.\n\n" +
|
||||
"FINANCE DOMAIN ACTIONS:\n" +
|
||||
"Prices:\n" +
|
||||
" get_price_snapshot — params: { ticker }\n" +
|
||||
" get_prices — params: { ticker, start_date, end_date, interval?, interval_multiplier? }\n" +
|
||||
" get_crypto_price_snapshot — params: { ticker } (e.g. BTC-USD)\n" +
|
||||
" get_crypto_prices — params: { ticker, start_date, end_date, interval?, interval_multiplier? }\n" +
|
||||
" get_available_crypto_tickers — params: {}\n" +
|
||||
"Financial Statements (period: annual|quarterly|ttm):\n" +
|
||||
" get_income_statements — params: { ticker, period, limit?, report_period_gt/gte/lt/lte? }\n" +
|
||||
" get_balance_sheets — same params\n" +
|
||||
" get_cash_flow_statements — same params\n" +
|
||||
" get_all_financial_statements — same params (returns all three)\n" +
|
||||
"Metrics:\n" +
|
||||
" get_financial_metrics_snapshot — params: { ticker }\n" +
|
||||
" get_financial_metrics — params: { ticker, period?, limit?, report_period*? }\n" +
|
||||
" get_analyst_estimates — params: { ticker, period? }\n" +
|
||||
"Company:\n" +
|
||||
" get_news — params: { ticker, start_date?, end_date?, limit? }\n" +
|
||||
" get_insider_trades — params: { ticker, limit?, filing_date*? }\n" +
|
||||
" get_segmented_revenues — params: { ticker, period, limit? }\n" +
|
||||
" get_company_facts — params: { ticker }\n" +
|
||||
"SEC Filings:\n" +
|
||||
" get_filings — params: { ticker, filing_type?, limit? }\n" +
|
||||
" get_filing_items — params: { ticker, filing_type, accession_number?, item? }",
|
||||
}),
|
||||
params: Type.Record(Type.String(), Type.Unknown(), {
|
||||
description:
|
||||
"Action-specific parameters as key-value pairs. " +
|
||||
"Common: ticker (string, e.g. 'AAPL'), period ('annual'|'quarterly'|'ttm'), " +
|
||||
"limit (number), start_date/end_date ('YYYY-MM-DD'), " +
|
||||
"interval ('day'|'week'|'month'|'year'), filing_type ('10-K'|'10-Q'|'8-K').",
|
||||
}),
|
||||
});
|
||||
|
||||
// ─── Types ──────────────────────────────────────────────────────────────────
|
||||
|
||||
type DataToolArgs = {
|
||||
domain: string;
|
||||
action: string;
|
||||
params: Record<string, unknown>;
|
||||
};
|
||||
|
||||
export type DataToolResult = {
|
||||
domain: string;
|
||||
action: string;
|
||||
data: unknown;
|
||||
sourceUrl?: string;
|
||||
};
|
||||
|
||||
// ─── Factory ────────────────────────────────────────────────────────────────
|
||||
|
||||
export function createDataTool(): AgentTool<typeof DataToolSchema, DataToolResult> {
|
||||
return {
|
||||
name: "data",
|
||||
label: "Data",
|
||||
description:
|
||||
"Query structured data from external sources. " +
|
||||
'Supports domain="finance" for stock prices, financial statements, key metrics, ' +
|
||||
"SEC filings, analyst estimates, insider trades, news, and crypto data.",
|
||||
parameters: DataToolSchema,
|
||||
execute: async (_toolCallId, args, signal) => {
|
||||
const { domain, action, params } = args as DataToolArgs;
|
||||
|
||||
if (domain !== "finance") {
|
||||
const errorPayload = {
|
||||
error: true,
|
||||
message: `Unknown domain: "${domain}". Currently supported: "finance".`,
|
||||
};
|
||||
return {
|
||||
content: [{ type: "text", text: JSON.stringify(errorPayload, null, 2) }],
|
||||
details: { domain, action, data: null } as unknown as DataToolResult,
|
||||
};
|
||||
}
|
||||
|
||||
try {
|
||||
const result = await executeFinanceAction(action, params ?? {}, signal);
|
||||
const payload: DataToolResult = {
|
||||
domain,
|
||||
action,
|
||||
data: result.data,
|
||||
sourceUrl: result.sourceUrl,
|
||||
};
|
||||
return {
|
||||
content: [{ type: "text", text: JSON.stringify(payload, null, 2) }],
|
||||
details: payload,
|
||||
};
|
||||
} catch (error) {
|
||||
const message = error instanceof Error ? error.message : String(error);
|
||||
const errorPayload = { error: true, domain, action, message };
|
||||
return {
|
||||
content: [{ type: "text", text: JSON.stringify(errorPayload, null, 2) }],
|
||||
details: { domain, action, data: null } as unknown as DataToolResult,
|
||||
};
|
||||
}
|
||||
},
|
||||
};
|
||||
}
|
||||
295
packages/core/src/agent/tools/data/finance/actions.ts
Normal file
295
packages/core/src/agent/tools/data/finance/actions.ts
Normal file
|
|
@ -0,0 +1,295 @@
|
|||
/**
|
||||
* Finance action handlers.
|
||||
*
|
||||
* Maps each action name to the corresponding Financial Datasets API call.
|
||||
*/
|
||||
|
||||
import type { FinanceAction } from "./types.js";
|
||||
import { FINANCE_ACTION_SET } from "./types.js";
|
||||
import { financeFetch } from "./api.js";
|
||||
|
||||
// ─── Helpers ────────────────────────────────────────────────────────────────
|
||||
|
||||
type Params = Record<string, unknown>;
|
||||
|
||||
function requireParam(params: Params, name: string): string {
|
||||
const value = params[name];
|
||||
if (value === undefined || value === null || value === "") {
|
||||
throw new Error(`Missing required parameter: ${name}`);
|
||||
}
|
||||
return String(value);
|
||||
}
|
||||
|
||||
function optionalString(params: Params, name: string): string | undefined {
|
||||
const value = params[name];
|
||||
if (value === undefined || value === null || value === "") return undefined;
|
||||
return String(value);
|
||||
}
|
||||
|
||||
function optionalNumber(params: Params, name: string): number | undefined {
|
||||
const value = params[name];
|
||||
if (value === undefined || value === null) return undefined;
|
||||
return Number(value);
|
||||
}
|
||||
|
||||
function optionalStringArray(params: Params, name: string): string[] | undefined {
|
||||
const value = params[name];
|
||||
if (value === undefined || value === null) return undefined;
|
||||
if (Array.isArray(value)) return value.map(String);
|
||||
return [String(value)];
|
||||
}
|
||||
|
||||
/** Extract common financial statement filter params */
|
||||
function statementFilters(params: Params) {
|
||||
return {
|
||||
ticker: requireParam(params, "ticker"),
|
||||
period: requireParam(params, "period"),
|
||||
limit: optionalNumber(params, "limit"),
|
||||
report_period_gt: optionalString(params, "report_period_gt"),
|
||||
report_period_gte: optionalString(params, "report_period_gte"),
|
||||
report_period_lt: optionalString(params, "report_period_lt"),
|
||||
report_period_lte: optionalString(params, "report_period_lte"),
|
||||
};
|
||||
}
|
||||
|
||||
// ─── Action result type ─────────────────────────────────────────────────────
|
||||
|
||||
export interface FinanceActionResult {
|
||||
data: unknown;
|
||||
sourceUrl: string;
|
||||
}
|
||||
|
||||
// ─── Action handlers ────────────────────────────────────────────────────────
|
||||
|
||||
type ActionHandler = (params: Params, signal?: AbortSignal) => Promise<FinanceActionResult>;
|
||||
|
||||
const handlers: Record<FinanceAction, ActionHandler> = {
|
||||
// ── Prices ──────────────────────────────────────────────────────────────
|
||||
|
||||
get_price_snapshot: async (params, signal) => {
|
||||
const ticker = requireParam(params, "ticker");
|
||||
const { data, url } = await financeFetch("/prices/snapshot", { ticker }, signal);
|
||||
return { data: (data as Record<string, unknown>).snapshot ?? data, sourceUrl: url };
|
||||
},
|
||||
|
||||
get_prices: async (params, signal) => {
|
||||
const ticker = requireParam(params, "ticker");
|
||||
const start_date = requireParam(params, "start_date");
|
||||
const end_date = requireParam(params, "end_date");
|
||||
const { data, url } = await financeFetch(
|
||||
"/prices",
|
||||
{
|
||||
ticker,
|
||||
start_date,
|
||||
end_date,
|
||||
interval: optionalString(params, "interval") ?? "day",
|
||||
interval_multiplier: optionalNumber(params, "interval_multiplier"),
|
||||
},
|
||||
signal,
|
||||
);
|
||||
return { data: (data as Record<string, unknown>).prices ?? data, sourceUrl: url };
|
||||
},
|
||||
|
||||
get_crypto_price_snapshot: async (params, signal) => {
|
||||
const ticker = requireParam(params, "ticker");
|
||||
const { data, url } = await financeFetch("/crypto/prices/snapshot", { ticker }, signal);
|
||||
return { data: (data as Record<string, unknown>).snapshot ?? data, sourceUrl: url };
|
||||
},
|
||||
|
||||
get_crypto_prices: async (params, signal) => {
|
||||
const ticker = requireParam(params, "ticker");
|
||||
const start_date = requireParam(params, "start_date");
|
||||
const end_date = requireParam(params, "end_date");
|
||||
const { data, url } = await financeFetch(
|
||||
"/crypto/prices",
|
||||
{
|
||||
ticker,
|
||||
start_date,
|
||||
end_date,
|
||||
interval: optionalString(params, "interval") ?? "day",
|
||||
interval_multiplier: optionalNumber(params, "interval_multiplier"),
|
||||
},
|
||||
signal,
|
||||
);
|
||||
return { data: (data as Record<string, unknown>).prices ?? data, sourceUrl: url };
|
||||
},
|
||||
|
||||
get_available_crypto_tickers: async (_params, signal) => {
|
||||
const { data, url } = await financeFetch("/crypto/prices/tickers", {}, signal);
|
||||
return { data: (data as Record<string, unknown>).tickers ?? data, sourceUrl: url };
|
||||
},
|
||||
|
||||
// ── Financial statements ────────────────────────────────────────────────
|
||||
|
||||
get_income_statements: async (params, signal) => {
|
||||
const filters = statementFilters(params);
|
||||
const { data, url } = await financeFetch("/financials/income-statements", filters, signal);
|
||||
return { data: (data as Record<string, unknown>).income_statements ?? data, sourceUrl: url };
|
||||
},
|
||||
|
||||
get_balance_sheets: async (params, signal) => {
|
||||
const filters = statementFilters(params);
|
||||
const { data, url } = await financeFetch("/financials/balance-sheets", filters, signal);
|
||||
return { data: (data as Record<string, unknown>).balance_sheets ?? data, sourceUrl: url };
|
||||
},
|
||||
|
||||
get_cash_flow_statements: async (params, signal) => {
|
||||
const filters = statementFilters(params);
|
||||
const { data, url } = await financeFetch("/financials/cash-flow-statements", filters, signal);
|
||||
return { data: (data as Record<string, unknown>).cash_flow_statements ?? data, sourceUrl: url };
|
||||
},
|
||||
|
||||
get_all_financial_statements: async (params, signal) => {
|
||||
const filters = statementFilters(params);
|
||||
const { data, url } = await financeFetch("/financials", filters, signal);
|
||||
return { data: (data as Record<string, unknown>).financials ?? data, sourceUrl: url };
|
||||
},
|
||||
|
||||
// ── Metrics & estimates ─────────────────────────────────────────────────
|
||||
|
||||
get_financial_metrics_snapshot: async (params, signal) => {
|
||||
const ticker = requireParam(params, "ticker");
|
||||
const { data, url } = await financeFetch("/financial-metrics/snapshot", { ticker }, signal);
|
||||
return { data: (data as Record<string, unknown>).snapshot ?? data, sourceUrl: url };
|
||||
},
|
||||
|
||||
get_financial_metrics: async (params, signal) => {
|
||||
const ticker = requireParam(params, "ticker");
|
||||
const { data, url } = await financeFetch(
|
||||
"/financial-metrics",
|
||||
{
|
||||
ticker,
|
||||
period: optionalString(params, "period"),
|
||||
limit: optionalNumber(params, "limit"),
|
||||
report_period: optionalString(params, "report_period"),
|
||||
report_period_gt: optionalString(params, "report_period_gt"),
|
||||
report_period_gte: optionalString(params, "report_period_gte"),
|
||||
report_period_lt: optionalString(params, "report_period_lt"),
|
||||
report_period_lte: optionalString(params, "report_period_lte"),
|
||||
},
|
||||
signal,
|
||||
);
|
||||
return { data: (data as Record<string, unknown>).financial_metrics ?? data, sourceUrl: url };
|
||||
},
|
||||
|
||||
get_analyst_estimates: async (params, signal) => {
|
||||
const ticker = requireParam(params, "ticker");
|
||||
const { data, url } = await financeFetch(
|
||||
"/analyst-estimates",
|
||||
{
|
||||
ticker,
|
||||
period: optionalString(params, "period"),
|
||||
},
|
||||
signal,
|
||||
);
|
||||
return { data: (data as Record<string, unknown>).analyst_estimates ?? data, sourceUrl: url };
|
||||
},
|
||||
|
||||
// ── Company info ────────────────────────────────────────────────────────
|
||||
|
||||
get_news: async (params, signal) => {
|
||||
const ticker = requireParam(params, "ticker");
|
||||
const { data, url } = await financeFetch(
|
||||
"/news",
|
||||
{
|
||||
ticker,
|
||||
start_date: optionalString(params, "start_date"),
|
||||
end_date: optionalString(params, "end_date"),
|
||||
limit: optionalNumber(params, "limit"),
|
||||
},
|
||||
signal,
|
||||
);
|
||||
return { data: (data as Record<string, unknown>).news ?? data, sourceUrl: url };
|
||||
},
|
||||
|
||||
get_insider_trades: async (params, signal) => {
|
||||
const ticker = requireParam(params, "ticker");
|
||||
const { data, url } = await financeFetch(
|
||||
"/insider-trades",
|
||||
{
|
||||
ticker: ticker.toUpperCase(),
|
||||
limit: optionalNumber(params, "limit"),
|
||||
filing_date: optionalString(params, "filing_date"),
|
||||
filing_date_gt: optionalString(params, "filing_date_gt"),
|
||||
filing_date_gte: optionalString(params, "filing_date_gte"),
|
||||
filing_date_lt: optionalString(params, "filing_date_lt"),
|
||||
filing_date_lte: optionalString(params, "filing_date_lte"),
|
||||
},
|
||||
signal,
|
||||
);
|
||||
return { data: (data as Record<string, unknown>).insider_trades ?? data, sourceUrl: url };
|
||||
},
|
||||
|
||||
get_segmented_revenues: async (params, signal) => {
|
||||
const ticker = requireParam(params, "ticker");
|
||||
const period = requireParam(params, "period");
|
||||
const { data, url } = await financeFetch(
|
||||
"/financials/segmented-revenues",
|
||||
{
|
||||
ticker,
|
||||
period,
|
||||
limit: optionalNumber(params, "limit"),
|
||||
},
|
||||
signal,
|
||||
);
|
||||
return { data: (data as Record<string, unknown>).segmented_revenues ?? data, sourceUrl: url };
|
||||
},
|
||||
|
||||
get_company_facts: async (params, signal) => {
|
||||
const ticker = requireParam(params, "ticker");
|
||||
const { data, url } = await financeFetch("/company/facts", { ticker }, signal);
|
||||
return { data: (data as Record<string, unknown>).company_facts ?? data, sourceUrl: url };
|
||||
},
|
||||
|
||||
// ── SEC filings ─────────────────────────────────────────────────────────
|
||||
|
||||
get_filings: async (params, signal) => {
|
||||
const ticker = requireParam(params, "ticker");
|
||||
const { data, url } = await financeFetch(
|
||||
"/filings",
|
||||
{
|
||||
ticker,
|
||||
filing_type: optionalString(params, "filing_type"),
|
||||
limit: optionalNumber(params, "limit"),
|
||||
},
|
||||
signal,
|
||||
);
|
||||
return { data: (data as Record<string, unknown>).filings ?? data, sourceUrl: url };
|
||||
},
|
||||
|
||||
get_filing_items: async (params, signal) => {
|
||||
const ticker = requireParam(params, "ticker").toUpperCase();
|
||||
const filing_type = requireParam(params, "filing_type");
|
||||
const { data, url } = await financeFetch(
|
||||
"/filings/items",
|
||||
{
|
||||
ticker,
|
||||
filing_type,
|
||||
accession_number: optionalString(params, "accession_number"),
|
||||
item: optionalStringArray(params, "item"),
|
||||
},
|
||||
signal,
|
||||
);
|
||||
return { data, sourceUrl: url };
|
||||
},
|
||||
};
|
||||
|
||||
// ─── Public API ─────────────────────────────────────────────────────────────
|
||||
|
||||
/**
|
||||
* Execute a finance domain action.
|
||||
*
|
||||
* @throws Error if the action is unknown or required params are missing.
|
||||
*/
|
||||
export function executeFinanceAction(
|
||||
action: string,
|
||||
params: Record<string, unknown>,
|
||||
signal?: AbortSignal,
|
||||
): Promise<FinanceActionResult> {
|
||||
if (!FINANCE_ACTION_SET.has(action)) {
|
||||
throw new Error(
|
||||
`Unknown finance action: "${action}". Available: ${[...FINANCE_ACTION_SET].join(", ")}`,
|
||||
);
|
||||
}
|
||||
return handlers[action as FinanceAction](params, signal);
|
||||
}
|
||||
79
packages/core/src/agent/tools/data/finance/api.ts
Normal file
79
packages/core/src/agent/tools/data/finance/api.ts
Normal file
|
|
@ -0,0 +1,79 @@
|
|||
/**
|
||||
* Financial Datasets API client.
|
||||
*
|
||||
* Base URL: https://api.financialdatasets.ai
|
||||
* Auth: X-API-KEY header
|
||||
* All endpoints use GET with query parameters.
|
||||
*/
|
||||
|
||||
import { credentialManager } from "../../../credentials.js";
|
||||
|
||||
const BASE_URL = "https://api.financialdatasets.ai";
|
||||
const TIMEOUT_MS = 30_000;
|
||||
|
||||
function getApiKey(): string {
|
||||
// 1. credentials.json5 → tools.data.apiKey (preferred)
|
||||
const toolConfig = credentialManager.getToolConfig("data");
|
||||
if (toolConfig?.apiKey) return toolConfig.apiKey;
|
||||
|
||||
// 2. Fallback: env var (skills.env.json5 or process.env)
|
||||
const envKey = credentialManager.getEnv("FINANCIAL_DATASETS_API_KEY");
|
||||
if (envKey) return envKey;
|
||||
|
||||
throw new Error(
|
||||
"Financial Datasets API key not configured. " +
|
||||
'Set it in ~/.super-multica/credentials.json5 under tools.data.apiKey, ' +
|
||||
"or set FINANCIAL_DATASETS_API_KEY in ~/.super-multica/skills.env.json5.",
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetch data from the Financial Datasets API.
|
||||
*
|
||||
* @param path - API path (e.g., "/prices/snapshot")
|
||||
* @param params - Query parameters. Arrays are sent as repeated params (e.g., item=1A&item=1B).
|
||||
* @param signal - Optional AbortSignal for cancellation.
|
||||
*/
|
||||
export async function financeFetch<T = Record<string, unknown>>(
|
||||
path: string,
|
||||
params: Record<string, string | string[] | number | boolean | undefined>,
|
||||
signal?: AbortSignal,
|
||||
): Promise<{ data: T; url: string }> {
|
||||
const apiKey = getApiKey();
|
||||
|
||||
const url = new URL(path, BASE_URL);
|
||||
for (const [key, value] of Object.entries(params)) {
|
||||
if (value === undefined || value === null) continue;
|
||||
if (Array.isArray(value)) {
|
||||
for (const v of value) {
|
||||
url.searchParams.append(key, String(v));
|
||||
}
|
||||
} else {
|
||||
url.searchParams.set(key, String(value));
|
||||
}
|
||||
}
|
||||
|
||||
const timeoutSignal = AbortSignal.timeout(TIMEOUT_MS);
|
||||
const combinedSignal = signal
|
||||
? AbortSignal.any([signal, timeoutSignal])
|
||||
: timeoutSignal;
|
||||
|
||||
const res = await fetch(url.toString(), {
|
||||
method: "GET",
|
||||
headers: {
|
||||
"X-API-KEY": apiKey,
|
||||
Accept: "application/json",
|
||||
},
|
||||
signal: combinedSignal,
|
||||
});
|
||||
|
||||
if (!res.ok) {
|
||||
const body = await res.text().catch(() => "");
|
||||
throw new Error(
|
||||
`Financial Datasets API error (${res.status}): ${body || res.statusText}`,
|
||||
);
|
||||
}
|
||||
|
||||
const data = (await res.json()) as T;
|
||||
return { data, url: url.toString() };
|
||||
}
|
||||
35
packages/core/src/agent/tools/data/finance/types.ts
Normal file
35
packages/core/src/agent/tools/data/finance/types.ts
Normal file
|
|
@ -0,0 +1,35 @@
|
|||
/**
|
||||
* Finance domain types for the data tool.
|
||||
*/
|
||||
|
||||
/** All supported finance actions */
|
||||
export const FINANCE_ACTIONS = [
|
||||
// Price data
|
||||
"get_price_snapshot",
|
||||
"get_prices",
|
||||
"get_crypto_price_snapshot",
|
||||
"get_crypto_prices",
|
||||
"get_available_crypto_tickers",
|
||||
// Financial statements
|
||||
"get_income_statements",
|
||||
"get_balance_sheets",
|
||||
"get_cash_flow_statements",
|
||||
"get_all_financial_statements",
|
||||
// Metrics & estimates
|
||||
"get_financial_metrics_snapshot",
|
||||
"get_financial_metrics",
|
||||
"get_analyst_estimates",
|
||||
// Company info
|
||||
"get_news",
|
||||
"get_insider_trades",
|
||||
"get_segmented_revenues",
|
||||
"get_company_facts",
|
||||
// SEC filings
|
||||
"get_filings",
|
||||
"get_filing_items",
|
||||
] as const;
|
||||
|
||||
export type FinanceAction = (typeof FINANCE_ACTIONS)[number];
|
||||
|
||||
/** Set for O(1) lookup */
|
||||
export const FINANCE_ACTION_SET = new Set<string>(FINANCE_ACTIONS);
|
||||
1
packages/core/src/agent/tools/data/index.ts
Normal file
1
packages/core/src/agent/tools/data/index.ts
Normal file
|
|
@ -0,0 +1 @@
|
|||
export { createDataTool, type DataToolResult } from "./data-tool.js";
|
||||
|
|
@ -39,6 +39,9 @@ export const TOOL_GROUPS: Record<string, string[]> = {
|
|||
// Cron/scheduling tools
|
||||
"group:cron": ["cron"],
|
||||
|
||||
// Data tools (finance, etc.)
|
||||
"group:data": ["data"],
|
||||
|
||||
// All core tools
|
||||
"group:core": [
|
||||
"read",
|
||||
|
|
@ -49,6 +52,7 @@ export const TOOL_GROUPS: Record<string, string[]> = {
|
|||
"process",
|
||||
"web_search",
|
||||
"web_fetch",
|
||||
"data",
|
||||
],
|
||||
};
|
||||
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ export { createProcessTool } from "./process.js";
|
|||
export { createGlobTool } from "./glob.js";
|
||||
export { createWebFetchTool, createWebSearchTool } from "./web/index.js";
|
||||
export { createCronTool } from "./cron/index.js";
|
||||
export { createDataTool } from "./data/index.js";
|
||||
export { createSessionsListTool } from "./sessions-list.js";
|
||||
|
||||
// Tool groups
|
||||
|
|
|
|||
|
|
@ -73,9 +73,9 @@ describe("sessions_list tool", () => {
|
|||
const text = result.content[0]!;
|
||||
expect(text.type).toBe("text");
|
||||
expect((text as { text: string }).text).toContain("3 total");
|
||||
expect((text as { text: string }).text).toContain("[running]");
|
||||
expect((text as { text: string }).text).toContain("[ok]");
|
||||
expect((text as { text: string }).text).toContain("[error]");
|
||||
expect((text as { text: string }).text).toContain("[RUNNING]");
|
||||
expect((text as { text: string }).text).toContain("[OK]");
|
||||
expect((text as { text: string }).text).toContain("[ERROR]");
|
||||
expect((text as { text: string }).text).toContain("Code Review");
|
||||
expect((text as { text: string }).text).toContain("Test Analysis");
|
||||
expect((text as { text: string }).text).toContain("Lint Check");
|
||||
|
|
|
|||
|
|
@ -127,7 +127,9 @@ export function createSessionsListTool(
|
|||
label: "List Subagent Runs",
|
||||
description:
|
||||
"List all subagent runs spawned by this session and their current status. " +
|
||||
"Optionally pass a runId to get detailed information about a specific run.",
|
||||
"Optionally pass a runId to get detailed information about a specific run. " +
|
||||
"NOTE: Do NOT call this immediately after spawning subagents — results arrive automatically in your context when subagents complete. " +
|
||||
"Only use this if a long time has passed or the user explicitly asks about subagent status.",
|
||||
parameters: SessionsListSchema,
|
||||
execute: async (_toolCallId, args) => {
|
||||
const { runId } = args as SessionsListArgs;
|
||||
|
|
@ -173,13 +175,52 @@ export function createSessionsListTool(
|
|||
};
|
||||
}
|
||||
|
||||
const lines = [`Subagent runs for this session: ${runs.length} total`, ""];
|
||||
const someRunning = runs.some((r) => !r.endedAt);
|
||||
|
||||
// Build status lines for each run
|
||||
const statusLines: string[] = [];
|
||||
for (let i = 0; i < runs.length; i++) {
|
||||
lines.push(formatRunSummary(runs[i]!, i, now));
|
||||
const r = runs[i]!;
|
||||
const displayName = r.label || r.task.slice(0, 60);
|
||||
const status = resolveStatus(r);
|
||||
if (status === "running") {
|
||||
const elapsed = r.startedAt ? formatElapsed(now - r.startedAt) : "just spawned";
|
||||
statusLines.push(` ${i + 1}. [RUNNING] "${displayName}" (${elapsed})`);
|
||||
} else {
|
||||
const elapsed = r.startedAt && r.endedAt ? formatElapsed(r.endedAt - r.startedAt) : "";
|
||||
const findings = r.findingsCaptured
|
||||
? (r.findings ? r.findings.slice(0, 200) + (r.findings.length > 200 ? "…" : "") : "(no output)")
|
||||
: "(findings not yet captured)";
|
||||
statusLines.push(` ${i + 1}. [${status.toUpperCase()}] "${displayName}" (${elapsed})\n Findings: ${findings}`);
|
||||
}
|
||||
}
|
||||
|
||||
const header = `Subagent runs for this session: ${runs.length} total`;
|
||||
const body = statusLines.join("\n");
|
||||
|
||||
// If any subagents are still running, return status with wait instruction.
|
||||
// We do NOT use steer() here — steer would cancel unrelated tool calls
|
||||
// that the LLM may be processing in the same batch.
|
||||
if (someRunning) {
|
||||
const runningCount = runs.filter((r) => !r.endedAt).length;
|
||||
return {
|
||||
content: [
|
||||
{
|
||||
type: "text",
|
||||
text:
|
||||
header + "\n" + body + "\n\n" +
|
||||
`STATUS: ${runningCount} subagent(s) still running. This is normal — they need time to complete.\n` +
|
||||
"ACTION REQUIRED: Do NOT call sessions_list again. Results will be delivered into your context automatically when they finish.\n" +
|
||||
"Do NOT attempt to do this work yourself — the subagents are handling it.",
|
||||
},
|
||||
],
|
||||
details: { runs: runs.map(toResultRun) },
|
||||
};
|
||||
}
|
||||
|
||||
// All completed — normal response
|
||||
return {
|
||||
content: [{ type: "text", text: lines.join("\n") }],
|
||||
content: [{ type: "text", text: header + "\n" + body }],
|
||||
details: { runs: runs.map(toResultRun) },
|
||||
};
|
||||
},
|
||||
|
|
|
|||
|
|
@ -35,6 +35,15 @@ const SessionsSpawnSchema = Type.Object({
|
|||
minimum: 0,
|
||||
}),
|
||||
),
|
||||
announce: Type.Optional(
|
||||
Type.Union([Type.Literal("immediate"), Type.Literal("silent")], {
|
||||
description:
|
||||
"Announcement mode. 'immediate' (default): findings delivered as each subagent completes. " +
|
||||
"'silent': defer all announcements until every silent subagent from this session finishes, " +
|
||||
"then deliver one combined report. Use 'silent' when spawning multiple subagents to collect " +
|
||||
"data in parallel and you want to summarize everything at once.",
|
||||
}),
|
||||
),
|
||||
});
|
||||
|
||||
type SessionsSpawnArgs = {
|
||||
|
|
@ -43,6 +52,7 @@ type SessionsSpawnArgs = {
|
|||
model?: string;
|
||||
cleanup?: "delete" | "keep";
|
||||
timeoutSeconds?: number;
|
||||
announce?: "immediate" | "silent";
|
||||
};
|
||||
|
||||
export type SessionsSpawnResult = {
|
||||
|
|
@ -57,6 +67,8 @@ export interface CreateSessionsSpawnToolOptions {
|
|||
isSubagent?: boolean;
|
||||
/** Session ID of the current (requester) agent */
|
||||
sessionId?: string;
|
||||
/** Resolved provider ID of the parent agent (inherited by subagents) */
|
||||
provider?: string;
|
||||
}
|
||||
|
||||
export function createSessionsSpawnTool(
|
||||
|
|
@ -67,11 +79,13 @@ export function createSessionsSpawnTool(
|
|||
label: "Spawn Subagent",
|
||||
description:
|
||||
"Spawn a background subagent to handle a specific task. The subagent runs in an isolated session with its own tool set. " +
|
||||
"When it completes, its findings are announced back to you automatically. " +
|
||||
"When it completes, its findings are delivered directly into your context automatically — you do NOT need to poll or check. " +
|
||||
"IMPORTANT: After spawning subagents, continue with any other immediate tasks you have, or simply finish your turn and wait. " +
|
||||
"Do NOT call sessions_list to check on subagents you just spawned — results take time and will arrive on their own. " +
|
||||
"Use this for parallelizable work, long-running analysis, or tasks that benefit from isolation.",
|
||||
parameters: SessionsSpawnSchema,
|
||||
execute: async (_toolCallId, args) => {
|
||||
const { task, label, model, cleanup = "delete", timeoutSeconds } = args as SessionsSpawnArgs;
|
||||
const { task, label, model, cleanup = "delete", timeoutSeconds, announce } = args as SessionsSpawnArgs;
|
||||
|
||||
// Guard: subagents cannot spawn subagents
|
||||
if (options.isSubagent) {
|
||||
|
|
@ -107,6 +121,7 @@ export function createSessionsSpawnTool(
|
|||
const childAgent = hub.createSubagent(childSessionId, {
|
||||
systemPrompt,
|
||||
model,
|
||||
provider: options.provider,
|
||||
});
|
||||
|
||||
// Register the run for lifecycle tracking.
|
||||
|
|
@ -120,6 +135,7 @@ export function createSessionsSpawnTool(
|
|||
label,
|
||||
cleanup,
|
||||
timeoutSeconds,
|
||||
announce,
|
||||
start: () => childAgent.write(task),
|
||||
});
|
||||
|
||||
|
|
@ -127,7 +143,7 @@ export function createSessionsSpawnTool(
|
|||
content: [
|
||||
{
|
||||
type: "text",
|
||||
text: `Subagent spawned successfully.\n\nRun ID: ${runId}\nSession: ${childSessionId}\nTask: ${label || task.slice(0, 80)}\n\nThe subagent is now working in the background. You will receive its findings when it completes.`,
|
||||
text: `Subagent spawned successfully.\n\nRun ID: ${runId}\nSession: ${childSessionId}\nTask: ${label || task.slice(0, 80)}\n\nThe subagent is now working in the background. Its findings will be delivered directly into your context when it completes — do NOT poll or call sessions_list for it. Continue with other tasks or finish your turn.`,
|
||||
},
|
||||
],
|
||||
details: {
|
||||
|
|
|
|||
|
|
@ -12,6 +12,7 @@ import {
|
|||
GlobeIcon,
|
||||
DatabaseIcon,
|
||||
GitBranchIcon,
|
||||
ChartBarLineIcon,
|
||||
ArrowRight01Icon,
|
||||
} from "@hugeicons/core-free-icons"
|
||||
import { cn, getTextContent } from "@multica/ui/lib/utils"
|
||||
|
|
@ -39,6 +40,7 @@ const TOOL_DISPLAY: Record<string, { label: string; icon: typeof File01Icon }> =
|
|||
memory_delete: { label: "MemoryDelete", icon: DatabaseIcon },
|
||||
memory_list: { label: "MemoryList", icon: DatabaseIcon },
|
||||
sessions_spawn: { label: "SpawnSession", icon: GitBranchIcon },
|
||||
data: { label: "Data", icon: ChartBarLineIcon },
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
|
@ -73,6 +75,18 @@ function getSubtitle(toolName: string, args?: Record<string, unknown>): string {
|
|||
return args.query ? String(args.query) : ""
|
||||
case "web_fetch":
|
||||
try { return new URL(String(args.url)).hostname } catch { return String(args.url ?? "") }
|
||||
case "data": {
|
||||
const action = String(args.action ?? "").replace(/^get_/, "")
|
||||
const params = args.params as Record<string, unknown> | undefined
|
||||
const ticker = params?.ticker ? String(params.ticker).toUpperCase() : ""
|
||||
return ticker ? `${action} ${ticker}` : action
|
||||
}
|
||||
case "sessions_spawn": {
|
||||
const label = args.label ? String(args.label) : ""
|
||||
if (label) return label.length > 60 ? label.slice(0, 57) + "…" : label
|
||||
const task = String(args.task ?? "")
|
||||
return task.length > 60 ? task.slice(0, 57) + "…" : task
|
||||
}
|
||||
default:
|
||||
return ""
|
||||
}
|
||||
|
|
@ -91,6 +105,8 @@ const RUNNING_LABELS: Record<string, string> = {
|
|||
glob: "searching…",
|
||||
web_search: "searching…",
|
||||
web_fetch: "fetching…",
|
||||
data: "fetching…",
|
||||
sessions_spawn: "spawning…",
|
||||
}
|
||||
|
||||
/** Stats derived from tool result content */
|
||||
|
|
|
|||
208
skills/dcf-valuation/SKILL.md
Normal file
208
skills/dcf-valuation/SKILL.md
Normal file
|
|
@ -0,0 +1,208 @@
|
|||
---
|
||||
name: DCF Valuation
|
||||
description: Perform Discounted Cash Flow (DCF) valuation analysis for public companies. Use when the user asks to value a stock, calculate intrinsic value, fair value, perform DCF analysis, determine if a stock is undervalued or overvalued, or estimate a price target.
|
||||
version: 1.0.0
|
||||
metadata:
|
||||
emoji: "\U0001F9EE"
|
||||
requires:
|
||||
env:
|
||||
- FINANCIAL_DATASETS_API_KEY
|
||||
tags:
|
||||
- finance
|
||||
- valuation
|
||||
- dcf
|
||||
userInvocable: true
|
||||
disableModelInvocation: false
|
||||
---
|
||||
|
||||
## Instructions
|
||||
|
||||
Perform a rigorous Discounted Cash Flow (DCF) valuation. Follow all steps and show your work.
|
||||
|
||||
### Progress Checklist
|
||||
|
||||
```
|
||||
DCF Analysis Progress:
|
||||
- [ ] Step 1: Gather financial data
|
||||
- [ ] Step 2: Calculate historical FCF and growth
|
||||
- [ ] Step 3: Estimate WACC
|
||||
- [ ] Step 4: Project future cash flows
|
||||
- [ ] Step 5: Calculate present value and fair value
|
||||
- [ ] Step 6: Sensitivity analysis
|
||||
- [ ] Step 7: Validate results
|
||||
- [ ] Step 8: Present findings
|
||||
```
|
||||
|
||||
### Step 1: Gather Financial Data
|
||||
|
||||
Use `data` tool with `domain="finance"` for all calls:
|
||||
|
||||
1. **Cash Flow History** (5 years):
|
||||
```
|
||||
action: "get_cash_flow_statements"
|
||||
params: { ticker: "[TICKER]", period: "annual", limit: 5 }
|
||||
```
|
||||
Extract: `free_cash_flow`, `net_cash_flow_from_operations`, `capital_expenditure`
|
||||
Fallback: FCF = Operating Cash Flow - CapEx
|
||||
|
||||
2. **Income Statements** (5 years):
|
||||
```
|
||||
action: "get_income_statements"
|
||||
params: { ticker: "[TICKER]", period: "annual", limit: 5 }
|
||||
```
|
||||
Extract: `revenue`, `operating_income`, `net_income`, `income_tax_expense`
|
||||
|
||||
3. **Balance Sheet** (latest):
|
||||
```
|
||||
action: "get_balance_sheets"
|
||||
params: { ticker: "[TICKER]", period: "annual", limit: 1 }
|
||||
```
|
||||
Extract: `total_debt`, `cash_and_equivalents`, `outstanding_shares`
|
||||
|
||||
4. **Financial Metrics** (current):
|
||||
```
|
||||
action: "get_financial_metrics_snapshot"
|
||||
params: { ticker: "[TICKER]" }
|
||||
```
|
||||
Extract: `market_cap`, `enterprise_value`, `return_on_invested_capital`, `debt_to_equity`, `free_cash_flow_per_share`
|
||||
|
||||
5. **Analyst Estimates**:
|
||||
```
|
||||
action: "get_analyst_estimates"
|
||||
params: { ticker: "[TICKER]", period: "annual" }
|
||||
```
|
||||
Extract: Forward EPS estimates for growth validation
|
||||
|
||||
6. **Current Price**:
|
||||
```
|
||||
action: "get_price_snapshot"
|
||||
params: { ticker: "[TICKER]" }
|
||||
```
|
||||
|
||||
7. **Company Facts**:
|
||||
```
|
||||
action: "get_company_facts"
|
||||
params: { ticker: "[TICKER]" }
|
||||
```
|
||||
Extract: `sector` — use to determine WACC range from [sector-wacc.md](references/sector-wacc.md)
|
||||
|
||||
### Step 2: Calculate Historical FCF and Growth
|
||||
|
||||
- Compute FCF for each of the last 5 years
|
||||
- Calculate 5-year FCF CAGR: `(FCF_latest / FCF_earliest)^(1/years) - 1`
|
||||
- Cross-validate with: revenue growth, operating income growth, analyst EPS growth
|
||||
- **Cap projected growth at 15%** (sustained higher growth is rare)
|
||||
- If FCF is volatile, weight analyst estimates more heavily
|
||||
|
||||
### Step 3: Estimate WACC
|
||||
|
||||
Use the company's `sector` to look up the base WACC range from [sector-wacc.md](references/sector-wacc.md).
|
||||
|
||||
**Calculate WACC:**
|
||||
```
|
||||
WACC = (E/V) * Re + (D/V) * Rd * (1 - Tax Rate)
|
||||
|
||||
Where:
|
||||
E = Market cap (equity value)
|
||||
D = Total debt
|
||||
V = E + D
|
||||
Re = Risk-free rate + Beta * Equity Risk Premium
|
||||
Rd = Cost of debt (estimate from interest expense / total debt)
|
||||
Tax Rate = Effective tax rate from income statements
|
||||
```
|
||||
|
||||
**Default assumptions:**
|
||||
- Risk-free rate: ~4.0-4.5% (10-year Treasury)
|
||||
- Equity risk premium: ~5.5%
|
||||
- If beta unavailable, use sector average
|
||||
|
||||
**Sanity check:** WACC should be 2-4% below ROIC for value-creating companies.
|
||||
|
||||
### Step 4: Project Future Cash Flows (Years 1-5)
|
||||
|
||||
- Apply growth rate with annual decay (multiply by 0.95 each year)
|
||||
- Year 1: FCF * (1 + growth_rate)
|
||||
- Year 2: FCF * (1 + growth_rate * 0.95)
|
||||
- Year 3: FCF * (1 + growth_rate * 0.90)
|
||||
- Year 4: FCF * (1 + growth_rate * 0.85)
|
||||
- Year 5: FCF * (1 + growth_rate * 0.80)
|
||||
|
||||
**Terminal Value** (Gordon Growth Model):
|
||||
```
|
||||
TV = FCF_Year5 * (1 + g) / (WACC - g)
|
||||
Where g = terminal growth rate (2.5% default, GDP proxy)
|
||||
```
|
||||
|
||||
### Step 5: Calculate Present Value and Fair Value
|
||||
|
||||
```
|
||||
PV of each FCF = FCF_t / (1 + WACC)^t
|
||||
PV of Terminal Value = TV / (1 + WACC)^5
|
||||
|
||||
Enterprise Value = Sum of PV(FCFs) + PV(Terminal Value)
|
||||
Net Debt = Total Debt - Cash and Equivalents
|
||||
Equity Value = Enterprise Value - Net Debt
|
||||
Fair Value per Share = Equity Value / Shares Outstanding
|
||||
```
|
||||
|
||||
### Step 6: Sensitivity Analysis
|
||||
|
||||
Create a matrix varying two key assumptions:
|
||||
|
||||
| | TG 2.0% | TG 2.5% | TG 3.0% |
|
||||
|---|---|---|---|
|
||||
| **WACC -1%** | $ | $ | $ |
|
||||
| **WACC base** | $ | $ | $ |
|
||||
| **WACC +1%** | $ | $ | $ |
|
||||
|
||||
(TG = Terminal Growth Rate)
|
||||
|
||||
### Step 7: Validate Results
|
||||
|
||||
Before presenting, check:
|
||||
|
||||
1. **EV comparison**: Calculated EV within 30% of reported enterprise_value
|
||||
- If off by >30%, revisit WACC or growth assumptions
|
||||
2. **Terminal value ratio**: Should be 50-80% of total EV for mature companies
|
||||
- If >90%, growth rate may be too high
|
||||
- If <40%, near-term projections may be aggressive
|
||||
3. **FCF yield check**: Compare fair value FCF yield to current market FCF yield
|
||||
|
||||
If validation fails, adjust assumptions and recalculate.
|
||||
|
||||
### Step 8: Present Results
|
||||
|
||||
Format clearly with:
|
||||
|
||||
1. **Executive Summary**
|
||||
- Current price vs. fair value estimate
|
||||
- Upside/downside percentage
|
||||
- Verdict: Undervalued / Fairly Valued / Overvalued
|
||||
|
||||
2. **Key Assumptions Table**
|
||||
| Assumption | Value | Source |
|
||||
|---|---|---|
|
||||
| Growth Rate | X% | 5Y CAGR + analyst cross-check |
|
||||
| WACC | X% | Sector range + company adjustments |
|
||||
| Terminal Growth | X% | GDP proxy |
|
||||
| Tax Rate | X% | Effective rate from financials |
|
||||
|
||||
3. **Projected FCF Table**
|
||||
| Year | FCF | Growth | PV of FCF |
|
||||
|---|---|---|---|
|
||||
|
||||
4. **Valuation Bridge**
|
||||
- PV of projected FCFs
|
||||
- PV of Terminal Value
|
||||
- = Enterprise Value
|
||||
- - Net Debt
|
||||
- = Equity Value
|
||||
- / Shares Outstanding
|
||||
- = **Fair Value per Share**
|
||||
|
||||
5. **Sensitivity Matrix** (from Step 6)
|
||||
|
||||
6. **Risks & Caveats**
|
||||
- Key risks to the valuation thesis
|
||||
- DCF limitations (sensitive to growth and WACC assumptions)
|
||||
- Company-specific caveats (high debt, cyclicality, early-stage, etc.)
|
||||
40
skills/dcf-valuation/references/sector-wacc.md
Normal file
40
skills/dcf-valuation/references/sector-wacc.md
Normal file
|
|
@ -0,0 +1,40 @@
|
|||
# Sector WACC Reference
|
||||
|
||||
Use the company's `sector` from `get_company_facts` to look up the base WACC range below, then adjust for company-specific factors.
|
||||
|
||||
## WACC by Sector
|
||||
|
||||
| Sector | Typical WACC Range | Notes |
|
||||
|--------|-------------------|-------|
|
||||
| Communication Services | 8-10% | Mix of stable telecom and growth media |
|
||||
| Consumer Discretionary | 8-10% | Cyclical exposure |
|
||||
| Consumer Staples | 7-8% | Defensive, stable demand |
|
||||
| Energy | 9-11% | Commodity price exposure |
|
||||
| Financials | 8-10% | Leverage already in business model |
|
||||
| Health Care | 8-10% | Regulatory and pipeline risk |
|
||||
| Industrials | 8-9% | Moderate cyclicality |
|
||||
| Information Technology | 8-12% | Higher end for high-growth; lower for mature |
|
||||
| Materials | 8-10% | Cyclical, commodity exposure |
|
||||
| Real Estate | 7-9% | Interest rate sensitivity |
|
||||
| Utilities | 6-7% | Regulated, stable cash flows |
|
||||
|
||||
## Adjustment Factors
|
||||
|
||||
**Add to base WACC:**
|
||||
- High debt (D/E > 1.5): +1-2%
|
||||
- Small cap (< $2B market cap): +1-2%
|
||||
- Emerging markets exposure: +1-3%
|
||||
- Concentrated customer base: +0.5-1%
|
||||
- Regulatory uncertainty: +0.5-1.5%
|
||||
|
||||
**Subtract from base WACC:**
|
||||
- Market leader with moat: -0.5-1%
|
||||
- Recurring revenue model: -0.5-1%
|
||||
- Investment grade credit: -0.5%
|
||||
|
||||
## Sanity Checks
|
||||
|
||||
- WACC should typically be 2-4% below ROIC for value-creating companies
|
||||
- If WACC > ROIC, the company may be destroying value
|
||||
- Typical range for US large-cap: 7-12%
|
||||
- Anything below 6% or above 14% warrants extra scrutiny
|
||||
91
skills/finance-research/SKILL.md
Normal file
91
skills/finance-research/SKILL.md
Normal file
|
|
@ -0,0 +1,91 @@
|
|||
---
|
||||
name: Finance Research
|
||||
description: Conduct financial research and analysis including stock analysis, company fundamentals, SEC filings review, and market data retrieval. Use when the user asks about stocks, financial statements, company performance, market data, or investment analysis.
|
||||
version: 1.0.0
|
||||
metadata:
|
||||
emoji: "\U0001F4CA"
|
||||
requires:
|
||||
env:
|
||||
- FINANCIAL_DATASETS_API_KEY
|
||||
tags:
|
||||
- finance
|
||||
- research
|
||||
- stocks
|
||||
- data
|
||||
userInvocable: true
|
||||
disableModelInvocation: false
|
||||
---
|
||||
|
||||
## Instructions
|
||||
|
||||
You are conducting financial research using real market data. Use the `data` tool with `domain="finance"` and the appropriate action.
|
||||
|
||||
### Available Data Actions
|
||||
|
||||
#### Price Data
|
||||
- `get_price_snapshot` — Current stock price. Params: `{ ticker }`
|
||||
- `get_prices` — Historical OHLCV prices. Params: `{ ticker, start_date, end_date, interval?, interval_multiplier? }`
|
||||
- interval: "day" (default), "week", "month", "year"
|
||||
- `get_crypto_price_snapshot` — Current crypto price. Params: `{ ticker }` (e.g. "BTC-USD")
|
||||
- `get_crypto_prices` — Historical crypto prices. Same params as get_prices.
|
||||
- `get_available_crypto_tickers` — List available crypto tickers. Params: `{}`
|
||||
|
||||
#### Financial Statements
|
||||
All share params: `{ ticker, period, limit?, report_period_gt?, report_period_gte?, report_period_lt?, report_period_lte? }`
|
||||
- period: "annual", "quarterly", or "ttm"
|
||||
- Dates in YYYY-MM-DD format
|
||||
|
||||
Actions:
|
||||
- `get_income_statements` — Revenue, expenses, net income, EPS
|
||||
- `get_balance_sheets` — Assets, liabilities, equity, debt, cash
|
||||
- `get_cash_flow_statements` — Operating, investing, financing cash flows, FCF
|
||||
- `get_all_financial_statements` — All three at once (more efficient when you need multiple)
|
||||
|
||||
#### Metrics & Estimates
|
||||
- `get_financial_metrics_snapshot` — Current key ratios (P/E, market cap, margins, etc.). Params: `{ ticker }`
|
||||
- `get_financial_metrics` — Historical metrics. Params: `{ ticker, period?, limit?, report_period*? }`
|
||||
- `get_analyst_estimates` — EPS and revenue estimates. Params: `{ ticker, period? }`
|
||||
|
||||
#### Company Info
|
||||
- `get_company_facts` — Sector, industry, employees, exchange, website. Params: `{ ticker }`
|
||||
- `get_news` — Recent news articles. Params: `{ ticker, start_date?, end_date?, limit? }`
|
||||
- `get_insider_trades` — Insider buying/selling (SEC Form 4). Params: `{ ticker, limit?, filing_date*? }`
|
||||
- `get_segmented_revenues` — Revenue by segment/geography. Params: `{ ticker, period, limit? }`
|
||||
|
||||
#### SEC Filings
|
||||
- `get_filings` — List filings metadata. Params: `{ ticker, filing_type?, limit? }`
|
||||
- filing_type: "10-K", "10-Q", "8-K"
|
||||
- `get_filing_items` — Read specific filing sections. Params: `{ ticker, filing_type, accession_number?, item? }`
|
||||
- item: array of section names (e.g. ["Item-1A", "Item-7"] for 10-K)
|
||||
|
||||
### Research Workflow
|
||||
|
||||
1. **Understand** what financial data is needed
|
||||
2. **Get context** — start with `get_price_snapshot` and `get_company_facts` for orientation
|
||||
3. **Gather data** — use the appropriate actions for the analysis
|
||||
4. **Analyze** — interpret data with proper financial reasoning
|
||||
5. **Present** — clear findings with data tables and key takeaways
|
||||
|
||||
### Best Practices
|
||||
|
||||
- Use `get_all_financial_statements` when you need multiple statement types (saves API calls)
|
||||
- Use annual data for trend analysis, quarterly for recent performance, TTM for current state
|
||||
- Cross-reference metrics: revenue growth vs cash flow growth, margins vs peers
|
||||
- Always note the time period and currency when presenting financial data
|
||||
- For SEC filing analysis: first `get_filings` to find relevant filings, then `get_filing_items` to read specific sections
|
||||
- Common 10-K items: Item-1 (Business), Item-1A (Risk Factors), Item-7 (MD&A), Item-8 (Financial Statements)
|
||||
- Common 10-Q items: Part-1,Item-1 (Financial Statements), Part-1,Item-2 (MD&A)
|
||||
|
||||
### Example: Company Analysis
|
||||
|
||||
For "Analyze Apple's financial health":
|
||||
|
||||
```
|
||||
1. data(domain="finance", action="get_price_snapshot", params={ticker: "AAPL"})
|
||||
2. data(domain="finance", action="get_company_facts", params={ticker: "AAPL"})
|
||||
3. data(domain="finance", action="get_all_financial_statements", params={ticker: "AAPL", period: "annual", limit: 3})
|
||||
4. data(domain="finance", action="get_financial_metrics_snapshot", params={ticker: "AAPL"})
|
||||
5. data(domain="finance", action="get_analyst_estimates", params={ticker: "AAPL"})
|
||||
```
|
||||
|
||||
Then analyze trends, margins, growth rates, and present findings.
|
||||
Loading…
Add table
Add a link
Reference in a new issue