From 273e49f678ae4fa725e5dedac05d694ba9b02e17 Mon Sep 17 00:00:00 2001 From: yushen Date: Tue, 3 Feb 2026 14:22:49 +0800 Subject: [PATCH] feat(agent): stream raw AgentEvent from engine to Hub via Gateway Expose PiAgentCore events through the full backend pipeline: - Agent.subscribe() transparently forwards engine events - AsyncAgent pushes all AgentEvent into Channel alongside error Messages - Hub discriminates ChannelItem and forwards events via StreamAction Co-Authored-By: Claude Opus 4.5 --- src/agent/async-agent.ts | 19 +++++++++++++------ src/agent/runner.ts | 5 +++++ src/hub/hub.ts | 19 +++++++++++++++---- 3 files changed, 33 insertions(+), 10 deletions(-) diff --git a/src/agent/async-agent.ts b/src/agent/async-agent.ts index 06f753bb..97b47378 100644 --- a/src/agent/async-agent.ts +++ b/src/agent/async-agent.ts @@ -1,13 +1,17 @@ import { v7 as uuidv7 } from "uuid"; +import type { AgentEvent } from "@mariozechner/pi-agent-core"; import { Agent } from "./runner.js"; import { Channel } from "./channel.js"; import type { AgentOptions, Message } from "./types.js"; const devNull = { write: () => true } as NodeJS.WritableStream; +/** Discriminated union of legacy Message (error fallback) and raw AgentEvent */ +export type ChannelItem = Message | AgentEvent; + export class AsyncAgent { private readonly agent: Agent; - private readonly channel = new Channel(); + private readonly channel = new Channel(); private _closed = false; private queue: Promise = Promise.resolve(); readonly sessionId: string; @@ -18,6 +22,11 @@ export class AsyncAgent { logger: { stdout: devNull, stderr: devNull }, }); this.sessionId = this.agent.sessionId; + + // Forward raw AgentEvent into the channel + this.agent.subscribe((event: AgentEvent) => { + this.channel.send(event); + }); } get closed(): boolean { @@ -32,9 +41,7 @@ export class AsyncAgent { .then(async () => { if (this._closed) return; const result = await this.agent.run(content); - if (result.text) { - this.channel.send({ id: uuidv7(), content: result.text }); - } + // Normal text is delivered via message_end event; only handle errors here if (result.error) { this.channel.send({ id: uuidv7(), content: `[error] ${result.error}` }); } @@ -45,8 +52,8 @@ export class AsyncAgent { }); } - /** Continuously read message stream */ - read(): AsyncIterable { + /** Continuously read channel stream (AgentEvent + error Messages) */ + read(): AsyncIterable { return this.channel; } diff --git a/src/agent/runner.ts b/src/agent/runner.ts index bf73268f..dfccb9bc 100644 --- a/src/agent/runner.ts +++ b/src/agent/runner.ts @@ -234,6 +234,11 @@ export class Agent { }); } + /** Subscribe to raw AgentEvent from the underlying engine */ + subscribe(fn: (event: AgentEvent) => void): () => void { + return this.agent.subscribe(fn); + } + async run(prompt: string): Promise { this.output.state.lastAssistantText = ""; await this.agent.prompt(prompt); diff --git a/src/hub/hub.ts b/src/hub/hub.ts index 984082a2..bee58b66 100644 --- a/src/hub/hub.ts +++ b/src/hub/hub.ts @@ -3,9 +3,11 @@ import { type ConnectionState, RequestAction, ResponseAction, + StreamAction, type RequestPayload, type ResponseSuccessPayload, type ResponseErrorPayload, + type StreamPayload, } from "@multica/sdk"; import { AsyncAgent } from "../agent/async-agent.js"; import { getHubId } from "./hub-identity.js"; @@ -152,13 +154,22 @@ export class Hub { /** Internally read agent output and send via Gateway */ private async consumeAgent(agent: AsyncAgent): Promise { - for await (const msg of agent.read()) { - console.log(`[${agent.sessionId}] ${msg.content}`); + for await (const item of agent.read()) { const targetDeviceId = this.agentSenders.get(agent.sessionId); - if (targetDeviceId) { + if (!targetDeviceId) continue; + + if ("content" in item) { + // Legacy Message (error fallback) + console.log(`[${agent.sessionId}] ${item.content}`); this.client.send(targetDeviceId, "message", { agentId: agent.sessionId, - content: msg.content, + content: item.content, + }); + } else { + // Raw AgentEvent — forward via StreamAction + this.client.send(targetDeviceId, StreamAction, { + streamId: agent.sessionId, + data: item, }); } }