diff --git a/src/agent/async-agent.ts b/src/agent/async-agent.ts new file mode 100644 index 00000000..06f753bb --- /dev/null +++ b/src/agent/async-agent.ts @@ -0,0 +1,59 @@ +import { v7 as uuidv7 } from "uuid"; +import { Agent } from "./runner.js"; +import { Channel } from "./channel.js"; +import type { AgentOptions, Message } from "./types.js"; + +const devNull = { write: () => true } as NodeJS.WritableStream; + +export class AsyncAgent { + private readonly agent: Agent; + private readonly channel = new Channel(); + private _closed = false; + private queue: Promise = Promise.resolve(); + readonly sessionId: string; + + constructor(options?: AgentOptions) { + this.agent = new Agent({ + ...options, + logger: { stdout: devNull, stderr: devNull }, + }); + this.sessionId = this.agent.sessionId; + } + + get closed(): boolean { + return this._closed; + } + + /** Write message to agent (non-blocking, serialized queue) */ + write(content: string): void { + if (this._closed) throw new Error("Agent is closed"); + + this.queue = this.queue + .then(async () => { + if (this._closed) return; + const result = await this.agent.run(content); + if (result.text) { + this.channel.send({ id: uuidv7(), content: result.text }); + } + if (result.error) { + this.channel.send({ id: uuidv7(), content: `[error] ${result.error}` }); + } + }) + .catch((err) => { + const message = err instanceof Error ? err.message : String(err); + this.channel.send({ id: uuidv7(), content: `[error] ${message}` }); + }); + } + + /** Continuously read message stream */ + read(): AsyncIterable { + return this.channel; + } + + /** Close agent, stop all reads */ + close(): void { + if (this._closed) return; + this._closed = true; + this.channel.close(); + } +} diff --git a/src/hub/channel.ts b/src/agent/channel.ts similarity index 100% rename from src/hub/channel.ts rename to src/agent/channel.ts diff --git a/src/agent/index.ts b/src/agent/index.ts index c3384160..16dac2f9 100644 --- a/src/agent/index.ts +++ b/src/agent/index.ts @@ -3,3 +3,6 @@ export * from "./types.js"; export * from "./profile/index.js"; export * from "./context-window/index.js"; export * from "./skills/index.js"; +export * from "./channel.js"; +export * from "./sync-agent.js"; +export * from "./async-agent.js"; diff --git a/src/agent/sync-agent.ts b/src/agent/sync-agent.ts new file mode 100644 index 00000000..d2f77ae2 --- /dev/null +++ b/src/agent/sync-agent.ts @@ -0,0 +1,16 @@ +import { Agent } from "./runner.js"; +import type { AgentOptions, AgentRunResult } from "./types.js"; + +export class SyncAgent { + private readonly agent: Agent; + readonly sessionId: string; + + constructor(options?: AgentOptions) { + this.agent = new Agent(options); + this.sessionId = this.agent.sessionId; + } + + async run(prompt: string): Promise { + return this.agent.run(prompt); + } +} diff --git a/src/agent/types.ts b/src/agent/types.ts index 60494a07..94ab9c45 100644 --- a/src/agent/types.ts +++ b/src/agent/types.ts @@ -55,3 +55,8 @@ export type AgentOptions = { /** Additional directories to search for skills */ extraSkillDirs?: string[] | undefined; }; + +export interface Message { + readonly id: string; + readonly content: string; +} diff --git a/src/hub/agent.ts b/src/hub/agent.ts deleted file mode 100644 index d25b83c5..00000000 --- a/src/hub/agent.ts +++ /dev/null @@ -1,87 +0,0 @@ -import { v7 as uuidv7 } from "uuid"; -import { Agent as CoreAgent } from "../agent/runner.js"; -import { Channel } from "./channel.js"; -import type { Message } from "./types.js"; - -/** - * Agent — uses pi-agent-core for real inference. - * write() triggers a model run, read() outputs streaming results. - */ -export class Agent { - readonly id: string; - private readonly channel = new Channel(); - private _closed = false; - private readonly agent: CoreAgent; - private queue: Promise = Promise.resolve(); - - constructor(id?: string) { - this.id = id ?? uuidv7(); - this.agent = new CoreAgent({ - logger: { - stdout: this.createChannelStream("[assistant] "), - stderr: this.createChannelStream("[tool] "), - }, - sessionId: this.id, - }); - } - - get closed(): boolean { - return this._closed; - } - - /** Write message to agent (non-blocking, serialized queue) */ - write(content: string): void { - if (this._closed) { - throw new Error("Agent is closed"); - } - - this.queue = this.queue - .then(async () => { - const result = await this.agent.run(content); - if (result.error) { - this.channel.send({ - id: uuidv7(), - content: `[error] ${result.error}`, - }); - } - }) - .catch((err) => { - const message = err instanceof Error ? err.message : String(err); - this.channel.send({ id: uuidv7(), content: `[error] ${message}` }); - }); - } - - /** Continuously read message stream */ - read(): AsyncIterable { - return this.channel; - } - - /** Close agent, stop all reads */ - close(): void { - if (this._closed) return; - this._closed = true; - this.channel.close(); - } - - private createChannelStream(prefix: string): NodeJS.WritableStream { - let buffer = ""; - return { - write: (chunk: any) => { - if (this._closed) return false; - const text = - typeof chunk === "string" - ? chunk - : chunk?.toString?.() ?? String(chunk); - if (!text) return true; - buffer += text; - const parts = buffer.split("\n"); - buffer = parts.pop() ?? ""; - for (const part of parts) { - if (part.length === 0) continue; - this.channel.send({ id: uuidv7(), content: `${prefix}${part}` }); - } - return true; - }, - } as NodeJS.WritableStream; - } -} diff --git a/src/hub/hub.ts b/src/hub/hub.ts index ae6c5cfe..27195583 100644 --- a/src/hub/hub.ts +++ b/src/hub/hub.ts @@ -1,11 +1,11 @@ import type { HubOptions } from "./types.js"; import type { ConnectionState } from "../shared/gateway-sdk/types.js"; -import { Agent } from "./agent.js"; +import { AsyncAgent } from "../agent/async-agent.js"; import { getDeviceId } from "./device.js"; import { GatewayClient } from "../shared/gateway-sdk/client.js"; export class Hub { - private readonly agents = new Map(); + private readonly agents = new Map(); private readonly agentSenders = new Map(); private client: GatewayClient; url: string; @@ -82,7 +82,7 @@ export class Hub { } /** Create new Agent, or rebuild with existing ID */ - createAgent(id?: string): Agent { + createAgent(id?: string): AsyncAgent { if (id) { const existing = this.agents.get(id); if (existing && !existing.closed) { @@ -90,31 +90,31 @@ export class Hub { } } - const agent = new Agent(id); - this.agents.set(agent.id, agent); + const agent = new AsyncAgent({ sessionId: id }); + this.agents.set(agent.sessionId, agent); // Internally consume messages produced by agent void this.consumeAgent(agent); - console.log(`Agent created: ${agent.id}`); + console.log(`Agent created: ${agent.sessionId}`); return agent; } /** Internally read agent output and send via Gateway */ - private async consumeAgent(agent: Agent): Promise { + private async consumeAgent(agent: AsyncAgent): Promise { for await (const msg of agent.read()) { - console.log(`[${agent.id}] ${msg.content}`); - const targetDeviceId = this.agentSenders.get(agent.id); + console.log(`[${agent.sessionId}] ${msg.content}`); + const targetDeviceId = this.agentSenders.get(agent.sessionId); if (targetDeviceId) { this.client.send(targetDeviceId, "message", { - agentId: agent.id, + agentId: agent.sessionId, content: msg.content, }); } } } - getAgent(id: string): Agent | undefined { + getAgent(id: string): AsyncAgent | undefined { return this.agents.get(id); } diff --git a/src/hub/index.ts b/src/hub/index.ts index 54adb722..4ea50bc6 100644 --- a/src/hub/index.ts +++ b/src/hub/index.ts @@ -1,5 +1,3 @@ -export { Channel } from "./channel.js"; -export { Agent } from "./agent.js"; export { Hub } from "./hub.js"; export { getDeviceId } from "./device.js"; -export type { Message, HubOptions } from "./types.js"; +export type { HubOptions } from "./types.js";