From 273e49f678ae4fa725e5dedac05d694ba9b02e17 Mon Sep 17 00:00:00 2001 From: yushen Date: Tue, 3 Feb 2026 14:22:49 +0800 Subject: [PATCH 1/2] feat(agent): stream raw AgentEvent from engine to Hub via Gateway Expose PiAgentCore events through the full backend pipeline: - Agent.subscribe() transparently forwards engine events - AsyncAgent pushes all AgentEvent into Channel alongside error Messages - Hub discriminates ChannelItem and forwards events via StreamAction Co-Authored-By: Claude Opus 4.5 --- src/agent/async-agent.ts | 19 +++++++++++++------ src/agent/runner.ts | 5 +++++ src/hub/hub.ts | 19 +++++++++++++++---- 3 files changed, 33 insertions(+), 10 deletions(-) diff --git a/src/agent/async-agent.ts b/src/agent/async-agent.ts index 06f753bb..97b47378 100644 --- a/src/agent/async-agent.ts +++ b/src/agent/async-agent.ts @@ -1,13 +1,17 @@ import { v7 as uuidv7 } from "uuid"; +import type { AgentEvent } from "@mariozechner/pi-agent-core"; import { Agent } from "./runner.js"; import { Channel } from "./channel.js"; import type { AgentOptions, Message } from "./types.js"; const devNull = { write: () => true } as NodeJS.WritableStream; +/** Discriminated union of legacy Message (error fallback) and raw AgentEvent */ +export type ChannelItem = Message | AgentEvent; + export class AsyncAgent { private readonly agent: Agent; - private readonly channel = new Channel(); + private readonly channel = new Channel(); private _closed = false; private queue: Promise = Promise.resolve(); readonly sessionId: string; @@ -18,6 +22,11 @@ export class AsyncAgent { logger: { stdout: devNull, stderr: devNull }, }); this.sessionId = this.agent.sessionId; + + // Forward raw AgentEvent into the channel + this.agent.subscribe((event: AgentEvent) => { + this.channel.send(event); + }); } get closed(): boolean { @@ -32,9 +41,7 @@ export class AsyncAgent { .then(async () => { if (this._closed) return; const result = await this.agent.run(content); - if (result.text) { - this.channel.send({ id: uuidv7(), content: result.text }); - } + // Normal text is delivered via message_end event; only handle errors here if (result.error) { this.channel.send({ id: uuidv7(), content: `[error] ${result.error}` }); } @@ -45,8 +52,8 @@ export class AsyncAgent { }); } - /** Continuously read message stream */ - read(): AsyncIterable { + /** Continuously read channel stream (AgentEvent + error Messages) */ + read(): AsyncIterable { return this.channel; } diff --git a/src/agent/runner.ts b/src/agent/runner.ts index bf73268f..dfccb9bc 100644 --- a/src/agent/runner.ts +++ b/src/agent/runner.ts @@ -234,6 +234,11 @@ export class Agent { }); } + /** Subscribe to raw AgentEvent from the underlying engine */ + subscribe(fn: (event: AgentEvent) => void): () => void { + return this.agent.subscribe(fn); + } + async run(prompt: string): Promise { this.output.state.lastAssistantText = ""; await this.agent.prompt(prompt); diff --git a/src/hub/hub.ts b/src/hub/hub.ts index 984082a2..bee58b66 100644 --- a/src/hub/hub.ts +++ b/src/hub/hub.ts @@ -3,9 +3,11 @@ import { type ConnectionState, RequestAction, ResponseAction, + StreamAction, type RequestPayload, type ResponseSuccessPayload, type ResponseErrorPayload, + type StreamPayload, } from "@multica/sdk"; import { AsyncAgent } from "../agent/async-agent.js"; import { getHubId } from "./hub-identity.js"; @@ -152,13 +154,22 @@ export class Hub { /** Internally read agent output and send via Gateway */ private async consumeAgent(agent: AsyncAgent): Promise { - for await (const msg of agent.read()) { - console.log(`[${agent.sessionId}] ${msg.content}`); + for await (const item of agent.read()) { const targetDeviceId = this.agentSenders.get(agent.sessionId); - if (targetDeviceId) { + if (!targetDeviceId) continue; + + if ("content" in item) { + // Legacy Message (error fallback) + console.log(`[${agent.sessionId}] ${item.content}`); this.client.send(targetDeviceId, "message", { agentId: agent.sessionId, - content: msg.content, + content: item.content, + }); + } else { + // Raw AgentEvent — forward via StreamAction + this.client.send(targetDeviceId, StreamAction, { + streamId: agent.sessionId, + data: item, }); } } From 203d7600348b8f4a4283119f1e8ad6548212c3d2 Mon Sep 17 00:00:00 2001 From: yushen Date: Tue, 3 Feb 2026 15:08:46 +0800 Subject: [PATCH 2/2] feat(hub): filter forwarded events to 5 types useful for frontend Only forward message_start/update/end (assistant role) and tool_execution_start/end to clients. Drop agent_start, agent_end, turn_start, turn_end, and tool_execution_update at the Hub layer. Co-Authored-By: Claude Opus 4.5 --- src/hub/hub.ts | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/hub/hub.ts b/src/hub/hub.ts index 3d5deb71..10236ff3 100644 --- a/src/hub/hub.ts +++ b/src/hub/hub.ts @@ -196,19 +196,25 @@ export class Hub { content: item.content, }); } else { + // Filter: only forward events useful for frontend rendering const maybeMessage = (item as { message?: { role?: string } }).message; const isAssistantMessage = maybeMessage?.role === "assistant"; - if (item.type === "message_start" && isAssistantMessage) { + const shouldForward = + ((item.type === "message_start" || item.type === "message_update" || item.type === "message_end") && isAssistantMessage) + || item.type === "tool_execution_start" + || item.type === "tool_execution_end"; + if (!shouldForward) continue; + + if (item.type === "message_start") { this.beginStream(agent.sessionId, item); } const streamId = this.getActiveStreamId(agent.sessionId, item); - // Raw AgentEvent — forward via StreamAction this.client.send(targetDeviceId, StreamAction, { streamId, agentId: agent.sessionId, event: item, }); - if (item.type === "message_end" && isAssistantMessage) { + if (item.type === "message_end") { this.endStream(agent.sessionId); } }