feat(agent): add streaming support for AI message generation

AsyncAgent now subscribes to pi-agent-core events (message_start,
message_update, message_end) and forwards incremental text deltas
through a stream callback. Hub registers the callback and sends
stream payloads to the requesting client via Gateway.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Naiyuan Qing 2026-02-02 17:18:10 +08:00
parent 6b0340480b
commit d04bed8175
7 changed files with 106 additions and 21 deletions

View file

@ -27,4 +27,4 @@ export {
type UpdateGatewayResult,
} from "./rpc";
export { StreamAction, type StreamPayload } from "./stream";
export { StreamAction, type StreamState, type StreamPayload } from "./stream";

View file

@ -2,10 +2,19 @@
export const StreamAction = "stream" as const;
/** 流消息状态 */
export type StreamState = "delta" | "final" | "error";
/** 流消息 payload */
export interface StreamPayload<T = unknown> {
/** 流 ID用于关联同一个流的所有消息 */
export interface StreamPayload {
/** 流 ID(即 messageId,关联同一个流的所有消息 */
streamId: string;
/** 数据 */
data: T;
/** 所属 agent ID */
agentId: string;
/** 流状态 */
state: StreamState;
/** 累计文本内容delta/final 时) */
content?: string;
/** 错误信息error 时) */
error?: string;
}

View file

@ -1,7 +1,9 @@
import { v7 as uuidv7 } from "uuid";
import { Agent } from "./runner.js";
import { Channel } from "./channel.js";
import { extractText } from "./extract-text.js";
import type { AgentOptions, Message } from "./types.js";
import type { StreamPayload } from "@multica/sdk";
const devNull = { write: () => true } as NodeJS.WritableStream;
@ -10,6 +12,7 @@ export class AsyncAgent {
private readonly channel = new Channel<Message>();
private _closed = false;
private queue: Promise<void> = Promise.resolve();
private streamCallback?: (payload: StreamPayload) => void;
readonly sessionId: string;
constructor(options?: AgentOptions) {
@ -18,12 +21,18 @@ export class AsyncAgent {
logger: { stdout: devNull, stderr: devNull },
});
this.sessionId = this.agent.sessionId;
this.setupStreamEvents();
}
get closed(): boolean {
return this._closed;
}
/** Register callback for streaming events */
onStream(cb: (payload: StreamPayload) => void): void {
this.streamCallback = cb;
}
/** Write message to agent (non-blocking, serialized queue) */
write(content: string): void {
if (this._closed) throw new Error("Agent is closed");
@ -32,11 +41,15 @@ export class AsyncAgent {
.then(async () => {
if (this._closed) return;
const result = await this.agent.run(content);
if (result.text) {
this.channel.send({ id: uuidv7(), content: result.text });
}
if (result.error) {
this.channel.send({ id: uuidv7(), content: `[error] ${result.error}` });
// Only send final message via channel if no stream callback
// (stream callback already sent the final content)
if (!this.streamCallback) {
if (result.text) {
this.channel.send({ id: uuidv7(), content: result.text });
}
if (result.error) {
this.channel.send({ id: uuidv7(), content: `[error] ${result.error}` });
}
}
})
.catch((err) => {
@ -56,4 +69,50 @@ export class AsyncAgent {
this._closed = true;
this.channel.close();
}
private setupStreamEvents(): void {
let currentStreamId: string | null = null;
this.agent.subscribe((event) => {
if (!this.streamCallback) return;
switch (event.type) {
case "message_start": {
if (event.message.role === "assistant") {
currentStreamId = uuidv7();
this.streamCallback({
streamId: currentStreamId,
agentId: this.sessionId,
state: "delta",
content: extractText(event.message),
});
}
break;
}
case "message_update": {
if (event.message.role === "assistant" && currentStreamId) {
this.streamCallback({
streamId: currentStreamId,
agentId: this.sessionId,
state: "delta",
content: extractText(event.message),
});
}
break;
}
case "message_end": {
if (event.message.role === "assistant" && currentStreamId) {
this.streamCallback({
streamId: currentStreamId,
agentId: this.sessionId,
state: "final",
content: extractText(event.message),
});
currentStreamId = null;
}
break;
}
}
});
}
}

View file

@ -1,5 +1,6 @@
import type { AgentEvent, AgentMessage } from "@mariozechner/pi-agent-core";
import { colors, createSpinner } from "./colors.js";
import { extractText } from "../extract-text.js";
export type AgentOutputState = {
lastAssistantText: string;
@ -12,16 +13,6 @@ export type AgentOutput = {
handleEvent: (event: AgentEvent) => void;
};
function extractText(message: AgentMessage | undefined): string {
if (!message || typeof message !== "object" || !("content" in message)) return "";
const content = (message as { content?: Array<{ type: string; text?: string }> }).content;
if (!Array.isArray(content)) return "";
return content
.filter((c) => c.type === "text")
.map((c) => c.text ?? "")
.join("");
}
function truncate(s: string, max: number): string {
return s.length > max ? s.slice(0, max) + "…" : s;
}

12
src/agent/extract-text.ts Normal file
View file

@ -0,0 +1,12 @@
import type { AgentMessage } from "@mariozechner/pi-agent-core";
/** Extract plain text content from an AgentMessage */
export function extractText(message: AgentMessage | undefined): string {
if (!message || typeof message !== "object" || !("content" in message)) return "";
const content = (message as { content?: Array<{ type: string; text?: string }> }).content;
if (!Array.isArray(content)) return "";
return content
.filter((c) => c.type === "text")
.map((c) => c.text ?? "")
.join("");
}

View file

@ -234,6 +234,11 @@ export class Agent {
});
}
/** Subscribe to agent events (returns unsubscribe function) */
subscribe(fn: (event: AgentEvent) => void): () => void {
return this.agent.subscribe(fn);
}
async run(prompt: string): Promise<AgentRunResult> {
this.output.state.lastAssistantText = "";
await this.agent.prompt(prompt);

View file

@ -3,6 +3,7 @@ import {
type ConnectionState,
RequestAction,
ResponseAction,
StreamAction,
type RequestPayload,
type ResponseSuccessPayload,
type ResponseErrorPayload,
@ -143,7 +144,15 @@ export class Hub {
addAgentRecord({ id: agent.sessionId, createdAt: Date.now() });
}
// Internally consume messages produced by agent
// Forward streaming events to the requesting client
agent.onStream((payload) => {
const targetDeviceId = this.agentSenders.get(agent.sessionId);
if (targetDeviceId) {
this.client.send(targetDeviceId, StreamAction, payload);
}
});
// Internally consume messages produced by agent (fallback for non-stream scenarios)
void this.consumeAgent(agent);
console.log(`Agent created: ${agent.sessionId}`);