diff --git a/packages/core/src/client/actions/rpc.ts b/packages/core/src/client/actions/rpc.ts index 28d31474..761839fb 100644 --- a/packages/core/src/client/actions/rpc.ts +++ b/packages/core/src/client/actions/rpc.ts @@ -118,9 +118,10 @@ export interface CreateAgentResult { id: string; } -/** createConversation - request params (conversation-first alias of createAgent) */ +/** createConversation - request params (create a conversation, optionally under a specific agent) */ export interface CreateConversationParams { id?: string; + agentId?: string; } /** createConversation - response payload */ diff --git a/packages/core/src/hub/agent-store.test.ts b/packages/core/src/hub/agent-store.test.ts new file mode 100644 index 00000000..e6d8bc9f --- /dev/null +++ b/packages/core/src/hub/agent-store.test.ts @@ -0,0 +1,80 @@ +import { mkdtempSync, mkdirSync, readFileSync, rmSync, writeFileSync } from "node:fs"; +import { join } from "node:path"; +import { tmpdir } from "node:os"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; + +describe("hub agent store", () => { + let testDir: string; + let previousDataDir: string | undefined; + + beforeEach(() => { + testDir = mkdtempSync(join(tmpdir(), "multica-agent-store-")); + previousDataDir = process.env.SMC_DATA_DIR; + process.env.SMC_DATA_DIR = testDir; + vi.resetModules(); + }); + + afterEach(() => { + if (previousDataDir === undefined) { + delete process.env.SMC_DATA_DIR; + } else { + process.env.SMC_DATA_DIR = previousDataDir; + } + vi.resetModules(); + rmSync(testDir, { recursive: true, force: true }); + }); + + it("migrates legacy single-layer records into agent+conversation snapshot", async () => { + const agentsDir = join(testDir, "agents"); + mkdirSync(agentsDir, { recursive: true }); + writeFileSync( + join(agentsDir, "agents.json"), + JSON.stringify([ + { id: "legacy-a", createdAt: 123 }, + ], null, 2), + "utf-8", + ); + + const store = await import("./agent-store.js"); + const snapshot = store.loadHubStoreSnapshot(); + + expect(snapshot.version).toBe(2); + expect(snapshot.agents).toEqual([{ id: "legacy-a", createdAt: 123 }]); + expect(snapshot.conversations).toEqual([{ id: "legacy-a", agentId: "legacy-a", createdAt: 123 }]); + + const persisted = JSON.parse(readFileSync(join(agentsDir, "agents.json"), "utf-8")) as { + version: number; + }; + expect(persisted.version).toBe(2); + }); + + it("upserts conversations and auto-creates missing agents", async () => { + const store = await import("./agent-store.js"); + + store.upsertConversationRecord({ + id: "conv-1", + agentId: "agent-1", + createdAt: 100, + }); + + const snapshot = store.loadHubStoreSnapshot(); + expect(snapshot.agents).toEqual([{ id: "agent-1", createdAt: 100 }]); + expect(snapshot.conversations).toEqual([ + { id: "conv-1", agentId: "agent-1", createdAt: 100 }, + ]); + }); + + it("removes empty agent after last conversation is deleted", async () => { + const store = await import("./agent-store.js"); + + store.upsertConversationRecord({ id: "conv-1", agentId: "agent-1", createdAt: 100 }); + store.upsertConversationRecord({ id: "conv-2", agentId: "agent-1", createdAt: 101 }); + store.removeConversationRecordById("conv-1"); + expect(store.loadHubStoreSnapshot().agents).toEqual([{ id: "agent-1", createdAt: 100 }]); + + store.removeConversationRecordById("conv-2"); + const snapshot = store.loadHubStoreSnapshot(); + expect(snapshot.agents).toEqual([]); + expect(snapshot.conversations).toEqual([]); + }); +}); diff --git a/packages/core/src/hub/agent-store.ts b/packages/core/src/hub/agent-store.ts index 832879ae..2068eeea 100644 --- a/packages/core/src/hub/agent-store.ts +++ b/packages/core/src/hub/agent-store.ts @@ -5,6 +5,20 @@ import { DATA_DIR } from "@multica/utils"; export interface AgentRecord { id: string; createdAt: number; + profileId?: string; +} + +export interface ConversationRecord { + id: string; + agentId: string; + createdAt: number; + profileId?: string; +} + +export interface HubStoreSnapshot { + version: 2; + agents: AgentRecord[]; + conversations: ConversationRecord[]; } const AGENTS_DIR = join(DATA_DIR, "agents"); @@ -16,32 +30,240 @@ function ensureDir(): void { } } -export function loadAgentRecords(): AgentRecord[] { - if (!existsSync(AGENTS_FILE)) return []; +function defaultSnapshot(): HubStoreSnapshot { + return { + version: 2, + agents: [], + conversations: [], + }; +} + +function isRecordLike(value: unknown): value is Record { + return !!value && typeof value === "object"; +} + +function normalizeCreatedAt(raw: unknown): number { + if (typeof raw === "number" && Number.isFinite(raw)) { + return raw; + } + return Date.now(); +} + +function normalizeAgentRecords(input: unknown): AgentRecord[] { + if (!Array.isArray(input)) return []; + const dedup = new Map(); + for (const item of input) { + if (!isRecordLike(item) || typeof item.id !== "string" || !item.id.trim()) continue; + const id = item.id.trim(); + if (dedup.has(id)) continue; + dedup.set(id, { + id, + createdAt: normalizeCreatedAt(item.createdAt), + ...(typeof item.profileId === "string" && item.profileId.trim() ? { profileId: item.profileId.trim() } : {}), + }); + } + return Array.from(dedup.values()).sort((a, b) => a.createdAt - b.createdAt); +} + +function normalizeConversationRecords(input: unknown): ConversationRecord[] { + if (!Array.isArray(input)) return []; + const dedup = new Map(); + for (const item of input) { + if (!isRecordLike(item) || typeof item.id !== "string" || !item.id.trim()) continue; + if (typeof item.agentId !== "string" || !item.agentId.trim()) continue; + const id = item.id.trim(); + if (dedup.has(id)) continue; + dedup.set(id, { + id, + agentId: item.agentId.trim(), + createdAt: normalizeCreatedAt(item.createdAt), + ...(typeof item.profileId === "string" && item.profileId.trim() ? { profileId: item.profileId.trim() } : {}), + }); + } + return Array.from(dedup.values()).sort((a, b) => a.createdAt - b.createdAt); +} + +function normalizeSnapshot(raw: unknown): { snapshot: HubStoreSnapshot; migrated: boolean } { + // Legacy format: AgentRecord[] + if (Array.isArray(raw)) { + const legacyAgents = normalizeAgentRecords(raw); + const conversations: ConversationRecord[] = legacyAgents.map((record) => ({ + id: record.id, + agentId: record.id, + createdAt: record.createdAt, + ...(record.profileId ? { profileId: record.profileId } : {}), + })); + return { + snapshot: { + version: 2, + agents: legacyAgents, + conversations, + }, + migrated: true, + }; + } + + if (!isRecordLike(raw)) { + return { snapshot: defaultSnapshot(), migrated: false }; + } + + const agents = normalizeAgentRecords(raw.agents); + const conversations = normalizeConversationRecords(raw.conversations); + const agentMap = new Map(agents.map((agent) => [agent.id, agent])); + const normalizedConversations: ConversationRecord[] = []; + + for (const conversation of conversations) { + if (!agentMap.has(conversation.agentId)) { + agentMap.set(conversation.agentId, { + id: conversation.agentId, + createdAt: conversation.createdAt, + ...(conversation.profileId ? { profileId: conversation.profileId } : {}), + }); + } + normalizedConversations.push(conversation); + } + + // Ensure each agent has a main conversation for compatibility fallback. + for (const agent of agentMap.values()) { + const hasConversation = normalizedConversations.some((conversation) => conversation.agentId === agent.id); + if (hasConversation) continue; + normalizedConversations.push({ + id: agent.id, + agentId: agent.id, + createdAt: agent.createdAt, + ...(agent.profileId ? { profileId: agent.profileId } : {}), + }); + } + + return { + snapshot: { + version: 2, + agents: Array.from(agentMap.values()).sort((a, b) => a.createdAt - b.createdAt), + conversations: normalizedConversations.sort((a, b) => a.createdAt - b.createdAt), + }, + migrated: (raw.version as unknown) !== 2, + }; +} + +export function saveHubStoreSnapshot(snapshot: HubStoreSnapshot): void { + ensureDir(); + writeFileSync(AGENTS_FILE, JSON.stringify(snapshot, null, 2), "utf-8"); +} + +export function loadHubStoreSnapshot(): HubStoreSnapshot { + if (!existsSync(AGENTS_FILE)) return defaultSnapshot(); try { const content = readFileSync(AGENTS_FILE, "utf-8"); - return JSON.parse(content) as AgentRecord[]; + const parsed = JSON.parse(content) as unknown; + const normalized = normalizeSnapshot(parsed); + if (normalized.migrated) { + saveHubStoreSnapshot(normalized.snapshot); + } + return normalized.snapshot; } catch { - return []; + return defaultSnapshot(); } } +export function upsertAgentRecord(record: AgentRecord): void { + const snapshot = loadHubStoreSnapshot(); + const existing = snapshot.agents.filter((item) => item.id !== record.id); + existing.push(record); + snapshot.agents = existing.sort((a, b) => a.createdAt - b.createdAt); + saveHubStoreSnapshot(snapshot); +} + +export function removeAgentRecordById(agentId: string): void { + const snapshot = loadHubStoreSnapshot(); + const agents = snapshot.agents.filter((agent) => agent.id !== agentId); + const conversations = snapshot.conversations.filter((conversation) => conversation.agentId !== agentId); + if (agents.length === snapshot.agents.length && conversations.length === snapshot.conversations.length) { + return; + } + saveHubStoreSnapshot({ + ...snapshot, + agents, + conversations, + }); +} + +export function upsertConversationRecord(record: ConversationRecord): void { + const snapshot = loadHubStoreSnapshot(); + const conversations = snapshot.conversations.filter((item) => item.id !== record.id); + conversations.push(record); + + const hasAgent = snapshot.agents.some((agent) => agent.id === record.agentId); + const agents = hasAgent + ? snapshot.agents + : [ + ...snapshot.agents, + { + id: record.agentId, + createdAt: record.createdAt, + ...(record.profileId ? { profileId: record.profileId } : {}), + }, + ]; + + saveHubStoreSnapshot({ + version: 2, + agents: agents.sort((a, b) => a.createdAt - b.createdAt), + conversations: conversations.sort((a, b) => a.createdAt - b.createdAt), + }); +} + +export function removeConversationRecordById(conversationId: string): void { + const snapshot = loadHubStoreSnapshot(); + const conversations = snapshot.conversations.filter((conversation) => conversation.id !== conversationId); + if (conversations.length === snapshot.conversations.length) { + return; + } + + const activeAgentIds = new Set(conversations.map((conversation) => conversation.agentId)); + const agents = snapshot.agents.filter((agent) => activeAgentIds.has(agent.id)); + saveHubStoreSnapshot({ + ...snapshot, + agents, + conversations, + }); +} + +// Legacy compatibility wrappers +// NOTE: In legacy mode, each agent record is treated as both agent and main conversation. +export function loadAgentRecords(): AgentRecord[] { + return loadHubStoreSnapshot().agents; +} + export function saveAgentRecords(records: AgentRecord[]): void { - ensureDir(); - writeFileSync(AGENTS_FILE, JSON.stringify(records, null, 2), "utf-8"); + const agents = normalizeAgentRecords(records); + const conversations = agents.map((record) => ({ + id: record.id, + agentId: record.id, + createdAt: record.createdAt, + ...(record.profileId ? { profileId: record.profileId } : {}), + })); + saveHubStoreSnapshot({ + version: 2, + agents, + conversations, + }); } export function addAgentRecord(record: AgentRecord): void { - const records = loadAgentRecords(); - if (records.some((r) => r.id === record.id)) return; - records.push(record); - saveAgentRecords(records); + upsertAgentRecord(record); + upsertConversationRecord({ + id: record.id, + agentId: record.id, + createdAt: record.createdAt, + ...(record.profileId ? { profileId: record.profileId } : {}), + }); } export function removeAgentRecord(id: string): void { - const records = loadAgentRecords(); - const filtered = records.filter((r) => r.id !== id); - if (filtered.length !== records.length) { - saveAgentRecords(filtered); + // Legacy API accepts either agent id or conversation id. + const snapshot = loadHubStoreSnapshot(); + const conversation = snapshot.conversations.find((item) => item.id === id); + if (conversation) { + removeConversationRecordById(conversation.id); } + removeAgentRecordById(id); } diff --git a/packages/core/src/hub/hub.ts b/packages/core/src/hub/hub.ts index e5312626..3d0968f4 100644 --- a/packages/core/src/hub/hub.ts +++ b/packages/core/src/hub/hub.ts @@ -17,7 +17,13 @@ import { AsyncAgent } from "../agent/async-agent.js"; import type { AgentOptions } from "../agent/types.js"; import { getHubId } from "./hub-identity.js"; import { setHub } from "./hub-singleton.js"; -import { loadAgentRecords, addAgentRecord, removeAgentRecord } from "./agent-store.js"; +import { + loadHubStoreSnapshot, + upsertAgentRecord, + upsertConversationRecord, + removeConversationRecordById, + removeAgentRecordById, +} from "./agent-store.js"; import { RpcDispatcher, RpcError } from "./rpc/dispatcher.js"; import { createGetAgentMessagesHandler } from "./rpc/handlers/get-agent-messages.js"; import { createGetHubInfoHandler } from "./rpc/handlers/get-hub-info.js"; @@ -75,7 +81,14 @@ export interface InboundMessageEvent { } export class Hub { + // Runtime conversation map (conversationId -> AsyncAgent). private readonly agents = new Map(); + // Conversation ownership map (conversationId -> logical agentId). + private readonly conversationAgents = new Map(); + // Main conversation pointer for each agent (agentId -> mainConversationId). + private readonly agentMainConversations = new Map(); + // Runtime profile for each logical agent. + private readonly agentProfiles = new Map(); private readonly agentSenders = new Map(); private readonly agentStreamIds = new Map(); private readonly agentStreamCounters = new Map(); @@ -112,6 +125,7 @@ export class Hub { this.rpc.register("verify", createVerifyHandler({ hubId: this.hubId, deviceStore: this.deviceStore, + resolveMainConversationId: (agentId) => this.getAgentMainConversationId(agentId), onConfirmDevice: (deviceId, agentId, meta) => { if (!this._onConfirmDevice) { // No UI confirm handler registered (CLI mode etc.) — auto-approve @@ -121,7 +135,9 @@ export class Hub { }, })); this.rpc.register("generateChannelWelcome", createGenerateChannelWelcomeHandler(this)); - this.rpc.register("getAgentMessages", createGetAgentMessagesHandler()); + this.rpc.register("getAgentMessages", createGetAgentMessagesHandler((agentId, conversationId) => { + return this.resolveConversationId(agentId, conversationId); + })); this.rpc.register("getHubInfo", createGetHubInfoHandler(this)); this.rpc.register("listAgents", createListAgentsHandler(this)); this.rpc.register("createAgent", createCreateAgentHandler(this)); @@ -207,20 +223,144 @@ export class Hub { } private getDefaultAgent(): AsyncAgent | null { - const first = this.listAgents()[0]; - if (!first) return null; - return this.getAgent(first) ?? null; + const firstConversationId = this.listConversations()[0]; + if (!firstConversationId) return null; + return this.getConversation(firstConversationId) ?? null; } /** Restore agents from persistent storage */ private restoreAgents(): void { - const records = loadAgentRecords(); - for (const record of records) { - this.createAgent(record.id, { persist: false }); + const snapshot = loadHubStoreSnapshot(); + + for (const agent of snapshot.agents) { + this.agentProfiles.set(agent.id, agent.profileId ?? "default"); } - if (records.length > 0) { - console.log(`[Hub] Restored ${records.length} agent(s)`); + + for (const conversation of snapshot.conversations) { + this.createConversation(conversation.id, { + agentId: conversation.agentId, + profileId: conversation.profileId ?? this.agentProfiles.get(conversation.agentId) ?? "default", + persist: false, + createdAt: conversation.createdAt, + isMainConversation: !this.agentMainConversations.has(conversation.agentId), + }); } + + if (snapshot.conversations.length > 0) { + console.log( + `[Hub] Restored ${snapshot.agents.length} agent(s), ${snapshot.conversations.length} conversation(s)`, + ); + } + } + + private normalizeId(value: string | undefined): string | undefined { + const normalized = (value ?? "").trim(); + return normalized || undefined; + } + + private listConversationIdsForAgent(agentId: string): string[] { + const ids: string[] = []; + for (const [conversationId, ownerAgentId] of this.conversationAgents.entries()) { + const runtime = this.agents.get(conversationId); + if (ownerAgentId === agentId && runtime && !runtime.closed) { + ids.push(conversationId); + } + } + return ids; + } + + private resolveAgentMainConversationId(agentId: string): string | undefined { + const main = this.agentMainConversations.get(agentId); + if (main) { + const runtime = this.agents.get(main); + if (runtime && !runtime.closed) { + return main; + } + } + + const fallback = this.listConversationIdsForAgent(agentId)[0]; + if (!fallback) return undefined; + this.agentMainConversations.set(agentId, fallback); + return fallback; + } + + private resolveAgentId(agentId: string | undefined, conversationId: string): string { + const explicitAgentId = this.normalizeId(agentId); + if (explicitAgentId && this.agentMainConversations.has(explicitAgentId)) { + return explicitAgentId; + } + if (explicitAgentId && this.conversationAgents.has(explicitAgentId)) { + return this.conversationAgents.get(explicitAgentId) ?? explicitAgentId; + } + const owner = this.conversationAgents.get(conversationId); + if (owner) return owner; + return explicitAgentId ?? conversationId; + } + + private resolveTargetAgentId(agentId: string | undefined, fallbackConversationId: string): string { + const normalized = this.normalizeId(agentId); + if (normalized) return normalized; + const firstAgentId = this.listAgents()[0]; + return firstAgentId ?? fallbackConversationId; + } + + private registerAgent( + agentId: string, + options: { profileId: string; createdAt: number; persist: boolean }, + ): void { + const exists = this.agentProfiles.has(agentId); + if (exists) { + const currentProfileId = this.agentProfiles.get(agentId); + if (currentProfileId !== options.profileId) { + this.agentProfiles.set(agentId, options.profileId); + } + return; + } + + this.agentProfiles.set(agentId, options.profileId); + if (options.persist) { + upsertAgentRecord({ + id: agentId, + createdAt: options.createdAt, + profileId: options.profileId, + }); + } + } + + private clearAgentIfNoConversation(agentId: string): void { + const remaining = this.listConversationIdsForAgent(agentId); + if (remaining.length > 0) { + if (!this.agentMainConversations.get(agentId)) { + this.agentMainConversations.set(agentId, remaining[0]!); + } + return; + } + this.agentMainConversations.delete(agentId); + this.agentProfiles.delete(agentId); + removeAgentRecordById(agentId); + } + + private closeConversationRuntime(conversationId: string, options?: { persist?: boolean }): { ok: boolean; agentId?: string } { + const runtime = this.agents.get(conversationId); + if (!runtime) return { ok: false }; + + const agentId = this.conversationAgents.get(conversationId) ?? conversationId; + runtime.close(); + this.approvalManager.cancelPending(conversationId); + this.agents.delete(conversationId); + this.conversationAgents.delete(conversationId); + this.agentSenders.delete(conversationId); + this.agentStreamIds.delete(conversationId); + this.agentStreamCounters.delete(conversationId); + this.clearPendingAssistantStarts(conversationId); + this.suppressedStreamAgents.delete(conversationId); + this.localApprovalHandlers.delete(conversationId); + + if (options?.persist !== false) { + removeConversationRecordById(conversationId); + } + + return { ok: true, agentId }; } private createClient(url: string): GatewayClient { @@ -285,10 +425,11 @@ export class Hub { // Regular chat message const payload = msg.payload as { agentId?: string; conversationId?: string; content?: string } | undefined; - const agentId = payload?.agentId; - const conversationId = this.resolveConversationId(agentId, payload?.conversationId); + const incomingAgentId = payload?.agentId; + const conversationId = this.resolveConversationId(incomingAgentId, payload?.conversationId); + const agentId = this.resolveAgentId(incomingAgentId, conversationId); const content = payload?.content; - if (!agentId || !content) { + if (!incomingAgentId || !content) { console.warn(`[Hub] Invalid payload, missing agentId or content`); return; } @@ -306,7 +447,7 @@ export class Hub { }); agent.write(content, { source }); } else { - console.warn(`[Hub] Conversation not found or closed: ${conversationId} (agent=${agentId})`); + console.warn(`[Hub] Conversation not found or closed: ${conversationId} (agent=${incomingAgentId})`); } }); @@ -348,7 +489,10 @@ export class Hub { /** Register a one-time token for device verification (called when QR code is generated) */ registerToken(token: string, agentId: string, expiresAt: number): void { - this.deviceStore.registerToken(token, agentId, expiresAt); + const normalizedAgentId = this.normalizeId(agentId); + if (!normalizedAgentId) return; + const resolvedAgentId = this.conversationAgents.get(normalizedAgentId) ?? normalizedAgentId; + this.deviceStore.registerToken(token, resolvedAgentId, expiresAt); } /** 重连到新的 Gateway 地址 */ @@ -375,47 +519,96 @@ export class Hub { return this.approvalManager.resolveApproval(approvalId, decision); } - /** Create new Agent, or rebuild with existing ID */ - createAgent(id?: string, options?: { persist?: boolean; profileId?: string }): AsyncAgent { - if (id) { - const existing = this.agents.get(id); + /** Create a logical agent and its main conversation runtime. */ + createAgent( + id?: string, + options?: { persist?: boolean; profileId?: string; mainConversationId?: string; createdAt?: number }, + ): AsyncAgent { + const agentId = this.normalizeId(id) ?? uuidv7(); + const existingMainConversationId = this.resolveAgentMainConversationId(agentId); + if (existingMainConversationId) { + const existing = this.agents.get(existingMainConversationId); if (existing && !existing.closed) { return existing; } } - const profileId = options?.profileId ?? "default"; - const sessionId = id ?? uuidv7(); - const onExecApprovalNeeded = this.createExecApprovalCallback(sessionId, profileId); - const onChannelSendFile = this.createChannelSendFileCallback(sessionId); - const channels = this.channelManager.listChannelInfos(); - const agent = new AsyncAgent({ sessionId, profileId, onExecApprovalNeeded, onChannelSendFile, channels }); - this.agents.set(agent.sessionId, agent); - - // Persist to agent store (skip during restore to avoid duplicates) - if (options?.persist !== false) { - addAgentRecord({ id: agent.sessionId, createdAt: Date.now() }); - } - - // Internally consume agent output (AgentEvent stream + error Messages) - void this.consumeAgent(agent); - this.heartbeatRunner?.updateConfig(); - - console.log(`Agent created: ${agent.sessionId}`); - return agent; + const mainConversationId = this.normalizeId(options?.mainConversationId) ?? agentId; + return this.createConversation(mainConversationId, { + agentId, + profileId: options?.profileId, + persist: options?.persist, + isMainConversation: true, + createdAt: options?.createdAt, + }); } /** * Create a new conversation runtime. * * Semantics: - * - Agent = capability/profile definition - * - Conversation = one isolated runtime/session thread - * - * Current runtime stores one AsyncAgent per conversation. + * - Agent = long-lived capability/profile identity + * - Conversation = isolated runtime/session thread */ - createConversation(id?: string, options?: { persist?: boolean; profileId?: string }): AsyncAgent { - return this.createAgent(id, options); + createConversation( + id?: string, + options?: { + persist?: boolean; + profileId?: string; + agentId?: string; + isMainConversation?: boolean; + createdAt?: number; + }, + ): AsyncAgent { + const conversationId = this.normalizeId(id) ?? uuidv7(); + const existing = this.agents.get(conversationId); + if (existing && !existing.closed) { + return existing; + } + + const targetAgentId = this.resolveTargetAgentId(options?.agentId, conversationId); + const profileId = options?.profileId ?? this.agentProfiles.get(targetAgentId) ?? "default"; + const createdAt = options?.createdAt ?? Date.now(); + const persist = options?.persist !== false; + + this.registerAgent(targetAgentId, { + profileId, + createdAt, + persist, + }); + + const onExecApprovalNeeded = this.createExecApprovalCallback(conversationId, targetAgentId, profileId); + const onChannelSendFile = this.createChannelSendFileCallback(conversationId); + const channels = this.channelManager.listChannelInfos(); + const agent = new AsyncAgent({ + sessionId: conversationId, + profileId, + onExecApprovalNeeded, + onChannelSendFile, + channels, + }); + + this.agents.set(conversationId, agent); + this.conversationAgents.set(conversationId, targetAgentId); + if (options?.isMainConversation || !this.agentMainConversations.has(targetAgentId)) { + this.agentMainConversations.set(targetAgentId, conversationId); + } + + if (persist) { + upsertConversationRecord({ + id: conversationId, + agentId: targetAgentId, + createdAt, + profileId, + }); + } + + // Internally consume agent output (AgentEvent stream + error Messages) + void this.consumeAgent(agent); + this.heartbeatRunner?.updateConfig(); + + console.log(`[Hub] Conversation created: ${conversationId} (agent: ${targetAgentId})`); + return agent; } private getMessageIdFromEvent(event: unknown): string | undefined { @@ -427,10 +620,17 @@ export class Hub { } private resolveConversationId(agentId: string | undefined, conversationId?: string): string { - const fallback = (agentId ?? "").trim(); - if (!fallback) return ""; - const normalizedConversationId = (conversationId ?? "").trim(); - return normalizedConversationId || fallback; + const normalizedConversationId = this.normalizeId(conversationId); + if (normalizedConversationId) return normalizedConversationId; + + const normalizedAgentId = this.normalizeId(agentId); + if (!normalizedAgentId) return ""; + + const mainConversationId = this.resolveAgentMainConversationId(normalizedAgentId); + if (mainConversationId) return mainConversationId; + + // Legacy fallback: treat agentId as conversationId when no mapping exists yet. + return normalizedAgentId; } private beginStream(agentId: string, event: unknown): string { @@ -465,7 +665,7 @@ export class Hub { /** Internally read agent output and send via Gateway */ private async consumeAgent(agent: AsyncAgent): Promise { const conversationId = agent.sessionId; - const agentId = agent.sessionId; + const agentId = this.conversationAgents.get(conversationId) ?? conversationId; for await (const item of agent.read()) { const targetDeviceId = this.agentSenders.get(conversationId); if (!targetDeviceId) continue; @@ -619,7 +819,7 @@ export class Hub { * Create an exec approval callback for an agent. * This wires the safety evaluation + Hub approval manager together. */ - private createExecApprovalCallback(conversationId: string, profileId: string): ExecApprovalCallback { + private createExecApprovalCallback(conversationId: string, agentId: string, profileId: string): ExecApprovalCallback { return async (command: string, cwd: string | undefined): Promise => { // Load exec approval config from profile let config: ExecApprovalConfig = {}; @@ -673,7 +873,7 @@ export class Hub { // Request approval via Hub → Gateway → Client const result = await this.approvalManager.requestApproval({ - agentId: conversationId, + agentId, conversationId, command, ...(cwd !== undefined ? { cwd } : {}), @@ -740,21 +940,57 @@ export class Hub { } getAgent(id: string): AsyncAgent | undefined { - return this.agents.get(id); + const normalizedId = this.normalizeId(id); + if (!normalizedId) return undefined; + + const directConversation = this.agents.get(normalizedId); + if (directConversation && !directConversation.closed) { + return directConversation; + } + + const mainConversationId = this.resolveAgentMainConversationId(normalizedId); + if (!mainConversationId) return undefined; + const mainConversation = this.agents.get(mainConversationId); + if (!mainConversation || mainConversation.closed) return undefined; + return mainConversation; } getConversation(id: string): AsyncAgent | undefined { - return this.getAgent(id); + const normalizedId = this.normalizeId(id); + if (!normalizedId) return undefined; + const conversation = this.agents.get(normalizedId); + if (!conversation || conversation.closed) return undefined; + return conversation; + } + + getConversationAgentId(conversationId: string): string | undefined { + const normalizedConversationId = this.normalizeId(conversationId); + if (!normalizedConversationId) return undefined; + return this.conversationAgents.get(normalizedConversationId); + } + + getAgentMainConversationId(agentId: string): string | undefined { + const normalizedAgentId = this.normalizeId(agentId); + if (!normalizedAgentId) return undefined; + return this.resolveAgentMainConversationId(normalizedAgentId); } listAgents(): string[] { - return Array.from(this.agents.entries()) - .filter(([, a]) => !a.closed) - .map(([id]) => id); + const activeAgentIds = new Set(); + for (const [conversationId, runtime] of this.agents.entries()) { + if (runtime.closed) continue; + const agentId = this.conversationAgents.get(conversationId); + if (agentId) { + activeAgentIds.add(agentId); + } + } + return Array.from(activeAgentIds.values()); } listConversations(): string[] { - return this.listAgents(); + return Array.from(this.agents.entries()) + .filter(([conversationId, runtime]) => !runtime.closed && this.conversationAgents.has(conversationId)) + .map(([conversationId]) => conversationId); } /** Subscribe heartbeat state updates. Returns unsubscribe callback. */ @@ -810,29 +1046,60 @@ export class Hub { /** Enqueue a system event for a specific agent or the default agent. */ enqueueSystemEvent(text: string, opts?: { agentId?: string }): void { const agentId = opts?.agentId ?? this.listAgents()[0]; - if (!agentId) return; - enqueueSystemEvent(text, { sessionKey: agentId }); + const conversationId = this.resolveConversationId(agentId, undefined); + if (!conversationId) return; + enqueueSystemEvent(text, { sessionKey: conversationId }); } closeAgent(id: string): boolean { - const agent = this.agents.get(id); - if (!agent) return false; - agent.close(); - this.approvalManager.cancelPending(id); - this.agents.delete(id); - this.agentSenders.delete(id); - this.agentStreamIds.delete(id); - this.agentStreamCounters.delete(id); - this.clearPendingAssistantStarts(id); - this.suppressedStreamAgents.delete(id); - this.localApprovalHandlers.delete(id); - removeAgentRecord(id); + const normalizedId = this.normalizeId(id); + if (!normalizedId) return false; + + const resolvedAgentId = this.agentMainConversations.has(normalizedId) + ? normalizedId + : this.conversationAgents.get(normalizedId) ?? normalizedId; + const conversationIds = this.listConversationIdsForAgent(resolvedAgentId); + if (conversationIds.length === 0) { + return this.closeConversation(normalizedId); + } + + let closedAny = false; + for (const conversationId of conversationIds) { + const closed = this.closeConversationRuntime(conversationId, { persist: false }); + closedAny = closedAny || closed.ok; + } + if (!closedAny) return false; + + this.agentMainConversations.delete(resolvedAgentId); + this.agentProfiles.delete(resolvedAgentId); + removeAgentRecordById(resolvedAgentId); this.heartbeatRunner?.updateConfig(); - return true; + return closedAny; } closeConversation(id: string): boolean { - return this.closeAgent(id); + const normalizedId = this.normalizeId(id); + if (!normalizedId) return false; + const conversationId = this.agents.has(normalizedId) + ? normalizedId + : this.resolveAgentMainConversationId(normalizedId); + if (!conversationId) return false; + + const { ok, agentId } = this.closeConversationRuntime(conversationId); + if (!ok || !agentId) return false; + + const currentMainConversationId = this.agentMainConversations.get(agentId); + if (currentMainConversationId === conversationId) { + this.agentMainConversations.delete(agentId); + const replacementConversationId = this.listConversationIdsForAgent(agentId)[0]; + if (replacementConversationId) { + this.agentMainConversations.set(agentId, replacementConversationId); + } + } + + this.clearAgentIfNoConversation(agentId); + this.heartbeatRunner?.updateConfig(); + return true; } shutdown(): void { @@ -847,16 +1114,19 @@ export class Hub { this.heartbeatUnsubscribe = null; this.heartbeatListeners.clear(); - for (const [id, agent] of this.agents) { + for (const [conversationId, agent] of this.agents) { agent.close(); - this.agents.delete(id); - this.agentSenders.delete(id); - this.agentStreamIds.delete(id); - this.agentStreamCounters.delete(id); - this.clearPendingAssistantStarts(id); - this.suppressedStreamAgents.delete(id); - this.localApprovalHandlers.delete(id); + this.agents.delete(conversationId); + this.conversationAgents.delete(conversationId); + this.agentSenders.delete(conversationId); + this.agentStreamIds.delete(conversationId); + this.agentStreamCounters.delete(conversationId); + this.clearPendingAssistantStarts(conversationId); + this.suppressedStreamAgents.delete(conversationId); + this.localApprovalHandlers.delete(conversationId); } + this.agentMainConversations.clear(); + this.agentProfiles.clear(); this.client.disconnect(); console.log("Hub shut down"); } diff --git a/packages/core/src/hub/rpc/handlers/create-conversation.test.ts b/packages/core/src/hub/rpc/handlers/create-conversation.test.ts index ad308ff7..81a86bc2 100644 --- a/packages/core/src/hub/rpc/handlers/create-conversation.test.ts +++ b/packages/core/src/hub/rpc/handlers/create-conversation.test.ts @@ -2,13 +2,13 @@ import { describe, it, expect, vi } from "vitest"; import { createCreateConversationHandler } from "./create-conversation.js"; describe("createCreateConversationHandler", () => { - it("creates conversation with explicit id", () => { + it("creates conversation with explicit id and agent id", () => { const createConversation = vi.fn(() => ({ sessionId: "conv-1" })); const handler = createCreateConversationHandler({ createConversation }); - const result = handler({ id: "custom-id" }, "device-1") as { id: string }; + const result = handler({ id: "custom-id", agentId: "agent-1" }, "device-1") as { id: string }; - expect(createConversation).toHaveBeenCalledWith("custom-id"); + expect(createConversation).toHaveBeenCalledWith("custom-id", { agentId: "agent-1" }); expect(result).toEqual({ id: "conv-1" }); }); @@ -18,7 +18,7 @@ describe("createCreateConversationHandler", () => { const result = handler(undefined, "device-1") as { id: string }; - expect(createConversation).toHaveBeenCalledWith(undefined); + expect(createConversation).toHaveBeenCalledWith(undefined, { agentId: undefined }); expect(result).toEqual({ id: "conv-2" }); }); }); diff --git a/packages/core/src/hub/rpc/handlers/create-conversation.ts b/packages/core/src/hub/rpc/handlers/create-conversation.ts index 15a570ab..6862aadd 100644 --- a/packages/core/src/hub/rpc/handlers/create-conversation.ts +++ b/packages/core/src/hub/rpc/handlers/create-conversation.ts @@ -1,13 +1,13 @@ import type { RpcHandler } from "../dispatcher.js"; interface HubLike { - createConversation(id?: string): { sessionId: string }; + createConversation(id?: string, options?: { agentId?: string }): { sessionId: string }; } export function createCreateConversationHandler(hub: HubLike): RpcHandler { return (params: unknown) => { - const { id } = (params ?? {}) as { id?: string }; - const conversation = hub.createConversation(id); + const { id, agentId } = (params ?? {}) as { id?: string; agentId?: string }; + const conversation = hub.createConversation(id, { agentId }); return { id: conversation.sessionId }; }; } diff --git a/packages/core/src/hub/rpc/handlers/get-agent-messages.ts b/packages/core/src/hub/rpc/handlers/get-agent-messages.ts index e5a164da..75b7ac26 100644 --- a/packages/core/src/hub/rpc/handlers/get-agent-messages.ts +++ b/packages/core/src/hub/rpc/handlers/get-agent-messages.ts @@ -13,7 +13,9 @@ interface GetAgentMessagesParams { limit?: number; } -export function createGetAgentMessagesHandler(): RpcHandler { +type ConversationResolver = (agentId: string, conversationId?: string) => string; + +export function createGetAgentMessagesHandler(resolveConversationId?: ConversationResolver): RpcHandler { return (params: unknown) => { if (!params || typeof params !== "object") { throw new RpcError("INVALID_PARAMS", "params must be an object"); @@ -23,7 +25,10 @@ export function createGetAgentMessagesHandler(): RpcHandler { if (!agentId) { throw new RpcError("INVALID_PARAMS", "Missing required param: agentId"); } - const resolvedConversationId = (conversationId ?? "").trim() || agentId; + const fallbackConversationId = (conversationId ?? "").trim() || agentId; + const resolvedConversationId = resolveConversationId + ? resolveConversationId(agentId, conversationId) + : fallbackConversationId; const sessionPath = resolveSessionPath(resolvedConversationId); if (!existsSync(sessionPath)) { diff --git a/packages/core/src/hub/rpc/handlers/verify.ts b/packages/core/src/hub/rpc/handlers/verify.ts index 68c6a9d3..8ad87d18 100644 --- a/packages/core/src/hub/rpc/handlers/verify.ts +++ b/packages/core/src/hub/rpc/handlers/verify.ts @@ -5,6 +5,7 @@ import type { DeviceStore, DeviceMeta } from "../../device-store.js"; interface VerifyContext { hubId: string; deviceStore: DeviceStore; + resolveMainConversationId?: (agentId: string) => string | undefined; /** Called for first-time connections. Returns true if user approves, false if rejected. */ onConfirmDevice: (deviceId: string, agentId: string, meta?: DeviceMeta) => Promise; } @@ -21,10 +22,11 @@ export function createVerifyHandler(ctx: VerifyContext): RpcHandler { // 1. Already in whitelist → pass through (reconnection, no confirmation needed) const allowed = ctx.deviceStore.isAllowed(from); if (allowed) { + const mainConversationId = ctx.resolveMainConversationId?.(allowed.agentId) ?? allowed.agentId; return { hubId: ctx.hubId, agentId: allowed.agentId, - mainConversationId: allowed.agentId, + mainConversationId, isNewDevice: false, }; } @@ -47,10 +49,11 @@ export function createVerifyHandler(ctx: VerifyContext): RpcHandler { // 4. User confirmed → add to whitelist (with device metadata) ctx.deviceStore.allowDevice(from, result.agentId, meta); + const mainConversationId = ctx.resolveMainConversationId?.(result.agentId) ?? result.agentId; return { hubId: ctx.hubId, agentId: result.agentId, - mainConversationId: result.agentId, + mainConversationId, isNewDevice: true, }; }; diff --git a/packages/sdk/src/actions/rpc.ts b/packages/sdk/src/actions/rpc.ts index 2849e655..aa104da1 100644 --- a/packages/sdk/src/actions/rpc.ts +++ b/packages/sdk/src/actions/rpc.ts @@ -127,9 +127,10 @@ export interface CreateAgentResult { id: string; } -/** createConversation - request params (conversation-first alias of createAgent) */ +/** createConversation - request params (create a conversation, optionally under a specific agent) */ export interface CreateConversationParams { id?: string; + agentId?: string; } /** createConversation - response payload */