Merge pull request #64 from multica-ai/agent-intermediate-steps
feat(agent): stream raw AgentEvent to Hub via Gateway
This commit is contained in:
commit
09f7e93496
4 changed files with 90 additions and 119 deletions
|
|
@ -1,18 +1,19 @@
|
|||
import { v7 as uuidv7 } from "uuid";
|
||||
import type { AgentEvent } from "@mariozechner/pi-agent-core";
|
||||
import { Agent } from "./runner.js";
|
||||
import { Channel } from "./channel.js";
|
||||
import { extractText } from "./extract-text.js";
|
||||
import type { AgentOptions, Message } from "./types.js";
|
||||
import type { StreamPayload } from "@multica/sdk";
|
||||
|
||||
const devNull = { write: () => true } as NodeJS.WritableStream;
|
||||
|
||||
/** Discriminated union of legacy Message (error fallback) and raw AgentEvent */
|
||||
export type ChannelItem = Message | AgentEvent;
|
||||
|
||||
export class AsyncAgent {
|
||||
private readonly agent: Agent;
|
||||
private readonly channel = new Channel<Message>();
|
||||
private readonly channel = new Channel<ChannelItem>();
|
||||
private _closed = false;
|
||||
private queue: Promise<void> = Promise.resolve();
|
||||
private streamCallback?: (payload: StreamPayload) => void;
|
||||
readonly sessionId: string;
|
||||
|
||||
constructor(options?: AgentOptions) {
|
||||
|
|
@ -21,18 +22,17 @@ export class AsyncAgent {
|
|||
logger: { stdout: devNull, stderr: devNull },
|
||||
});
|
||||
this.sessionId = this.agent.sessionId;
|
||||
this.setupStreamEvents();
|
||||
|
||||
// Forward raw AgentEvent into the channel
|
||||
this.agent.subscribe((event: AgentEvent) => {
|
||||
this.channel.send(event);
|
||||
});
|
||||
}
|
||||
|
||||
get closed(): boolean {
|
||||
return this._closed;
|
||||
}
|
||||
|
||||
/** Register callback for streaming events */
|
||||
onStream(cb: (payload: StreamPayload) => void): void {
|
||||
this.streamCallback = cb;
|
||||
}
|
||||
|
||||
/** Write message to agent (non-blocking, serialized queue) */
|
||||
write(content: string): void {
|
||||
if (this._closed) throw new Error("Agent is closed");
|
||||
|
|
@ -41,15 +41,9 @@ export class AsyncAgent {
|
|||
.then(async () => {
|
||||
if (this._closed) return;
|
||||
const result = await this.agent.run(content);
|
||||
// Only send final message via channel if no stream callback
|
||||
// (stream callback already sent the final content)
|
||||
if (!this.streamCallback) {
|
||||
if (result.text) {
|
||||
this.channel.send({ id: uuidv7(), content: result.text });
|
||||
}
|
||||
if (result.error) {
|
||||
this.channel.send({ id: uuidv7(), content: `[error] ${result.error}` });
|
||||
}
|
||||
// Normal text is delivered via message_end event; only handle errors here
|
||||
if (result.error) {
|
||||
this.channel.send({ id: uuidv7(), content: `[error] ${result.error}` });
|
||||
}
|
||||
})
|
||||
.catch((err) => {
|
||||
|
|
@ -58,8 +52,8 @@ export class AsyncAgent {
|
|||
});
|
||||
}
|
||||
|
||||
/** Continuously read message stream */
|
||||
read(): AsyncIterable<Message> {
|
||||
/** Continuously read channel stream (AgentEvent + error Messages) */
|
||||
read(): AsyncIterable<ChannelItem> {
|
||||
return this.channel;
|
||||
}
|
||||
|
||||
|
|
@ -69,50 +63,4 @@ export class AsyncAgent {
|
|||
this._closed = true;
|
||||
this.channel.close();
|
||||
}
|
||||
|
||||
private setupStreamEvents(): void {
|
||||
let currentStreamId: string | null = null;
|
||||
|
||||
this.agent.subscribe((event) => {
|
||||
if (!this.streamCallback) return;
|
||||
|
||||
switch (event.type) {
|
||||
case "message_start": {
|
||||
if (event.message.role === "assistant") {
|
||||
currentStreamId = uuidv7();
|
||||
this.streamCallback({
|
||||
streamId: currentStreamId,
|
||||
agentId: this.sessionId,
|
||||
state: "delta",
|
||||
content: extractText(event.message),
|
||||
});
|
||||
}
|
||||
break;
|
||||
}
|
||||
case "message_update": {
|
||||
if (event.message.role === "assistant" && currentStreamId) {
|
||||
this.streamCallback({
|
||||
streamId: currentStreamId,
|
||||
agentId: this.sessionId,
|
||||
state: "delta",
|
||||
content: extractText(event.message),
|
||||
});
|
||||
}
|
||||
break;
|
||||
}
|
||||
case "message_end": {
|
||||
if (event.message.role === "assistant" && currentStreamId) {
|
||||
this.streamCallback({
|
||||
streamId: currentStreamId,
|
||||
agentId: this.sessionId,
|
||||
state: "final",
|
||||
content: extractText(event.message),
|
||||
});
|
||||
currentStreamId = null;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -134,6 +134,7 @@ const PROVIDER_REGISTRY: Record<string, ProviderMeta> = {
|
|||
*/
|
||||
export const PROVIDER_ALIAS: Record<string, string> = {
|
||||
"claude-code": "anthropic", // Claude Code OAuth uses anthropic API
|
||||
"openai-codex": "openai", // Codex OAuth uses OpenAI API
|
||||
};
|
||||
|
||||
// ============================================================
|
||||
|
|
|
|||
|
|
@ -3,17 +3,11 @@ import { v7 as uuidv7 } from "uuid";
|
|||
import type { AgentOptions, AgentRunResult } from "./types.js";
|
||||
import { createAgentOutput } from "./cli/output.js";
|
||||
import { resolveModel, resolveTools } from "./tools.js";
|
||||
import { resolveApiKey, resolveBaseUrl, resolveModelId } from "./providers/index.js";
|
||||
import { SessionManager } from "./session/session-manager.js";
|
||||
import { ProfileManager } from "./profile/index.js";
|
||||
import { SkillManager } from "./skills/index.js";
|
||||
import { credentialManager, getCredentialsPath } from "./credentials.js";
|
||||
import {
|
||||
resolveApiKey,
|
||||
resolveBaseUrl,
|
||||
resolveModelId,
|
||||
isOAuthProvider,
|
||||
getLoginInstructions,
|
||||
} from "./providers/index.js";
|
||||
import {
|
||||
checkContextWindow,
|
||||
DEFAULT_CONTEXT_TOKENS,
|
||||
|
|
@ -44,37 +38,10 @@ export class Agent {
|
|||
const resolvedModel = resolveModelId(resolvedProvider, options.model);
|
||||
const apiKey = resolveApiKey(resolvedProvider, options.apiKey);
|
||||
|
||||
// Validate credentials before proceeding
|
||||
if (!apiKey) {
|
||||
if (isOAuthProvider(resolvedProvider)) {
|
||||
// OAuth provider without valid credentials - show login instructions
|
||||
const instructions = getLoginInstructions(resolvedProvider);
|
||||
throw new Error(
|
||||
`Provider "${resolvedProvider}" requires authentication.\n\n` +
|
||||
`${instructions}\n\n` +
|
||||
`After logging in, run: multica --provider ${resolvedProvider}`,
|
||||
);
|
||||
}
|
||||
// API Key provider without key - show configuration instructions
|
||||
throw new Error(
|
||||
`Provider "${resolvedProvider}" requires an API key.\n\n` +
|
||||
`Add your API key to: ${getCredentialsPath()}\n\n` +
|
||||
`Example:\n` +
|
||||
`{\n` +
|
||||
` "llm": {\n` +
|
||||
` "provider": "${resolvedProvider}",\n` +
|
||||
` "providers": {\n` +
|
||||
` "${resolvedProvider}": {\n` +
|
||||
` "apiKey": "your-api-key-here"\n` +
|
||||
` }\n` +
|
||||
` }\n` +
|
||||
` }\n` +
|
||||
`}`,
|
||||
);
|
||||
}
|
||||
|
||||
this.agent = new PiAgentCore(
|
||||
{ getApiKey: (_provider: string) => apiKey },
|
||||
apiKey
|
||||
? { getApiKey: (_provider: string) => apiKey }
|
||||
: {},
|
||||
);
|
||||
|
||||
// Load Agent Profile (if profileId is specified)
|
||||
|
|
@ -162,7 +129,9 @@ export class Agent {
|
|||
const compactionMode = options.compactionMode ?? "tokens"; // 默认使用 token 模式
|
||||
|
||||
// 获取 API Key(用于 summary 模式)
|
||||
const summaryApiKey = compactionMode === "summary" ? resolveApiKey(model.provider, options.apiKey) : undefined;
|
||||
const summaryApiKey = compactionMode === "summary"
|
||||
? resolveApiKey(resolvedProvider, options.apiKey)
|
||||
: undefined;
|
||||
|
||||
// 创建 SessionManager(带 context window 配置)
|
||||
this.session = new SessionManager({
|
||||
|
|
@ -241,7 +210,7 @@ export class Agent {
|
|||
});
|
||||
}
|
||||
|
||||
/** Subscribe to agent events (returns unsubscribe function) */
|
||||
/** Subscribe to raw AgentEvent from the underlying engine */
|
||||
subscribe(fn: (event: AgentEvent) => void): () => void {
|
||||
return this.agent.subscribe(fn);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -22,6 +22,8 @@ import { createUpdateGatewayHandler } from "./rpc/handlers/update-gateway.js";
|
|||
export class Hub {
|
||||
private readonly agents = new Map<string, AsyncAgent>();
|
||||
private readonly agentSenders = new Map<string, string>();
|
||||
private readonly agentStreamIds = new Map<string, string>();
|
||||
private readonly agentStreamCounters = new Map<string, number>();
|
||||
private readonly rpc: RpcDispatcher;
|
||||
private client: GatewayClient;
|
||||
url: string;
|
||||
|
|
@ -144,31 +146,77 @@ export class Hub {
|
|||
addAgentRecord({ id: agent.sessionId, createdAt: Date.now() });
|
||||
}
|
||||
|
||||
// Forward streaming events to the requesting client
|
||||
agent.onStream((payload) => {
|
||||
const targetDeviceId = this.agentSenders.get(agent.sessionId);
|
||||
if (targetDeviceId) {
|
||||
this.client.send(targetDeviceId, StreamAction, payload);
|
||||
}
|
||||
});
|
||||
|
||||
// Internally consume messages produced by agent (fallback for non-stream scenarios)
|
||||
// Internally consume agent output (AgentEvent stream + error Messages)
|
||||
void this.consumeAgent(agent);
|
||||
|
||||
console.log(`Agent created: ${agent.sessionId}`);
|
||||
return agent;
|
||||
}
|
||||
|
||||
private getMessageIdFromEvent(event: unknown): string | undefined {
|
||||
if (!event || typeof event !== "object") return undefined;
|
||||
const maybeMsg = (event as { message?: unknown }).message;
|
||||
if (!maybeMsg || typeof maybeMsg !== "object") return undefined;
|
||||
const id = (maybeMsg as { id?: unknown }).id;
|
||||
return typeof id === "string" && id.length > 0 ? id : undefined;
|
||||
}
|
||||
|
||||
private beginStream(agentId: string, event: unknown): string {
|
||||
const explicitId = this.getMessageIdFromEvent(event);
|
||||
if (explicitId) {
|
||||
this.agentStreamIds.set(agentId, explicitId);
|
||||
return explicitId;
|
||||
}
|
||||
const next = (this.agentStreamCounters.get(agentId) ?? 0) + 1;
|
||||
this.agentStreamCounters.set(agentId, next);
|
||||
const fallback = `${agentId}:${next}`;
|
||||
this.agentStreamIds.set(agentId, fallback);
|
||||
return fallback;
|
||||
}
|
||||
|
||||
private getActiveStreamId(agentId: string, event: unknown): string {
|
||||
return this.agentStreamIds.get(agentId) ?? this.getMessageIdFromEvent(event) ?? agentId;
|
||||
}
|
||||
|
||||
private endStream(agentId: string): void {
|
||||
this.agentStreamIds.delete(agentId);
|
||||
}
|
||||
|
||||
/** Internally read agent output and send via Gateway */
|
||||
private async consumeAgent(agent: AsyncAgent): Promise<void> {
|
||||
for await (const msg of agent.read()) {
|
||||
console.log(`[${agent.sessionId}] ${msg.content}`);
|
||||
for await (const item of agent.read()) {
|
||||
const targetDeviceId = this.agentSenders.get(agent.sessionId);
|
||||
if (targetDeviceId) {
|
||||
if (!targetDeviceId) continue;
|
||||
|
||||
if ("content" in item) {
|
||||
// Legacy Message (error fallback)
|
||||
console.log(`[${agent.sessionId}] ${item.content}`);
|
||||
this.client.send(targetDeviceId, "message", {
|
||||
agentId: agent.sessionId,
|
||||
content: msg.content,
|
||||
content: item.content,
|
||||
});
|
||||
} else {
|
||||
// Filter: only forward events useful for frontend rendering
|
||||
const maybeMessage = (item as { message?: { role?: string } }).message;
|
||||
const isAssistantMessage = maybeMessage?.role === "assistant";
|
||||
const shouldForward =
|
||||
((item.type === "message_start" || item.type === "message_update" || item.type === "message_end") && isAssistantMessage)
|
||||
|| item.type === "tool_execution_start"
|
||||
|| item.type === "tool_execution_end";
|
||||
if (!shouldForward) continue;
|
||||
|
||||
if (item.type === "message_start") {
|
||||
this.beginStream(agent.sessionId, item);
|
||||
}
|
||||
const streamId = this.getActiveStreamId(agent.sessionId, item);
|
||||
this.client.send(targetDeviceId, StreamAction, {
|
||||
streamId,
|
||||
agentId: agent.sessionId,
|
||||
event: item,
|
||||
});
|
||||
if (item.type === "message_end") {
|
||||
this.endStream(agent.sessionId);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -211,6 +259,8 @@ export class Hub {
|
|||
agent.close();
|
||||
this.agents.delete(id);
|
||||
this.agentSenders.delete(id);
|
||||
this.agentStreamIds.delete(id);
|
||||
this.agentStreamCounters.delete(id);
|
||||
removeAgentRecord(id);
|
||||
return true;
|
||||
}
|
||||
|
|
@ -219,6 +269,9 @@ export class Hub {
|
|||
for (const [id, agent] of this.agents) {
|
||||
agent.close();
|
||||
this.agents.delete(id);
|
||||
this.agentSenders.delete(id);
|
||||
this.agentStreamIds.delete(id);
|
||||
this.agentStreamCounters.delete(id);
|
||||
}
|
||||
this.client.disconnect();
|
||||
console.log("Hub shut down");
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue