From 6e82219630c07e3707fd43b815e1877c00512a1c Mon Sep 17 00:00:00 2001 From: yushen Date: Fri, 30 Jan 2026 11:37:09 +0800 Subject: [PATCH 1/4] feat(agent): add SyncAgent and AsyncAgent wrapper classes Introduce two wrapper classes around the core Agent: - SyncAgent: exposes run() for synchronous request-response usage - AsyncAgent: exposes write()/read()/close() for non-blocking streaming via Channel Co-Authored-By: Claude Opus 4.5 --- src/agent/async-agent.ts | 81 ++++++++++++++++++++++++++++++++++++++++ src/agent/channel.ts | 64 +++++++++++++++++++++++++++++++ src/agent/index.ts | 3 ++ src/agent/sync-agent.ts | 16 ++++++++ src/agent/types.ts | 5 +++ 5 files changed, 169 insertions(+) create mode 100644 src/agent/async-agent.ts create mode 100644 src/agent/channel.ts create mode 100644 src/agent/sync-agent.ts diff --git a/src/agent/async-agent.ts b/src/agent/async-agent.ts new file mode 100644 index 00000000..f979fb8d --- /dev/null +++ b/src/agent/async-agent.ts @@ -0,0 +1,81 @@ +import { v7 as uuidv7 } from "uuid"; +import { Agent } from "./runner.js"; +import { Channel } from "./channel.js"; +import type { AgentOptions, Message } from "./types.js"; + +export class AsyncAgent { + private readonly agent: Agent; + private readonly channel = new Channel(); + private _closed = false; + private queue: Promise = Promise.resolve(); + readonly sessionId: string; + + constructor(options?: AgentOptions) { + this.agent = new Agent({ + ...options, + logger: { + stdout: this.createChannelStream("[assistant] "), + stderr: this.createChannelStream("[tool] "), + }, + }); + this.sessionId = this.agent.sessionId; + } + + get closed(): boolean { + return this._closed; + } + + /** Write message to agent (non-blocking, serialized queue) */ + write(content: string): void { + if (this._closed) throw new Error("Agent is closed"); + + this.queue = this.queue + .then(async () => { + const result = await this.agent.run(content); + if (result.error) { + this.channel.send({ + id: uuidv7(), + content: `[error] ${result.error}`, + }); + } + }) + .catch((err) => { + const message = err instanceof Error ? err.message : String(err); + this.channel.send({ id: uuidv7(), content: `[error] ${message}` }); + }); + } + + /** Continuously read message stream */ + read(): AsyncIterable { + return this.channel; + } + + /** Close agent, stop all reads */ + close(): void { + if (this._closed) return; + this._closed = true; + this.channel.close(); + } + + private createChannelStream(prefix: string): NodeJS.WritableStream { + let buffer = ""; + return { + write: (chunk: any) => { + if (this._closed) return false; + const text = + typeof chunk === "string" + ? chunk + : chunk?.toString?.() ?? String(chunk); + if (!text) return true; + buffer += text; + const parts = buffer.split("\n"); + buffer = parts.pop() ?? ""; + for (const part of parts) { + if (part.length === 0) continue; + this.channel.send({ id: uuidv7(), content: `${prefix}${part}` }); + } + return true; + }, + } as NodeJS.WritableStream; + } +} diff --git a/src/agent/channel.ts b/src/agent/channel.ts new file mode 100644 index 00000000..5073c3cd --- /dev/null +++ b/src/agent/channel.ts @@ -0,0 +1,64 @@ +/** + * Go channel style async iterable queue. + * Supports multiple writers, single reader, iteration ends after close. + */ +export class Channel implements AsyncIterable { + private buffer: T[] = []; + private closed = false; + + private readers: Array<{ + resolve: (result: IteratorResult) => void; + }> = []; + + get isClosed(): boolean { + return this.closed; + } + + get size(): number { + return this.buffer.length; + } + + /** Send value to channel. Returns false when channel is closed. */ + send(value: T): boolean { + if (this.closed) return false; + + const reader = this.readers.shift(); + if (reader) { + reader.resolve({ value, done: false }); + return true; + } + + this.buffer.push(value); + return true; + } + + /** Close channel, wake up all waiting readers. */ + close(): void { + if (this.closed) return; + this.closed = true; + + for (const reader of this.readers) { + reader.resolve({ value: undefined as T, done: true }); + } + this.readers = []; + } + + [Symbol.asyncIterator](): AsyncIterator { + return { + next: (): Promise> => { + if (this.buffer.length > 0) { + const value = this.buffer.shift()!; + return Promise.resolve({ value, done: false }); + } + + if (this.closed) { + return Promise.resolve({ value: undefined as T, done: true }); + } + + return new Promise>((resolve) => { + this.readers.push({ resolve }); + }); + }, + }; + } +} diff --git a/src/agent/index.ts b/src/agent/index.ts index c3384160..16dac2f9 100644 --- a/src/agent/index.ts +++ b/src/agent/index.ts @@ -3,3 +3,6 @@ export * from "./types.js"; export * from "./profile/index.js"; export * from "./context-window/index.js"; export * from "./skills/index.js"; +export * from "./channel.js"; +export * from "./sync-agent.js"; +export * from "./async-agent.js"; diff --git a/src/agent/sync-agent.ts b/src/agent/sync-agent.ts new file mode 100644 index 00000000..d2f77ae2 --- /dev/null +++ b/src/agent/sync-agent.ts @@ -0,0 +1,16 @@ +import { Agent } from "./runner.js"; +import type { AgentOptions, AgentRunResult } from "./types.js"; + +export class SyncAgent { + private readonly agent: Agent; + readonly sessionId: string; + + constructor(options?: AgentOptions) { + this.agent = new Agent(options); + this.sessionId = this.agent.sessionId; + } + + async run(prompt: string): Promise { + return this.agent.run(prompt); + } +} diff --git a/src/agent/types.ts b/src/agent/types.ts index 60494a07..94ab9c45 100644 --- a/src/agent/types.ts +++ b/src/agent/types.ts @@ -55,3 +55,8 @@ export type AgentOptions = { /** Additional directories to search for skills */ extraSkillDirs?: string[] | undefined; }; + +export interface Message { + readonly id: string; + readonly content: string; +} From da80ba1cb0ebf560ddec83f93fabb0e6fd7a658e Mon Sep 17 00:00:00 2001 From: yushen Date: Fri, 30 Jan 2026 11:52:01 +0800 Subject: [PATCH 2/4] refactor(agent): simplify AsyncAgent to use result.text instead of stream interception Replace stdout/stderr stream interception with direct result.text push to Channel. Also fix queued tasks still executing after close(). Co-Authored-By: Claude Opus 4.5 --- src/agent/async-agent.ts | 38 ++++++++------------------------------ 1 file changed, 8 insertions(+), 30 deletions(-) diff --git a/src/agent/async-agent.ts b/src/agent/async-agent.ts index f979fb8d..06f753bb 100644 --- a/src/agent/async-agent.ts +++ b/src/agent/async-agent.ts @@ -3,6 +3,8 @@ import { Agent } from "./runner.js"; import { Channel } from "./channel.js"; import type { AgentOptions, Message } from "./types.js"; +const devNull = { write: () => true } as NodeJS.WritableStream; + export class AsyncAgent { private readonly agent: Agent; private readonly channel = new Channel(); @@ -13,10 +15,7 @@ export class AsyncAgent { constructor(options?: AgentOptions) { this.agent = new Agent({ ...options, - logger: { - stdout: this.createChannelStream("[assistant] "), - stderr: this.createChannelStream("[tool] "), - }, + logger: { stdout: devNull, stderr: devNull }, }); this.sessionId = this.agent.sessionId; } @@ -31,12 +30,13 @@ export class AsyncAgent { this.queue = this.queue .then(async () => { + if (this._closed) return; const result = await this.agent.run(content); + if (result.text) { + this.channel.send({ id: uuidv7(), content: result.text }); + } if (result.error) { - this.channel.send({ - id: uuidv7(), - content: `[error] ${result.error}`, - }); + this.channel.send({ id: uuidv7(), content: `[error] ${result.error}` }); } }) .catch((err) => { @@ -56,26 +56,4 @@ export class AsyncAgent { this._closed = true; this.channel.close(); } - - private createChannelStream(prefix: string): NodeJS.WritableStream { - let buffer = ""; - return { - write: (chunk: any) => { - if (this._closed) return false; - const text = - typeof chunk === "string" - ? chunk - : chunk?.toString?.() ?? String(chunk); - if (!text) return true; - buffer += text; - const parts = buffer.split("\n"); - buffer = parts.pop() ?? ""; - for (const part of parts) { - if (part.length === 0) continue; - this.channel.send({ id: uuidv7(), content: `${prefix}${part}` }); - } - return true; - }, - } as NodeJS.WritableStream; - } } From 49540e63e7b6ab113950fece6af6e2650e211677 Mon Sep 17 00:00:00 2001 From: yushen Date: Fri, 30 Jan 2026 11:58:21 +0800 Subject: [PATCH 3/4] refactor(hub): replace mock Agent with AsyncAgent Hub now uses AsyncAgent from src/agent/ instead of its own Agent implementation. Deleted hub/agent.ts and hub/channel.ts as they are no longer needed. Co-Authored-By: Claude Opus 4.5 --- src/hub/agent.ts | 87 ---------------------------------------------- src/hub/channel.ts | 64 ---------------------------------- src/hub/hub.ts | 22 ++++++------ src/hub/index.ts | 4 +-- 4 files changed, 12 insertions(+), 165 deletions(-) delete mode 100644 src/hub/agent.ts delete mode 100644 src/hub/channel.ts diff --git a/src/hub/agent.ts b/src/hub/agent.ts deleted file mode 100644 index d25b83c5..00000000 --- a/src/hub/agent.ts +++ /dev/null @@ -1,87 +0,0 @@ -import { v7 as uuidv7 } from "uuid"; -import { Agent as CoreAgent } from "../agent/runner.js"; -import { Channel } from "./channel.js"; -import type { Message } from "./types.js"; - -/** - * Agent — uses pi-agent-core for real inference. - * write() triggers a model run, read() outputs streaming results. - */ -export class Agent { - readonly id: string; - private readonly channel = new Channel(); - private _closed = false; - private readonly agent: CoreAgent; - private queue: Promise = Promise.resolve(); - - constructor(id?: string) { - this.id = id ?? uuidv7(); - this.agent = new CoreAgent({ - logger: { - stdout: this.createChannelStream("[assistant] "), - stderr: this.createChannelStream("[tool] "), - }, - sessionId: this.id, - }); - } - - get closed(): boolean { - return this._closed; - } - - /** Write message to agent (non-blocking, serialized queue) */ - write(content: string): void { - if (this._closed) { - throw new Error("Agent is closed"); - } - - this.queue = this.queue - .then(async () => { - const result = await this.agent.run(content); - if (result.error) { - this.channel.send({ - id: uuidv7(), - content: `[error] ${result.error}`, - }); - } - }) - .catch((err) => { - const message = err instanceof Error ? err.message : String(err); - this.channel.send({ id: uuidv7(), content: `[error] ${message}` }); - }); - } - - /** Continuously read message stream */ - read(): AsyncIterable { - return this.channel; - } - - /** Close agent, stop all reads */ - close(): void { - if (this._closed) return; - this._closed = true; - this.channel.close(); - } - - private createChannelStream(prefix: string): NodeJS.WritableStream { - let buffer = ""; - return { - write: (chunk: any) => { - if (this._closed) return false; - const text = - typeof chunk === "string" - ? chunk - : chunk?.toString?.() ?? String(chunk); - if (!text) return true; - buffer += text; - const parts = buffer.split("\n"); - buffer = parts.pop() ?? ""; - for (const part of parts) { - if (part.length === 0) continue; - this.channel.send({ id: uuidv7(), content: `${prefix}${part}` }); - } - return true; - }, - } as NodeJS.WritableStream; - } -} diff --git a/src/hub/channel.ts b/src/hub/channel.ts deleted file mode 100644 index 5073c3cd..00000000 --- a/src/hub/channel.ts +++ /dev/null @@ -1,64 +0,0 @@ -/** - * Go channel style async iterable queue. - * Supports multiple writers, single reader, iteration ends after close. - */ -export class Channel implements AsyncIterable { - private buffer: T[] = []; - private closed = false; - - private readers: Array<{ - resolve: (result: IteratorResult) => void; - }> = []; - - get isClosed(): boolean { - return this.closed; - } - - get size(): number { - return this.buffer.length; - } - - /** Send value to channel. Returns false when channel is closed. */ - send(value: T): boolean { - if (this.closed) return false; - - const reader = this.readers.shift(); - if (reader) { - reader.resolve({ value, done: false }); - return true; - } - - this.buffer.push(value); - return true; - } - - /** Close channel, wake up all waiting readers. */ - close(): void { - if (this.closed) return; - this.closed = true; - - for (const reader of this.readers) { - reader.resolve({ value: undefined as T, done: true }); - } - this.readers = []; - } - - [Symbol.asyncIterator](): AsyncIterator { - return { - next: (): Promise> => { - if (this.buffer.length > 0) { - const value = this.buffer.shift()!; - return Promise.resolve({ value, done: false }); - } - - if (this.closed) { - return Promise.resolve({ value: undefined as T, done: true }); - } - - return new Promise>((resolve) => { - this.readers.push({ resolve }); - }); - }, - }; - } -} diff --git a/src/hub/hub.ts b/src/hub/hub.ts index ae6c5cfe..27195583 100644 --- a/src/hub/hub.ts +++ b/src/hub/hub.ts @@ -1,11 +1,11 @@ import type { HubOptions } from "./types.js"; import type { ConnectionState } from "../shared/gateway-sdk/types.js"; -import { Agent } from "./agent.js"; +import { AsyncAgent } from "../agent/async-agent.js"; import { getDeviceId } from "./device.js"; import { GatewayClient } from "../shared/gateway-sdk/client.js"; export class Hub { - private readonly agents = new Map(); + private readonly agents = new Map(); private readonly agentSenders = new Map(); private client: GatewayClient; url: string; @@ -82,7 +82,7 @@ export class Hub { } /** Create new Agent, or rebuild with existing ID */ - createAgent(id?: string): Agent { + createAgent(id?: string): AsyncAgent { if (id) { const existing = this.agents.get(id); if (existing && !existing.closed) { @@ -90,31 +90,31 @@ export class Hub { } } - const agent = new Agent(id); - this.agents.set(agent.id, agent); + const agent = new AsyncAgent({ sessionId: id }); + this.agents.set(agent.sessionId, agent); // Internally consume messages produced by agent void this.consumeAgent(agent); - console.log(`Agent created: ${agent.id}`); + console.log(`Agent created: ${agent.sessionId}`); return agent; } /** Internally read agent output and send via Gateway */ - private async consumeAgent(agent: Agent): Promise { + private async consumeAgent(agent: AsyncAgent): Promise { for await (const msg of agent.read()) { - console.log(`[${agent.id}] ${msg.content}`); - const targetDeviceId = this.agentSenders.get(agent.id); + console.log(`[${agent.sessionId}] ${msg.content}`); + const targetDeviceId = this.agentSenders.get(agent.sessionId); if (targetDeviceId) { this.client.send(targetDeviceId, "message", { - agentId: agent.id, + agentId: agent.sessionId, content: msg.content, }); } } } - getAgent(id: string): Agent | undefined { + getAgent(id: string): AsyncAgent | undefined { return this.agents.get(id); } diff --git a/src/hub/index.ts b/src/hub/index.ts index 54adb722..4ea50bc6 100644 --- a/src/hub/index.ts +++ b/src/hub/index.ts @@ -1,5 +1,3 @@ -export { Channel } from "./channel.js"; -export { Agent } from "./agent.js"; export { Hub } from "./hub.js"; export { getDeviceId } from "./device.js"; -export type { Message, HubOptions } from "./types.js"; +export type { HubOptions } from "./types.js"; From e0615dbe69d970adc979e7b64f305e39bd75973d Mon Sep 17 00:00:00 2001 From: yushen Date: Fri, 30 Jan 2026 12:48:22 +0800 Subject: [PATCH 4/4] chore: add .env to gitignore Co-Authored-By: Claude Opus 4.5 --- .gitignore | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.gitignore b/.gitignore index 75352116..33fbca5b 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,5 @@ node_modules dist *.log .DS_Store +.env +.env.*