Merge pull request #26 from multica-ai/view-recent-commits

feat(agent): add SyncAgent/AsyncAgent wrappers and integrate with Hub
This commit is contained in:
LinYushen 2026-01-30 12:51:08 +08:00 committed by GitHub
commit 6f8c9ef383
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 95 additions and 101 deletions

59
src/agent/async-agent.ts Normal file
View file

@ -0,0 +1,59 @@
import { v7 as uuidv7 } from "uuid";
import { Agent } from "./runner.js";
import { Channel } from "./channel.js";
import type { AgentOptions, Message } from "./types.js";
const devNull = { write: () => true } as NodeJS.WritableStream;
export class AsyncAgent {
private readonly agent: Agent;
private readonly channel = new Channel<Message>();
private _closed = false;
private queue: Promise<void> = Promise.resolve();
readonly sessionId: string;
constructor(options?: AgentOptions) {
this.agent = new Agent({
...options,
logger: { stdout: devNull, stderr: devNull },
});
this.sessionId = this.agent.sessionId;
}
get closed(): boolean {
return this._closed;
}
/** Write message to agent (non-blocking, serialized queue) */
write(content: string): void {
if (this._closed) throw new Error("Agent is closed");
this.queue = this.queue
.then(async () => {
if (this._closed) return;
const result = await this.agent.run(content);
if (result.text) {
this.channel.send({ id: uuidv7(), content: result.text });
}
if (result.error) {
this.channel.send({ id: uuidv7(), content: `[error] ${result.error}` });
}
})
.catch((err) => {
const message = err instanceof Error ? err.message : String(err);
this.channel.send({ id: uuidv7(), content: `[error] ${message}` });
});
}
/** Continuously read message stream */
read(): AsyncIterable<Message> {
return this.channel;
}
/** Close agent, stop all reads */
close(): void {
if (this._closed) return;
this._closed = true;
this.channel.close();
}
}

View file

@ -3,3 +3,6 @@ export * from "./types.js";
export * from "./profile/index.js";
export * from "./context-window/index.js";
export * from "./skills/index.js";
export * from "./channel.js";
export * from "./sync-agent.js";
export * from "./async-agent.js";

16
src/agent/sync-agent.ts Normal file
View file

@ -0,0 +1,16 @@
import { Agent } from "./runner.js";
import type { AgentOptions, AgentRunResult } from "./types.js";
export class SyncAgent {
private readonly agent: Agent;
readonly sessionId: string;
constructor(options?: AgentOptions) {
this.agent = new Agent(options);
this.sessionId = this.agent.sessionId;
}
async run(prompt: string): Promise<AgentRunResult> {
return this.agent.run(prompt);
}
}

View file

@ -55,3 +55,8 @@ export type AgentOptions = {
/** Additional directories to search for skills */
extraSkillDirs?: string[] | undefined;
};
export interface Message {
readonly id: string;
readonly content: string;
}

View file

@ -1,87 +0,0 @@
import { v7 as uuidv7 } from "uuid";
import { Agent as CoreAgent } from "../agent/runner.js";
import { Channel } from "./channel.js";
import type { Message } from "./types.js";
/**
* Agent uses pi-agent-core for real inference.
* write() triggers a model run, read() outputs streaming results.
*/
export class Agent {
readonly id: string;
private readonly channel = new Channel<Message>();
private _closed = false;
private readonly agent: CoreAgent;
private queue: Promise<void> = Promise.resolve();
constructor(id?: string) {
this.id = id ?? uuidv7();
this.agent = new CoreAgent({
logger: {
stdout: this.createChannelStream("[assistant] "),
stderr: this.createChannelStream("[tool] "),
},
sessionId: this.id,
});
}
get closed(): boolean {
return this._closed;
}
/** Write message to agent (non-blocking, serialized queue) */
write(content: string): void {
if (this._closed) {
throw new Error("Agent is closed");
}
this.queue = this.queue
.then(async () => {
const result = await this.agent.run(content);
if (result.error) {
this.channel.send({
id: uuidv7(),
content: `[error] ${result.error}`,
});
}
})
.catch((err) => {
const message = err instanceof Error ? err.message : String(err);
this.channel.send({ id: uuidv7(), content: `[error] ${message}` });
});
}
/** Continuously read message stream */
read(): AsyncIterable<Message> {
return this.channel;
}
/** Close agent, stop all reads */
close(): void {
if (this._closed) return;
this._closed = true;
this.channel.close();
}
private createChannelStream(prefix: string): NodeJS.WritableStream {
let buffer = "";
return {
write: (chunk: any) => {
if (this._closed) return false;
const text =
typeof chunk === "string"
? chunk
: chunk?.toString?.() ?? String(chunk);
if (!text) return true;
buffer += text;
const parts = buffer.split("\n");
buffer = parts.pop() ?? "";
for (const part of parts) {
if (part.length === 0) continue;
this.channel.send({ id: uuidv7(), content: `${prefix}${part}` });
}
return true;
},
} as NodeJS.WritableStream;
}
}

View file

@ -1,11 +1,11 @@
import type { HubOptions } from "./types.js";
import type { ConnectionState } from "../shared/gateway-sdk/types.js";
import { Agent } from "./agent.js";
import { AsyncAgent } from "../agent/async-agent.js";
import { getDeviceId } from "./device.js";
import { GatewayClient } from "../shared/gateway-sdk/client.js";
export class Hub {
private readonly agents = new Map<string, Agent>();
private readonly agents = new Map<string, AsyncAgent>();
private readonly agentSenders = new Map<string, string>();
private client: GatewayClient;
url: string;
@ -82,7 +82,7 @@ export class Hub {
}
/** Create new Agent, or rebuild with existing ID */
createAgent(id?: string): Agent {
createAgent(id?: string): AsyncAgent {
if (id) {
const existing = this.agents.get(id);
if (existing && !existing.closed) {
@ -90,31 +90,31 @@ export class Hub {
}
}
const agent = new Agent(id);
this.agents.set(agent.id, agent);
const agent = new AsyncAgent({ sessionId: id });
this.agents.set(agent.sessionId, agent);
// Internally consume messages produced by agent
void this.consumeAgent(agent);
console.log(`Agent created: ${agent.id}`);
console.log(`Agent created: ${agent.sessionId}`);
return agent;
}
/** Internally read agent output and send via Gateway */
private async consumeAgent(agent: Agent): Promise<void> {
private async consumeAgent(agent: AsyncAgent): Promise<void> {
for await (const msg of agent.read()) {
console.log(`[${agent.id}] ${msg.content}`);
const targetDeviceId = this.agentSenders.get(agent.id);
console.log(`[${agent.sessionId}] ${msg.content}`);
const targetDeviceId = this.agentSenders.get(agent.sessionId);
if (targetDeviceId) {
this.client.send(targetDeviceId, "message", {
agentId: agent.id,
agentId: agent.sessionId,
content: msg.content,
});
}
}
}
getAgent(id: string): Agent | undefined {
getAgent(id: string): AsyncAgent | undefined {
return this.agents.get(id);
}

View file

@ -1,5 +1,3 @@
export { Channel } from "./channel.js";
export { Agent } from "./agent.js";
export { Hub } from "./hub.js";
export { getDeviceId } from "./device.js";
export type { Message, HubOptions } from "./types.js";
export type { HubOptions } from "./types.js";