feat(agent): stream raw AgentEvent from engine to Hub via Gateway

Expose PiAgentCore events through the full backend pipeline:
- Agent.subscribe() transparently forwards engine events
- AsyncAgent pushes all AgentEvent into Channel alongside error Messages
- Hub discriminates ChannelItem and forwards events via StreamAction

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
yushen 2026-02-03 14:22:49 +08:00
parent 6b0340480b
commit 273e49f678
3 changed files with 33 additions and 10 deletions

View file

@ -1,13 +1,17 @@
import { v7 as uuidv7 } from "uuid";
import type { AgentEvent } from "@mariozechner/pi-agent-core";
import { Agent } from "./runner.js";
import { Channel } from "./channel.js";
import type { AgentOptions, Message } from "./types.js";
const devNull = { write: () => true } as NodeJS.WritableStream;
/** Discriminated union of legacy Message (error fallback) and raw AgentEvent */
export type ChannelItem = Message | AgentEvent;
export class AsyncAgent {
private readonly agent: Agent;
private readonly channel = new Channel<Message>();
private readonly channel = new Channel<ChannelItem>();
private _closed = false;
private queue: Promise<void> = Promise.resolve();
readonly sessionId: string;
@ -18,6 +22,11 @@ export class AsyncAgent {
logger: { stdout: devNull, stderr: devNull },
});
this.sessionId = this.agent.sessionId;
// Forward raw AgentEvent into the channel
this.agent.subscribe((event: AgentEvent) => {
this.channel.send(event);
});
}
get closed(): boolean {
@ -32,9 +41,7 @@ export class AsyncAgent {
.then(async () => {
if (this._closed) return;
const result = await this.agent.run(content);
if (result.text) {
this.channel.send({ id: uuidv7(), content: result.text });
}
// Normal text is delivered via message_end event; only handle errors here
if (result.error) {
this.channel.send({ id: uuidv7(), content: `[error] ${result.error}` });
}
@ -45,8 +52,8 @@ export class AsyncAgent {
});
}
/** Continuously read message stream */
read(): AsyncIterable<Message> {
/** Continuously read channel stream (AgentEvent + error Messages) */
read(): AsyncIterable<ChannelItem> {
return this.channel;
}

View file

@ -234,6 +234,11 @@ export class Agent {
});
}
/** Subscribe to raw AgentEvent from the underlying engine */
subscribe(fn: (event: AgentEvent) => void): () => void {
return this.agent.subscribe(fn);
}
async run(prompt: string): Promise<AgentRunResult> {
this.output.state.lastAssistantText = "";
await this.agent.prompt(prompt);

View file

@ -3,9 +3,11 @@ import {
type ConnectionState,
RequestAction,
ResponseAction,
StreamAction,
type RequestPayload,
type ResponseSuccessPayload,
type ResponseErrorPayload,
type StreamPayload,
} from "@multica/sdk";
import { AsyncAgent } from "../agent/async-agent.js";
import { getHubId } from "./hub-identity.js";
@ -152,13 +154,22 @@ export class Hub {
/** Internally read agent output and send via Gateway */
private async consumeAgent(agent: AsyncAgent): Promise<void> {
for await (const msg of agent.read()) {
console.log(`[${agent.sessionId}] ${msg.content}`);
for await (const item of agent.read()) {
const targetDeviceId = this.agentSenders.get(agent.sessionId);
if (targetDeviceId) {
if (!targetDeviceId) continue;
if ("content" in item) {
// Legacy Message (error fallback)
console.log(`[${agent.sessionId}] ${item.content}`);
this.client.send(targetDeviceId, "message", {
agentId: agent.sessionId,
content: msg.content,
content: item.content,
});
} else {
// Raw AgentEvent — forward via StreamAction
this.client.send<StreamPayload>(targetDeviceId, StreamAction, {
streamId: agent.sessionId,
data: item,
});
}
}