diff --git a/packages/sdk/src/actions/index.ts b/packages/sdk/src/actions/index.ts index d39cc5b0..2f6ab2e3 100644 --- a/packages/sdk/src/actions/index.ts +++ b/packages/sdk/src/actions/index.ts @@ -27,4 +27,4 @@ export { type UpdateGatewayResult, } from "./rpc"; -export { StreamAction, type StreamPayload } from "./stream"; +export { StreamAction, type StreamState, type StreamPayload } from "./stream"; diff --git a/packages/sdk/src/actions/stream.ts b/packages/sdk/src/actions/stream.ts index 98f52423..da8dae31 100644 --- a/packages/sdk/src/actions/stream.ts +++ b/packages/sdk/src/actions/stream.ts @@ -2,10 +2,19 @@ export const StreamAction = "stream" as const; +/** 流消息状态 */ +export type StreamState = "delta" | "final" | "error"; + /** 流消息 payload */ -export interface StreamPayload { - /** 流 ID,用于关联同一个流的所有消息 */ +export interface StreamPayload { + /** 流 ID(即 messageId),关联同一个流的所有消息 */ streamId: string; - /** 数据 */ - data: T; + /** 所属 agent ID */ + agentId: string; + /** 流状态 */ + state: StreamState; + /** 累计文本内容(delta/final 时) */ + content?: string; + /** 错误信息(error 时) */ + error?: string; } diff --git a/src/agent/async-agent.ts b/src/agent/async-agent.ts index 06f753bb..d6443b8b 100644 --- a/src/agent/async-agent.ts +++ b/src/agent/async-agent.ts @@ -1,7 +1,9 @@ import { v7 as uuidv7 } from "uuid"; import { Agent } from "./runner.js"; import { Channel } from "./channel.js"; +import { extractText } from "./extract-text.js"; import type { AgentOptions, Message } from "./types.js"; +import type { StreamPayload } from "@multica/sdk"; const devNull = { write: () => true } as NodeJS.WritableStream; @@ -10,6 +12,7 @@ export class AsyncAgent { private readonly channel = new Channel(); private _closed = false; private queue: Promise = Promise.resolve(); + private streamCallback?: (payload: StreamPayload) => void; readonly sessionId: string; constructor(options?: AgentOptions) { @@ -18,12 +21,18 @@ export class AsyncAgent { logger: { stdout: devNull, stderr: devNull }, }); this.sessionId = this.agent.sessionId; + this.setupStreamEvents(); } get closed(): boolean { return this._closed; } + /** Register callback for streaming events */ + onStream(cb: (payload: StreamPayload) => void): void { + this.streamCallback = cb; + } + /** Write message to agent (non-blocking, serialized queue) */ write(content: string): void { if (this._closed) throw new Error("Agent is closed"); @@ -32,11 +41,15 @@ export class AsyncAgent { .then(async () => { if (this._closed) return; const result = await this.agent.run(content); - if (result.text) { - this.channel.send({ id: uuidv7(), content: result.text }); - } - if (result.error) { - this.channel.send({ id: uuidv7(), content: `[error] ${result.error}` }); + // Only send final message via channel if no stream callback + // (stream callback already sent the final content) + if (!this.streamCallback) { + if (result.text) { + this.channel.send({ id: uuidv7(), content: result.text }); + } + if (result.error) { + this.channel.send({ id: uuidv7(), content: `[error] ${result.error}` }); + } } }) .catch((err) => { @@ -56,4 +69,50 @@ export class AsyncAgent { this._closed = true; this.channel.close(); } + + private setupStreamEvents(): void { + let currentStreamId: string | null = null; + + this.agent.subscribe((event) => { + if (!this.streamCallback) return; + + switch (event.type) { + case "message_start": { + if (event.message.role === "assistant") { + currentStreamId = uuidv7(); + this.streamCallback({ + streamId: currentStreamId, + agentId: this.sessionId, + state: "delta", + content: extractText(event.message), + }); + } + break; + } + case "message_update": { + if (event.message.role === "assistant" && currentStreamId) { + this.streamCallback({ + streamId: currentStreamId, + agentId: this.sessionId, + state: "delta", + content: extractText(event.message), + }); + } + break; + } + case "message_end": { + if (event.message.role === "assistant" && currentStreamId) { + this.streamCallback({ + streamId: currentStreamId, + agentId: this.sessionId, + state: "final", + content: extractText(event.message), + }); + currentStreamId = null; + } + break; + } + } + }); + } } diff --git a/src/agent/cli/output.ts b/src/agent/cli/output.ts index 3c6c9835..ba3d465e 100644 --- a/src/agent/cli/output.ts +++ b/src/agent/cli/output.ts @@ -1,5 +1,6 @@ import type { AgentEvent, AgentMessage } from "@mariozechner/pi-agent-core"; import { colors, createSpinner } from "./colors.js"; +import { extractText } from "../extract-text.js"; export type AgentOutputState = { lastAssistantText: string; @@ -12,16 +13,6 @@ export type AgentOutput = { handleEvent: (event: AgentEvent) => void; }; -function extractText(message: AgentMessage | undefined): string { - if (!message || typeof message !== "object" || !("content" in message)) return ""; - const content = (message as { content?: Array<{ type: string; text?: string }> }).content; - if (!Array.isArray(content)) return ""; - return content - .filter((c) => c.type === "text") - .map((c) => c.text ?? "") - .join(""); -} - function truncate(s: string, max: number): string { return s.length > max ? s.slice(0, max) + "…" : s; } diff --git a/src/agent/extract-text.ts b/src/agent/extract-text.ts new file mode 100644 index 00000000..145a2c97 --- /dev/null +++ b/src/agent/extract-text.ts @@ -0,0 +1,12 @@ +import type { AgentMessage } from "@mariozechner/pi-agent-core"; + +/** Extract plain text content from an AgentMessage */ +export function extractText(message: AgentMessage | undefined): string { + if (!message || typeof message !== "object" || !("content" in message)) return ""; + const content = (message as { content?: Array<{ type: string; text?: string }> }).content; + if (!Array.isArray(content)) return ""; + return content + .filter((c) => c.type === "text") + .map((c) => c.text ?? "") + .join(""); +} diff --git a/src/agent/runner.ts b/src/agent/runner.ts index bf73268f..4a161aea 100644 --- a/src/agent/runner.ts +++ b/src/agent/runner.ts @@ -234,6 +234,11 @@ export class Agent { }); } + /** Subscribe to agent events (returns unsubscribe function) */ + subscribe(fn: (event: AgentEvent) => void): () => void { + return this.agent.subscribe(fn); + } + async run(prompt: string): Promise { this.output.state.lastAssistantText = ""; await this.agent.prompt(prompt); diff --git a/src/hub/hub.ts b/src/hub/hub.ts index 984082a2..3d1eae3b 100644 --- a/src/hub/hub.ts +++ b/src/hub/hub.ts @@ -3,6 +3,7 @@ import { type ConnectionState, RequestAction, ResponseAction, + StreamAction, type RequestPayload, type ResponseSuccessPayload, type ResponseErrorPayload, @@ -143,7 +144,15 @@ export class Hub { addAgentRecord({ id: agent.sessionId, createdAt: Date.now() }); } - // Internally consume messages produced by agent + // Forward streaming events to the requesting client + agent.onStream((payload) => { + const targetDeviceId = this.agentSenders.get(agent.sessionId); + if (targetDeviceId) { + this.client.send(targetDeviceId, StreamAction, payload); + } + }); + + // Internally consume messages produced by agent (fallback for non-stream scenarios) void this.consumeAgent(agent); console.log(`Agent created: ${agent.sessionId}`);