refactor(hub): decouple agent and conversation runtime model

This commit is contained in:
Jiayuan Zhang 2026-02-17 02:59:39 +08:00
parent 6a778e38e7
commit b7b3d323b8
9 changed files with 690 additions and 108 deletions

View file

@ -118,9 +118,10 @@ export interface CreateAgentResult {
id: string;
}
/** createConversation - request params (conversation-first alias of createAgent) */
/** createConversation - request params (create a conversation, optionally under a specific agent) */
export interface CreateConversationParams {
id?: string;
agentId?: string;
}
/** createConversation - response payload */

View file

@ -0,0 +1,80 @@
import { mkdtempSync, mkdirSync, readFileSync, rmSync, writeFileSync } from "node:fs";
import { join } from "node:path";
import { tmpdir } from "node:os";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
describe("hub agent store", () => {
let testDir: string;
let previousDataDir: string | undefined;
beforeEach(() => {
testDir = mkdtempSync(join(tmpdir(), "multica-agent-store-"));
previousDataDir = process.env.SMC_DATA_DIR;
process.env.SMC_DATA_DIR = testDir;
vi.resetModules();
});
afterEach(() => {
if (previousDataDir === undefined) {
delete process.env.SMC_DATA_DIR;
} else {
process.env.SMC_DATA_DIR = previousDataDir;
}
vi.resetModules();
rmSync(testDir, { recursive: true, force: true });
});
it("migrates legacy single-layer records into agent+conversation snapshot", async () => {
const agentsDir = join(testDir, "agents");
mkdirSync(agentsDir, { recursive: true });
writeFileSync(
join(agentsDir, "agents.json"),
JSON.stringify([
{ id: "legacy-a", createdAt: 123 },
], null, 2),
"utf-8",
);
const store = await import("./agent-store.js");
const snapshot = store.loadHubStoreSnapshot();
expect(snapshot.version).toBe(2);
expect(snapshot.agents).toEqual([{ id: "legacy-a", createdAt: 123 }]);
expect(snapshot.conversations).toEqual([{ id: "legacy-a", agentId: "legacy-a", createdAt: 123 }]);
const persisted = JSON.parse(readFileSync(join(agentsDir, "agents.json"), "utf-8")) as {
version: number;
};
expect(persisted.version).toBe(2);
});
it("upserts conversations and auto-creates missing agents", async () => {
const store = await import("./agent-store.js");
store.upsertConversationRecord({
id: "conv-1",
agentId: "agent-1",
createdAt: 100,
});
const snapshot = store.loadHubStoreSnapshot();
expect(snapshot.agents).toEqual([{ id: "agent-1", createdAt: 100 }]);
expect(snapshot.conversations).toEqual([
{ id: "conv-1", agentId: "agent-1", createdAt: 100 },
]);
});
it("removes empty agent after last conversation is deleted", async () => {
const store = await import("./agent-store.js");
store.upsertConversationRecord({ id: "conv-1", agentId: "agent-1", createdAt: 100 });
store.upsertConversationRecord({ id: "conv-2", agentId: "agent-1", createdAt: 101 });
store.removeConversationRecordById("conv-1");
expect(store.loadHubStoreSnapshot().agents).toEqual([{ id: "agent-1", createdAt: 100 }]);
store.removeConversationRecordById("conv-2");
const snapshot = store.loadHubStoreSnapshot();
expect(snapshot.agents).toEqual([]);
expect(snapshot.conversations).toEqual([]);
});
});

View file

@ -5,6 +5,20 @@ import { DATA_DIR } from "@multica/utils";
export interface AgentRecord {
id: string;
createdAt: number;
profileId?: string;
}
export interface ConversationRecord {
id: string;
agentId: string;
createdAt: number;
profileId?: string;
}
export interface HubStoreSnapshot {
version: 2;
agents: AgentRecord[];
conversations: ConversationRecord[];
}
const AGENTS_DIR = join(DATA_DIR, "agents");
@ -16,32 +30,240 @@ function ensureDir(): void {
}
}
export function loadAgentRecords(): AgentRecord[] {
if (!existsSync(AGENTS_FILE)) return [];
function defaultSnapshot(): HubStoreSnapshot {
return {
version: 2,
agents: [],
conversations: [],
};
}
function isRecordLike(value: unknown): value is Record<string, unknown> {
return !!value && typeof value === "object";
}
function normalizeCreatedAt(raw: unknown): number {
if (typeof raw === "number" && Number.isFinite(raw)) {
return raw;
}
return Date.now();
}
function normalizeAgentRecords(input: unknown): AgentRecord[] {
if (!Array.isArray(input)) return [];
const dedup = new Map<string, AgentRecord>();
for (const item of input) {
if (!isRecordLike(item) || typeof item.id !== "string" || !item.id.trim()) continue;
const id = item.id.trim();
if (dedup.has(id)) continue;
dedup.set(id, {
id,
createdAt: normalizeCreatedAt(item.createdAt),
...(typeof item.profileId === "string" && item.profileId.trim() ? { profileId: item.profileId.trim() } : {}),
});
}
return Array.from(dedup.values()).sort((a, b) => a.createdAt - b.createdAt);
}
function normalizeConversationRecords(input: unknown): ConversationRecord[] {
if (!Array.isArray(input)) return [];
const dedup = new Map<string, ConversationRecord>();
for (const item of input) {
if (!isRecordLike(item) || typeof item.id !== "string" || !item.id.trim()) continue;
if (typeof item.agentId !== "string" || !item.agentId.trim()) continue;
const id = item.id.trim();
if (dedup.has(id)) continue;
dedup.set(id, {
id,
agentId: item.agentId.trim(),
createdAt: normalizeCreatedAt(item.createdAt),
...(typeof item.profileId === "string" && item.profileId.trim() ? { profileId: item.profileId.trim() } : {}),
});
}
return Array.from(dedup.values()).sort((a, b) => a.createdAt - b.createdAt);
}
function normalizeSnapshot(raw: unknown): { snapshot: HubStoreSnapshot; migrated: boolean } {
// Legacy format: AgentRecord[]
if (Array.isArray(raw)) {
const legacyAgents = normalizeAgentRecords(raw);
const conversations: ConversationRecord[] = legacyAgents.map((record) => ({
id: record.id,
agentId: record.id,
createdAt: record.createdAt,
...(record.profileId ? { profileId: record.profileId } : {}),
}));
return {
snapshot: {
version: 2,
agents: legacyAgents,
conversations,
},
migrated: true,
};
}
if (!isRecordLike(raw)) {
return { snapshot: defaultSnapshot(), migrated: false };
}
const agents = normalizeAgentRecords(raw.agents);
const conversations = normalizeConversationRecords(raw.conversations);
const agentMap = new Map<string, AgentRecord>(agents.map((agent) => [agent.id, agent]));
const normalizedConversations: ConversationRecord[] = [];
for (const conversation of conversations) {
if (!agentMap.has(conversation.agentId)) {
agentMap.set(conversation.agentId, {
id: conversation.agentId,
createdAt: conversation.createdAt,
...(conversation.profileId ? { profileId: conversation.profileId } : {}),
});
}
normalizedConversations.push(conversation);
}
// Ensure each agent has a main conversation for compatibility fallback.
for (const agent of agentMap.values()) {
const hasConversation = normalizedConversations.some((conversation) => conversation.agentId === agent.id);
if (hasConversation) continue;
normalizedConversations.push({
id: agent.id,
agentId: agent.id,
createdAt: agent.createdAt,
...(agent.profileId ? { profileId: agent.profileId } : {}),
});
}
return {
snapshot: {
version: 2,
agents: Array.from(agentMap.values()).sort((a, b) => a.createdAt - b.createdAt),
conversations: normalizedConversations.sort((a, b) => a.createdAt - b.createdAt),
},
migrated: (raw.version as unknown) !== 2,
};
}
export function saveHubStoreSnapshot(snapshot: HubStoreSnapshot): void {
ensureDir();
writeFileSync(AGENTS_FILE, JSON.stringify(snapshot, null, 2), "utf-8");
}
export function loadHubStoreSnapshot(): HubStoreSnapshot {
if (!existsSync(AGENTS_FILE)) return defaultSnapshot();
try {
const content = readFileSync(AGENTS_FILE, "utf-8");
return JSON.parse(content) as AgentRecord[];
const parsed = JSON.parse(content) as unknown;
const normalized = normalizeSnapshot(parsed);
if (normalized.migrated) {
saveHubStoreSnapshot(normalized.snapshot);
}
return normalized.snapshot;
} catch {
return [];
return defaultSnapshot();
}
}
export function upsertAgentRecord(record: AgentRecord): void {
const snapshot = loadHubStoreSnapshot();
const existing = snapshot.agents.filter((item) => item.id !== record.id);
existing.push(record);
snapshot.agents = existing.sort((a, b) => a.createdAt - b.createdAt);
saveHubStoreSnapshot(snapshot);
}
export function removeAgentRecordById(agentId: string): void {
const snapshot = loadHubStoreSnapshot();
const agents = snapshot.agents.filter((agent) => agent.id !== agentId);
const conversations = snapshot.conversations.filter((conversation) => conversation.agentId !== agentId);
if (agents.length === snapshot.agents.length && conversations.length === snapshot.conversations.length) {
return;
}
saveHubStoreSnapshot({
...snapshot,
agents,
conversations,
});
}
export function upsertConversationRecord(record: ConversationRecord): void {
const snapshot = loadHubStoreSnapshot();
const conversations = snapshot.conversations.filter((item) => item.id !== record.id);
conversations.push(record);
const hasAgent = snapshot.agents.some((agent) => agent.id === record.agentId);
const agents = hasAgent
? snapshot.agents
: [
...snapshot.agents,
{
id: record.agentId,
createdAt: record.createdAt,
...(record.profileId ? { profileId: record.profileId } : {}),
},
];
saveHubStoreSnapshot({
version: 2,
agents: agents.sort((a, b) => a.createdAt - b.createdAt),
conversations: conversations.sort((a, b) => a.createdAt - b.createdAt),
});
}
export function removeConversationRecordById(conversationId: string): void {
const snapshot = loadHubStoreSnapshot();
const conversations = snapshot.conversations.filter((conversation) => conversation.id !== conversationId);
if (conversations.length === snapshot.conversations.length) {
return;
}
const activeAgentIds = new Set(conversations.map((conversation) => conversation.agentId));
const agents = snapshot.agents.filter((agent) => activeAgentIds.has(agent.id));
saveHubStoreSnapshot({
...snapshot,
agents,
conversations,
});
}
// Legacy compatibility wrappers
// NOTE: In legacy mode, each agent record is treated as both agent and main conversation.
export function loadAgentRecords(): AgentRecord[] {
return loadHubStoreSnapshot().agents;
}
export function saveAgentRecords(records: AgentRecord[]): void {
ensureDir();
writeFileSync(AGENTS_FILE, JSON.stringify(records, null, 2), "utf-8");
const agents = normalizeAgentRecords(records);
const conversations = agents.map((record) => ({
id: record.id,
agentId: record.id,
createdAt: record.createdAt,
...(record.profileId ? { profileId: record.profileId } : {}),
}));
saveHubStoreSnapshot({
version: 2,
agents,
conversations,
});
}
export function addAgentRecord(record: AgentRecord): void {
const records = loadAgentRecords();
if (records.some((r) => r.id === record.id)) return;
records.push(record);
saveAgentRecords(records);
upsertAgentRecord(record);
upsertConversationRecord({
id: record.id,
agentId: record.id,
createdAt: record.createdAt,
...(record.profileId ? { profileId: record.profileId } : {}),
});
}
export function removeAgentRecord(id: string): void {
const records = loadAgentRecords();
const filtered = records.filter((r) => r.id !== id);
if (filtered.length !== records.length) {
saveAgentRecords(filtered);
// Legacy API accepts either agent id or conversation id.
const snapshot = loadHubStoreSnapshot();
const conversation = snapshot.conversations.find((item) => item.id === id);
if (conversation) {
removeConversationRecordById(conversation.id);
}
removeAgentRecordById(id);
}

View file

@ -17,7 +17,13 @@ import { AsyncAgent } from "../agent/async-agent.js";
import type { AgentOptions } from "../agent/types.js";
import { getHubId } from "./hub-identity.js";
import { setHub } from "./hub-singleton.js";
import { loadAgentRecords, addAgentRecord, removeAgentRecord } from "./agent-store.js";
import {
loadHubStoreSnapshot,
upsertAgentRecord,
upsertConversationRecord,
removeConversationRecordById,
removeAgentRecordById,
} from "./agent-store.js";
import { RpcDispatcher, RpcError } from "./rpc/dispatcher.js";
import { createGetAgentMessagesHandler } from "./rpc/handlers/get-agent-messages.js";
import { createGetHubInfoHandler } from "./rpc/handlers/get-hub-info.js";
@ -75,7 +81,14 @@ export interface InboundMessageEvent {
}
export class Hub {
// Runtime conversation map (conversationId -> AsyncAgent).
private readonly agents = new Map<string, AsyncAgent>();
// Conversation ownership map (conversationId -> logical agentId).
private readonly conversationAgents = new Map<string, string>();
// Main conversation pointer for each agent (agentId -> mainConversationId).
private readonly agentMainConversations = new Map<string, string>();
// Runtime profile for each logical agent.
private readonly agentProfiles = new Map<string, string>();
private readonly agentSenders = new Map<string, string>();
private readonly agentStreamIds = new Map<string, string>();
private readonly agentStreamCounters = new Map<string, number>();
@ -112,6 +125,7 @@ export class Hub {
this.rpc.register("verify", createVerifyHandler({
hubId: this.hubId,
deviceStore: this.deviceStore,
resolveMainConversationId: (agentId) => this.getAgentMainConversationId(agentId),
onConfirmDevice: (deviceId, agentId, meta) => {
if (!this._onConfirmDevice) {
// No UI confirm handler registered (CLI mode etc.) — auto-approve
@ -121,7 +135,9 @@ export class Hub {
},
}));
this.rpc.register("generateChannelWelcome", createGenerateChannelWelcomeHandler(this));
this.rpc.register("getAgentMessages", createGetAgentMessagesHandler());
this.rpc.register("getAgentMessages", createGetAgentMessagesHandler((agentId, conversationId) => {
return this.resolveConversationId(agentId, conversationId);
}));
this.rpc.register("getHubInfo", createGetHubInfoHandler(this));
this.rpc.register("listAgents", createListAgentsHandler(this));
this.rpc.register("createAgent", createCreateAgentHandler(this));
@ -207,20 +223,144 @@ export class Hub {
}
private getDefaultAgent(): AsyncAgent | null {
const first = this.listAgents()[0];
if (!first) return null;
return this.getAgent(first) ?? null;
const firstConversationId = this.listConversations()[0];
if (!firstConversationId) return null;
return this.getConversation(firstConversationId) ?? null;
}
/** Restore agents from persistent storage */
private restoreAgents(): void {
const records = loadAgentRecords();
for (const record of records) {
this.createAgent(record.id, { persist: false });
const snapshot = loadHubStoreSnapshot();
for (const agent of snapshot.agents) {
this.agentProfiles.set(agent.id, agent.profileId ?? "default");
}
if (records.length > 0) {
console.log(`[Hub] Restored ${records.length} agent(s)`);
for (const conversation of snapshot.conversations) {
this.createConversation(conversation.id, {
agentId: conversation.agentId,
profileId: conversation.profileId ?? this.agentProfiles.get(conversation.agentId) ?? "default",
persist: false,
createdAt: conversation.createdAt,
isMainConversation: !this.agentMainConversations.has(conversation.agentId),
});
}
if (snapshot.conversations.length > 0) {
console.log(
`[Hub] Restored ${snapshot.agents.length} agent(s), ${snapshot.conversations.length} conversation(s)`,
);
}
}
private normalizeId(value: string | undefined): string | undefined {
const normalized = (value ?? "").trim();
return normalized || undefined;
}
private listConversationIdsForAgent(agentId: string): string[] {
const ids: string[] = [];
for (const [conversationId, ownerAgentId] of this.conversationAgents.entries()) {
const runtime = this.agents.get(conversationId);
if (ownerAgentId === agentId && runtime && !runtime.closed) {
ids.push(conversationId);
}
}
return ids;
}
private resolveAgentMainConversationId(agentId: string): string | undefined {
const main = this.agentMainConversations.get(agentId);
if (main) {
const runtime = this.agents.get(main);
if (runtime && !runtime.closed) {
return main;
}
}
const fallback = this.listConversationIdsForAgent(agentId)[0];
if (!fallback) return undefined;
this.agentMainConversations.set(agentId, fallback);
return fallback;
}
private resolveAgentId(agentId: string | undefined, conversationId: string): string {
const explicitAgentId = this.normalizeId(agentId);
if (explicitAgentId && this.agentMainConversations.has(explicitAgentId)) {
return explicitAgentId;
}
if (explicitAgentId && this.conversationAgents.has(explicitAgentId)) {
return this.conversationAgents.get(explicitAgentId) ?? explicitAgentId;
}
const owner = this.conversationAgents.get(conversationId);
if (owner) return owner;
return explicitAgentId ?? conversationId;
}
private resolveTargetAgentId(agentId: string | undefined, fallbackConversationId: string): string {
const normalized = this.normalizeId(agentId);
if (normalized) return normalized;
const firstAgentId = this.listAgents()[0];
return firstAgentId ?? fallbackConversationId;
}
private registerAgent(
agentId: string,
options: { profileId: string; createdAt: number; persist: boolean },
): void {
const exists = this.agentProfiles.has(agentId);
if (exists) {
const currentProfileId = this.agentProfiles.get(agentId);
if (currentProfileId !== options.profileId) {
this.agentProfiles.set(agentId, options.profileId);
}
return;
}
this.agentProfiles.set(agentId, options.profileId);
if (options.persist) {
upsertAgentRecord({
id: agentId,
createdAt: options.createdAt,
profileId: options.profileId,
});
}
}
private clearAgentIfNoConversation(agentId: string): void {
const remaining = this.listConversationIdsForAgent(agentId);
if (remaining.length > 0) {
if (!this.agentMainConversations.get(agentId)) {
this.agentMainConversations.set(agentId, remaining[0]!);
}
return;
}
this.agentMainConversations.delete(agentId);
this.agentProfiles.delete(agentId);
removeAgentRecordById(agentId);
}
private closeConversationRuntime(conversationId: string, options?: { persist?: boolean }): { ok: boolean; agentId?: string } {
const runtime = this.agents.get(conversationId);
if (!runtime) return { ok: false };
const agentId = this.conversationAgents.get(conversationId) ?? conversationId;
runtime.close();
this.approvalManager.cancelPending(conversationId);
this.agents.delete(conversationId);
this.conversationAgents.delete(conversationId);
this.agentSenders.delete(conversationId);
this.agentStreamIds.delete(conversationId);
this.agentStreamCounters.delete(conversationId);
this.clearPendingAssistantStarts(conversationId);
this.suppressedStreamAgents.delete(conversationId);
this.localApprovalHandlers.delete(conversationId);
if (options?.persist !== false) {
removeConversationRecordById(conversationId);
}
return { ok: true, agentId };
}
private createClient(url: string): GatewayClient {
@ -285,10 +425,11 @@ export class Hub {
// Regular chat message
const payload = msg.payload as { agentId?: string; conversationId?: string; content?: string } | undefined;
const agentId = payload?.agentId;
const conversationId = this.resolveConversationId(agentId, payload?.conversationId);
const incomingAgentId = payload?.agentId;
const conversationId = this.resolveConversationId(incomingAgentId, payload?.conversationId);
const agentId = this.resolveAgentId(incomingAgentId, conversationId);
const content = payload?.content;
if (!agentId || !content) {
if (!incomingAgentId || !content) {
console.warn(`[Hub] Invalid payload, missing agentId or content`);
return;
}
@ -306,7 +447,7 @@ export class Hub {
});
agent.write(content, { source });
} else {
console.warn(`[Hub] Conversation not found or closed: ${conversationId} (agent=${agentId})`);
console.warn(`[Hub] Conversation not found or closed: ${conversationId} (agent=${incomingAgentId})`);
}
});
@ -348,7 +489,10 @@ export class Hub {
/** Register a one-time token for device verification (called when QR code is generated) */
registerToken(token: string, agentId: string, expiresAt: number): void {
this.deviceStore.registerToken(token, agentId, expiresAt);
const normalizedAgentId = this.normalizeId(agentId);
if (!normalizedAgentId) return;
const resolvedAgentId = this.conversationAgents.get(normalizedAgentId) ?? normalizedAgentId;
this.deviceStore.registerToken(token, resolvedAgentId, expiresAt);
}
/** 重连到新的 Gateway 地址 */
@ -375,47 +519,96 @@ export class Hub {
return this.approvalManager.resolveApproval(approvalId, decision);
}
/** Create new Agent, or rebuild with existing ID */
createAgent(id?: string, options?: { persist?: boolean; profileId?: string }): AsyncAgent {
if (id) {
const existing = this.agents.get(id);
/** Create a logical agent and its main conversation runtime. */
createAgent(
id?: string,
options?: { persist?: boolean; profileId?: string; mainConversationId?: string; createdAt?: number },
): AsyncAgent {
const agentId = this.normalizeId(id) ?? uuidv7();
const existingMainConversationId = this.resolveAgentMainConversationId(agentId);
if (existingMainConversationId) {
const existing = this.agents.get(existingMainConversationId);
if (existing && !existing.closed) {
return existing;
}
}
const profileId = options?.profileId ?? "default";
const sessionId = id ?? uuidv7();
const onExecApprovalNeeded = this.createExecApprovalCallback(sessionId, profileId);
const onChannelSendFile = this.createChannelSendFileCallback(sessionId);
const channels = this.channelManager.listChannelInfos();
const agent = new AsyncAgent({ sessionId, profileId, onExecApprovalNeeded, onChannelSendFile, channels });
this.agents.set(agent.sessionId, agent);
// Persist to agent store (skip during restore to avoid duplicates)
if (options?.persist !== false) {
addAgentRecord({ id: agent.sessionId, createdAt: Date.now() });
}
// Internally consume agent output (AgentEvent stream + error Messages)
void this.consumeAgent(agent);
this.heartbeatRunner?.updateConfig();
console.log(`Agent created: ${agent.sessionId}`);
return agent;
const mainConversationId = this.normalizeId(options?.mainConversationId) ?? agentId;
return this.createConversation(mainConversationId, {
agentId,
profileId: options?.profileId,
persist: options?.persist,
isMainConversation: true,
createdAt: options?.createdAt,
});
}
/**
* Create a new conversation runtime.
*
* Semantics:
* - Agent = capability/profile definition
* - Conversation = one isolated runtime/session thread
*
* Current runtime stores one AsyncAgent per conversation.
* - Agent = long-lived capability/profile identity
* - Conversation = isolated runtime/session thread
*/
createConversation(id?: string, options?: { persist?: boolean; profileId?: string }): AsyncAgent {
return this.createAgent(id, options);
createConversation(
id?: string,
options?: {
persist?: boolean;
profileId?: string;
agentId?: string;
isMainConversation?: boolean;
createdAt?: number;
},
): AsyncAgent {
const conversationId = this.normalizeId(id) ?? uuidv7();
const existing = this.agents.get(conversationId);
if (existing && !existing.closed) {
return existing;
}
const targetAgentId = this.resolveTargetAgentId(options?.agentId, conversationId);
const profileId = options?.profileId ?? this.agentProfiles.get(targetAgentId) ?? "default";
const createdAt = options?.createdAt ?? Date.now();
const persist = options?.persist !== false;
this.registerAgent(targetAgentId, {
profileId,
createdAt,
persist,
});
const onExecApprovalNeeded = this.createExecApprovalCallback(conversationId, targetAgentId, profileId);
const onChannelSendFile = this.createChannelSendFileCallback(conversationId);
const channels = this.channelManager.listChannelInfos();
const agent = new AsyncAgent({
sessionId: conversationId,
profileId,
onExecApprovalNeeded,
onChannelSendFile,
channels,
});
this.agents.set(conversationId, agent);
this.conversationAgents.set(conversationId, targetAgentId);
if (options?.isMainConversation || !this.agentMainConversations.has(targetAgentId)) {
this.agentMainConversations.set(targetAgentId, conversationId);
}
if (persist) {
upsertConversationRecord({
id: conversationId,
agentId: targetAgentId,
createdAt,
profileId,
});
}
// Internally consume agent output (AgentEvent stream + error Messages)
void this.consumeAgent(agent);
this.heartbeatRunner?.updateConfig();
console.log(`[Hub] Conversation created: ${conversationId} (agent: ${targetAgentId})`);
return agent;
}
private getMessageIdFromEvent(event: unknown): string | undefined {
@ -427,10 +620,17 @@ export class Hub {
}
private resolveConversationId(agentId: string | undefined, conversationId?: string): string {
const fallback = (agentId ?? "").trim();
if (!fallback) return "";
const normalizedConversationId = (conversationId ?? "").trim();
return normalizedConversationId || fallback;
const normalizedConversationId = this.normalizeId(conversationId);
if (normalizedConversationId) return normalizedConversationId;
const normalizedAgentId = this.normalizeId(agentId);
if (!normalizedAgentId) return "";
const mainConversationId = this.resolveAgentMainConversationId(normalizedAgentId);
if (mainConversationId) return mainConversationId;
// Legacy fallback: treat agentId as conversationId when no mapping exists yet.
return normalizedAgentId;
}
private beginStream(agentId: string, event: unknown): string {
@ -465,7 +665,7 @@ export class Hub {
/** Internally read agent output and send via Gateway */
private async consumeAgent(agent: AsyncAgent): Promise<void> {
const conversationId = agent.sessionId;
const agentId = agent.sessionId;
const agentId = this.conversationAgents.get(conversationId) ?? conversationId;
for await (const item of agent.read()) {
const targetDeviceId = this.agentSenders.get(conversationId);
if (!targetDeviceId) continue;
@ -619,7 +819,7 @@ export class Hub {
* Create an exec approval callback for an agent.
* This wires the safety evaluation + Hub approval manager together.
*/
private createExecApprovalCallback(conversationId: string, profileId: string): ExecApprovalCallback {
private createExecApprovalCallback(conversationId: string, agentId: string, profileId: string): ExecApprovalCallback {
return async (command: string, cwd: string | undefined): Promise<ApprovalResult> => {
// Load exec approval config from profile
let config: ExecApprovalConfig = {};
@ -673,7 +873,7 @@ export class Hub {
// Request approval via Hub → Gateway → Client
const result = await this.approvalManager.requestApproval({
agentId: conversationId,
agentId,
conversationId,
command,
...(cwd !== undefined ? { cwd } : {}),
@ -740,21 +940,57 @@ export class Hub {
}
getAgent(id: string): AsyncAgent | undefined {
return this.agents.get(id);
const normalizedId = this.normalizeId(id);
if (!normalizedId) return undefined;
const directConversation = this.agents.get(normalizedId);
if (directConversation && !directConversation.closed) {
return directConversation;
}
const mainConversationId = this.resolveAgentMainConversationId(normalizedId);
if (!mainConversationId) return undefined;
const mainConversation = this.agents.get(mainConversationId);
if (!mainConversation || mainConversation.closed) return undefined;
return mainConversation;
}
getConversation(id: string): AsyncAgent | undefined {
return this.getAgent(id);
const normalizedId = this.normalizeId(id);
if (!normalizedId) return undefined;
const conversation = this.agents.get(normalizedId);
if (!conversation || conversation.closed) return undefined;
return conversation;
}
getConversationAgentId(conversationId: string): string | undefined {
const normalizedConversationId = this.normalizeId(conversationId);
if (!normalizedConversationId) return undefined;
return this.conversationAgents.get(normalizedConversationId);
}
getAgentMainConversationId(agentId: string): string | undefined {
const normalizedAgentId = this.normalizeId(agentId);
if (!normalizedAgentId) return undefined;
return this.resolveAgentMainConversationId(normalizedAgentId);
}
listAgents(): string[] {
return Array.from(this.agents.entries())
.filter(([, a]) => !a.closed)
.map(([id]) => id);
const activeAgentIds = new Set<string>();
for (const [conversationId, runtime] of this.agents.entries()) {
if (runtime.closed) continue;
const agentId = this.conversationAgents.get(conversationId);
if (agentId) {
activeAgentIds.add(agentId);
}
}
return Array.from(activeAgentIds.values());
}
listConversations(): string[] {
return this.listAgents();
return Array.from(this.agents.entries())
.filter(([conversationId, runtime]) => !runtime.closed && this.conversationAgents.has(conversationId))
.map(([conversationId]) => conversationId);
}
/** Subscribe heartbeat state updates. Returns unsubscribe callback. */
@ -810,29 +1046,60 @@ export class Hub {
/** Enqueue a system event for a specific agent or the default agent. */
enqueueSystemEvent(text: string, opts?: { agentId?: string }): void {
const agentId = opts?.agentId ?? this.listAgents()[0];
if (!agentId) return;
enqueueSystemEvent(text, { sessionKey: agentId });
const conversationId = this.resolveConversationId(agentId, undefined);
if (!conversationId) return;
enqueueSystemEvent(text, { sessionKey: conversationId });
}
closeAgent(id: string): boolean {
const agent = this.agents.get(id);
if (!agent) return false;
agent.close();
this.approvalManager.cancelPending(id);
this.agents.delete(id);
this.agentSenders.delete(id);
this.agentStreamIds.delete(id);
this.agentStreamCounters.delete(id);
this.clearPendingAssistantStarts(id);
this.suppressedStreamAgents.delete(id);
this.localApprovalHandlers.delete(id);
removeAgentRecord(id);
const normalizedId = this.normalizeId(id);
if (!normalizedId) return false;
const resolvedAgentId = this.agentMainConversations.has(normalizedId)
? normalizedId
: this.conversationAgents.get(normalizedId) ?? normalizedId;
const conversationIds = this.listConversationIdsForAgent(resolvedAgentId);
if (conversationIds.length === 0) {
return this.closeConversation(normalizedId);
}
let closedAny = false;
for (const conversationId of conversationIds) {
const closed = this.closeConversationRuntime(conversationId, { persist: false });
closedAny = closedAny || closed.ok;
}
if (!closedAny) return false;
this.agentMainConversations.delete(resolvedAgentId);
this.agentProfiles.delete(resolvedAgentId);
removeAgentRecordById(resolvedAgentId);
this.heartbeatRunner?.updateConfig();
return true;
return closedAny;
}
closeConversation(id: string): boolean {
return this.closeAgent(id);
const normalizedId = this.normalizeId(id);
if (!normalizedId) return false;
const conversationId = this.agents.has(normalizedId)
? normalizedId
: this.resolveAgentMainConversationId(normalizedId);
if (!conversationId) return false;
const { ok, agentId } = this.closeConversationRuntime(conversationId);
if (!ok || !agentId) return false;
const currentMainConversationId = this.agentMainConversations.get(agentId);
if (currentMainConversationId === conversationId) {
this.agentMainConversations.delete(agentId);
const replacementConversationId = this.listConversationIdsForAgent(agentId)[0];
if (replacementConversationId) {
this.agentMainConversations.set(agentId, replacementConversationId);
}
}
this.clearAgentIfNoConversation(agentId);
this.heartbeatRunner?.updateConfig();
return true;
}
shutdown(): void {
@ -847,16 +1114,19 @@ export class Hub {
this.heartbeatUnsubscribe = null;
this.heartbeatListeners.clear();
for (const [id, agent] of this.agents) {
for (const [conversationId, agent] of this.agents) {
agent.close();
this.agents.delete(id);
this.agentSenders.delete(id);
this.agentStreamIds.delete(id);
this.agentStreamCounters.delete(id);
this.clearPendingAssistantStarts(id);
this.suppressedStreamAgents.delete(id);
this.localApprovalHandlers.delete(id);
this.agents.delete(conversationId);
this.conversationAgents.delete(conversationId);
this.agentSenders.delete(conversationId);
this.agentStreamIds.delete(conversationId);
this.agentStreamCounters.delete(conversationId);
this.clearPendingAssistantStarts(conversationId);
this.suppressedStreamAgents.delete(conversationId);
this.localApprovalHandlers.delete(conversationId);
}
this.agentMainConversations.clear();
this.agentProfiles.clear();
this.client.disconnect();
console.log("Hub shut down");
}

View file

@ -2,13 +2,13 @@ import { describe, it, expect, vi } from "vitest";
import { createCreateConversationHandler } from "./create-conversation.js";
describe("createCreateConversationHandler", () => {
it("creates conversation with explicit id", () => {
it("creates conversation with explicit id and agent id", () => {
const createConversation = vi.fn(() => ({ sessionId: "conv-1" }));
const handler = createCreateConversationHandler({ createConversation });
const result = handler({ id: "custom-id" }, "device-1") as { id: string };
const result = handler({ id: "custom-id", agentId: "agent-1" }, "device-1") as { id: string };
expect(createConversation).toHaveBeenCalledWith("custom-id");
expect(createConversation).toHaveBeenCalledWith("custom-id", { agentId: "agent-1" });
expect(result).toEqual({ id: "conv-1" });
});
@ -18,7 +18,7 @@ describe("createCreateConversationHandler", () => {
const result = handler(undefined, "device-1") as { id: string };
expect(createConversation).toHaveBeenCalledWith(undefined);
expect(createConversation).toHaveBeenCalledWith(undefined, { agentId: undefined });
expect(result).toEqual({ id: "conv-2" });
});
});

View file

@ -1,13 +1,13 @@
import type { RpcHandler } from "../dispatcher.js";
interface HubLike {
createConversation(id?: string): { sessionId: string };
createConversation(id?: string, options?: { agentId?: string }): { sessionId: string };
}
export function createCreateConversationHandler(hub: HubLike): RpcHandler {
return (params: unknown) => {
const { id } = (params ?? {}) as { id?: string };
const conversation = hub.createConversation(id);
const { id, agentId } = (params ?? {}) as { id?: string; agentId?: string };
const conversation = hub.createConversation(id, { agentId });
return { id: conversation.sessionId };
};
}

View file

@ -13,7 +13,9 @@ interface GetAgentMessagesParams {
limit?: number;
}
export function createGetAgentMessagesHandler(): RpcHandler {
type ConversationResolver = (agentId: string, conversationId?: string) => string;
export function createGetAgentMessagesHandler(resolveConversationId?: ConversationResolver): RpcHandler {
return (params: unknown) => {
if (!params || typeof params !== "object") {
throw new RpcError("INVALID_PARAMS", "params must be an object");
@ -23,7 +25,10 @@ export function createGetAgentMessagesHandler(): RpcHandler {
if (!agentId) {
throw new RpcError("INVALID_PARAMS", "Missing required param: agentId");
}
const resolvedConversationId = (conversationId ?? "").trim() || agentId;
const fallbackConversationId = (conversationId ?? "").trim() || agentId;
const resolvedConversationId = resolveConversationId
? resolveConversationId(agentId, conversationId)
: fallbackConversationId;
const sessionPath = resolveSessionPath(resolvedConversationId);
if (!existsSync(sessionPath)) {

View file

@ -5,6 +5,7 @@ import type { DeviceStore, DeviceMeta } from "../../device-store.js";
interface VerifyContext {
hubId: string;
deviceStore: DeviceStore;
resolveMainConversationId?: (agentId: string) => string | undefined;
/** Called for first-time connections. Returns true if user approves, false if rejected. */
onConfirmDevice: (deviceId: string, agentId: string, meta?: DeviceMeta) => Promise<boolean>;
}
@ -21,10 +22,11 @@ export function createVerifyHandler(ctx: VerifyContext): RpcHandler {
// 1. Already in whitelist → pass through (reconnection, no confirmation needed)
const allowed = ctx.deviceStore.isAllowed(from);
if (allowed) {
const mainConversationId = ctx.resolveMainConversationId?.(allowed.agentId) ?? allowed.agentId;
return {
hubId: ctx.hubId,
agentId: allowed.agentId,
mainConversationId: allowed.agentId,
mainConversationId,
isNewDevice: false,
};
}
@ -47,10 +49,11 @@ export function createVerifyHandler(ctx: VerifyContext): RpcHandler {
// 4. User confirmed → add to whitelist (with device metadata)
ctx.deviceStore.allowDevice(from, result.agentId, meta);
const mainConversationId = ctx.resolveMainConversationId?.(result.agentId) ?? result.agentId;
return {
hubId: ctx.hubId,
agentId: result.agentId,
mainConversationId: result.agentId,
mainConversationId,
isNewDevice: true,
};
};

View file

@ -127,9 +127,10 @@ export interface CreateAgentResult {
id: string;
}
/** createConversation - request params (conversation-first alias of createAgent) */
/** createConversation - request params (create a conversation, optionally under a specific agent) */
export interface CreateConversationParams {
id?: string;
agentId?: string;
}
/** createConversation - response payload */