diff --git a/src/agent/async-agent.ts b/src/agent/async-agent.ts index d6443b8b..97b47378 100644 --- a/src/agent/async-agent.ts +++ b/src/agent/async-agent.ts @@ -1,18 +1,19 @@ import { v7 as uuidv7 } from "uuid"; +import type { AgentEvent } from "@mariozechner/pi-agent-core"; import { Agent } from "./runner.js"; import { Channel } from "./channel.js"; -import { extractText } from "./extract-text.js"; import type { AgentOptions, Message } from "./types.js"; -import type { StreamPayload } from "@multica/sdk"; const devNull = { write: () => true } as NodeJS.WritableStream; +/** Discriminated union of legacy Message (error fallback) and raw AgentEvent */ +export type ChannelItem = Message | AgentEvent; + export class AsyncAgent { private readonly agent: Agent; - private readonly channel = new Channel(); + private readonly channel = new Channel(); private _closed = false; private queue: Promise = Promise.resolve(); - private streamCallback?: (payload: StreamPayload) => void; readonly sessionId: string; constructor(options?: AgentOptions) { @@ -21,18 +22,17 @@ export class AsyncAgent { logger: { stdout: devNull, stderr: devNull }, }); this.sessionId = this.agent.sessionId; - this.setupStreamEvents(); + + // Forward raw AgentEvent into the channel + this.agent.subscribe((event: AgentEvent) => { + this.channel.send(event); + }); } get closed(): boolean { return this._closed; } - /** Register callback for streaming events */ - onStream(cb: (payload: StreamPayload) => void): void { - this.streamCallback = cb; - } - /** Write message to agent (non-blocking, serialized queue) */ write(content: string): void { if (this._closed) throw new Error("Agent is closed"); @@ -41,15 +41,9 @@ export class AsyncAgent { .then(async () => { if (this._closed) return; const result = await this.agent.run(content); - // Only send final message via channel if no stream callback - // (stream callback already sent the final content) - if (!this.streamCallback) { - if (result.text) { - this.channel.send({ id: uuidv7(), content: result.text }); - } - if (result.error) { - this.channel.send({ id: uuidv7(), content: `[error] ${result.error}` }); - } + // Normal text is delivered via message_end event; only handle errors here + if (result.error) { + this.channel.send({ id: uuidv7(), content: `[error] ${result.error}` }); } }) .catch((err) => { @@ -58,8 +52,8 @@ export class AsyncAgent { }); } - /** Continuously read message stream */ - read(): AsyncIterable { + /** Continuously read channel stream (AgentEvent + error Messages) */ + read(): AsyncIterable { return this.channel; } @@ -69,50 +63,4 @@ export class AsyncAgent { this._closed = true; this.channel.close(); } - - private setupStreamEvents(): void { - let currentStreamId: string | null = null; - - this.agent.subscribe((event) => { - if (!this.streamCallback) return; - - switch (event.type) { - case "message_start": { - if (event.message.role === "assistant") { - currentStreamId = uuidv7(); - this.streamCallback({ - streamId: currentStreamId, - agentId: this.sessionId, - state: "delta", - content: extractText(event.message), - }); - } - break; - } - case "message_update": { - if (event.message.role === "assistant" && currentStreamId) { - this.streamCallback({ - streamId: currentStreamId, - agentId: this.sessionId, - state: "delta", - content: extractText(event.message), - }); - } - break; - } - case "message_end": { - if (event.message.role === "assistant" && currentStreamId) { - this.streamCallback({ - streamId: currentStreamId, - agentId: this.sessionId, - state: "final", - content: extractText(event.message), - }); - currentStreamId = null; - } - break; - } - } - }); - } } diff --git a/src/agent/providers/registry.ts b/src/agent/providers/registry.ts index 50809def..cd406482 100644 --- a/src/agent/providers/registry.ts +++ b/src/agent/providers/registry.ts @@ -134,6 +134,7 @@ const PROVIDER_REGISTRY: Record = { */ export const PROVIDER_ALIAS: Record = { "claude-code": "anthropic", // Claude Code OAuth uses anthropic API + "openai-codex": "openai", // Codex OAuth uses OpenAI API }; // ============================================================ diff --git a/src/agent/runner.ts b/src/agent/runner.ts index 7ac62bc6..90f5416a 100644 --- a/src/agent/runner.ts +++ b/src/agent/runner.ts @@ -3,17 +3,11 @@ import { v7 as uuidv7 } from "uuid"; import type { AgentOptions, AgentRunResult } from "./types.js"; import { createAgentOutput } from "./cli/output.js"; import { resolveModel, resolveTools } from "./tools.js"; +import { resolveApiKey, resolveBaseUrl, resolveModelId } from "./providers/index.js"; import { SessionManager } from "./session/session-manager.js"; import { ProfileManager } from "./profile/index.js"; import { SkillManager } from "./skills/index.js"; import { credentialManager, getCredentialsPath } from "./credentials.js"; -import { - resolveApiKey, - resolveBaseUrl, - resolveModelId, - isOAuthProvider, - getLoginInstructions, -} from "./providers/index.js"; import { checkContextWindow, DEFAULT_CONTEXT_TOKENS, @@ -44,37 +38,10 @@ export class Agent { const resolvedModel = resolveModelId(resolvedProvider, options.model); const apiKey = resolveApiKey(resolvedProvider, options.apiKey); - // Validate credentials before proceeding - if (!apiKey) { - if (isOAuthProvider(resolvedProvider)) { - // OAuth provider without valid credentials - show login instructions - const instructions = getLoginInstructions(resolvedProvider); - throw new Error( - `Provider "${resolvedProvider}" requires authentication.\n\n` + - `${instructions}\n\n` + - `After logging in, run: multica --provider ${resolvedProvider}`, - ); - } - // API Key provider without key - show configuration instructions - throw new Error( - `Provider "${resolvedProvider}" requires an API key.\n\n` + - `Add your API key to: ${getCredentialsPath()}\n\n` + - `Example:\n` + - `{\n` + - ` "llm": {\n` + - ` "provider": "${resolvedProvider}",\n` + - ` "providers": {\n` + - ` "${resolvedProvider}": {\n` + - ` "apiKey": "your-api-key-here"\n` + - ` }\n` + - ` }\n` + - ` }\n` + - `}`, - ); - } - this.agent = new PiAgentCore( - { getApiKey: (_provider: string) => apiKey }, + apiKey + ? { getApiKey: (_provider: string) => apiKey } + : {}, ); // Load Agent Profile (if profileId is specified) @@ -162,7 +129,9 @@ export class Agent { const compactionMode = options.compactionMode ?? "tokens"; // 默认使用 token 模式 // 获取 API Key(用于 summary 模式) - const summaryApiKey = compactionMode === "summary" ? resolveApiKey(model.provider, options.apiKey) : undefined; + const summaryApiKey = compactionMode === "summary" + ? resolveApiKey(resolvedProvider, options.apiKey) + : undefined; // 创建 SessionManager(带 context window 配置) this.session = new SessionManager({ @@ -241,7 +210,7 @@ export class Agent { }); } - /** Subscribe to agent events (returns unsubscribe function) */ + /** Subscribe to raw AgentEvent from the underlying engine */ subscribe(fn: (event: AgentEvent) => void): () => void { return this.agent.subscribe(fn); } diff --git a/src/hub/hub.ts b/src/hub/hub.ts index 3d1eae3b..10236ff3 100644 --- a/src/hub/hub.ts +++ b/src/hub/hub.ts @@ -22,6 +22,8 @@ import { createUpdateGatewayHandler } from "./rpc/handlers/update-gateway.js"; export class Hub { private readonly agents = new Map(); private readonly agentSenders = new Map(); + private readonly agentStreamIds = new Map(); + private readonly agentStreamCounters = new Map(); private readonly rpc: RpcDispatcher; private client: GatewayClient; url: string; @@ -144,31 +146,77 @@ export class Hub { addAgentRecord({ id: agent.sessionId, createdAt: Date.now() }); } - // Forward streaming events to the requesting client - agent.onStream((payload) => { - const targetDeviceId = this.agentSenders.get(agent.sessionId); - if (targetDeviceId) { - this.client.send(targetDeviceId, StreamAction, payload); - } - }); - - // Internally consume messages produced by agent (fallback for non-stream scenarios) + // Internally consume agent output (AgentEvent stream + error Messages) void this.consumeAgent(agent); console.log(`Agent created: ${agent.sessionId}`); return agent; } + private getMessageIdFromEvent(event: unknown): string | undefined { + if (!event || typeof event !== "object") return undefined; + const maybeMsg = (event as { message?: unknown }).message; + if (!maybeMsg || typeof maybeMsg !== "object") return undefined; + const id = (maybeMsg as { id?: unknown }).id; + return typeof id === "string" && id.length > 0 ? id : undefined; + } + + private beginStream(agentId: string, event: unknown): string { + const explicitId = this.getMessageIdFromEvent(event); + if (explicitId) { + this.agentStreamIds.set(agentId, explicitId); + return explicitId; + } + const next = (this.agentStreamCounters.get(agentId) ?? 0) + 1; + this.agentStreamCounters.set(agentId, next); + const fallback = `${agentId}:${next}`; + this.agentStreamIds.set(agentId, fallback); + return fallback; + } + + private getActiveStreamId(agentId: string, event: unknown): string { + return this.agentStreamIds.get(agentId) ?? this.getMessageIdFromEvent(event) ?? agentId; + } + + private endStream(agentId: string): void { + this.agentStreamIds.delete(agentId); + } + /** Internally read agent output and send via Gateway */ private async consumeAgent(agent: AsyncAgent): Promise { - for await (const msg of agent.read()) { - console.log(`[${agent.sessionId}] ${msg.content}`); + for await (const item of agent.read()) { const targetDeviceId = this.agentSenders.get(agent.sessionId); - if (targetDeviceId) { + if (!targetDeviceId) continue; + + if ("content" in item) { + // Legacy Message (error fallback) + console.log(`[${agent.sessionId}] ${item.content}`); this.client.send(targetDeviceId, "message", { agentId: agent.sessionId, - content: msg.content, + content: item.content, }); + } else { + // Filter: only forward events useful for frontend rendering + const maybeMessage = (item as { message?: { role?: string } }).message; + const isAssistantMessage = maybeMessage?.role === "assistant"; + const shouldForward = + ((item.type === "message_start" || item.type === "message_update" || item.type === "message_end") && isAssistantMessage) + || item.type === "tool_execution_start" + || item.type === "tool_execution_end"; + if (!shouldForward) continue; + + if (item.type === "message_start") { + this.beginStream(agent.sessionId, item); + } + const streamId = this.getActiveStreamId(agent.sessionId, item); + this.client.send(targetDeviceId, StreamAction, { + streamId, + agentId: agent.sessionId, + event: item, + }); + if (item.type === "message_end") { + this.endStream(agent.sessionId); + } } } } @@ -211,6 +259,8 @@ export class Hub { agent.close(); this.agents.delete(id); this.agentSenders.delete(id); + this.agentStreamIds.delete(id); + this.agentStreamCounters.delete(id); removeAgentRecord(id); return true; } @@ -219,6 +269,9 @@ export class Hub { for (const [id, agent] of this.agents) { agent.close(); this.agents.delete(id); + this.agentSenders.delete(id); + this.agentStreamIds.delete(id); + this.agentStreamCounters.delete(id); } this.client.disconnect(); console.log("Hub shut down");