diff --git a/packages/core/src/agent/run-log.ts b/packages/core/src/agent/run-log.ts index 29b52585..3890ea92 100644 --- a/packages/core/src/agent/run-log.ts +++ b/packages/core/src/agent/run-log.ts @@ -71,7 +71,7 @@ import { join } from "path"; import { mkdirSync } from "fs"; import { appendFile } from "fs/promises"; -import { resolveBaseDir, type SessionStorageOptions } from "./session/storage.js"; +import { ensureSessionDir, resolveSessionDir, type SessionStorageOptions } from "./session/storage.js"; export interface RunLog { log(event: string, data?: Record): void; @@ -85,16 +85,10 @@ class FileRunLog implements RunLog { private flushScheduled = false; constructor(sessionId: string, options?: SessionStorageOptions) { - const sessionDir = join(resolveBaseDir(options), sessionId); - try { - mkdirSync(sessionDir, { recursive: true }); - } catch (err) { - if ((err as NodeJS.ErrnoException).code === "ENOENT") { - mkdirSync(sessionDir, { recursive: true }); - } else { - throw err; - } - } + ensureSessionDir(sessionId, options); + const sessionDir = resolveSessionDir(sessionId, options); + // keep an extra guard for run-log-only writes when session dir is cleaned externally + mkdirSync(sessionDir, { recursive: true }); this.filePath = join(sessionDir, "run-log.jsonl"); } diff --git a/packages/core/src/agent/runner.ts b/packages/core/src/agent/runner.ts index 320e76f1..47716152 100644 --- a/packages/core/src/agent/runner.ts +++ b/packages/core/src/agent/runner.ts @@ -400,12 +400,17 @@ export class Agent { // Load session metadata early so stored provider/model can inform defaults this.sessionId = options.sessionId ?? uuidv7(); this.guardedExecApproval = this.createGuardedExecApprovalCallback(options.onExecApprovalNeeded); + const storageAgentId = options.ownerAgentId; this.runLog = createRunLog( options.enableRunLog ?? !!process.env.MULTICA_RUN_LOG, this.sessionId, + storageAgentId ? { agentId: storageAgentId } : undefined, ); const storedMeta = (() => { - const tempSession = new SessionManager({ sessionId: this.sessionId }); + const tempSession = new SessionManager({ + sessionId: this.sessionId, + ...(storageAgentId ? { agentId: storageAgentId } : {}), + }); return tempSession.getMeta(); })(); @@ -554,6 +559,7 @@ export class Agent { // 创建 SessionManager(带 context window 配置) this.session = new SessionManager({ sessionId: this.sessionId, + ...(storageAgentId ? { agentId: storageAgentId } : {}), compactionMode, // Token 模式参数 contextWindowTokens: this.contextWindowGuard.tokens, diff --git a/packages/core/src/agent/session/session-manager.ts b/packages/core/src/agent/session/session-manager.ts index c51ea1bd..d5b68d05 100644 --- a/packages/core/src/agent/session/session-manager.ts +++ b/packages/core/src/agent/session/session-manager.ts @@ -1,7 +1,13 @@ import type { AgentMessage } from "@mariozechner/pi-agent-core"; import { getModel, type Model, type UserMessage } from "@mariozechner/pi-ai"; import type { SessionEntry, SessionMeta } from "./types.js"; -import { appendEntry, readEntries, resolveSessionPath, writeEntries } from "./storage.js"; +import { + appendEntry, + readEntries, + resolveSessionPath, + writeEntries, + type SessionStorageOptions, +} from "./storage.js"; import { compactMessages, compactMessagesAsync, type CompactionResult } from "./compaction.js"; import { estimateTokenUsage, estimateMessagesTokens, shouldCompact as shouldCompactTokens } from "../context-window/index.js"; import { credentialManager } from "../credentials.js"; @@ -36,6 +42,8 @@ function getSummaryApiKey(): string | undefined { export type SessionManagerOptions = { sessionId: string; baseDir?: string | undefined; + /** Logical owner agent ID for hierarchical session storage. */ + agentId?: string | undefined; // Compaction mode configuration /** Compaction mode: "tokens" uses token awareness, "summary" uses LLM summary (default) */ @@ -81,6 +89,7 @@ export type SessionManagerOptions = { export class SessionManager { private readonly sessionId: string; private readonly baseDir: string | undefined; + private readonly agentId: string | undefined; private readonly compactionMode: "tokens" | "summary"; // Token mode private readonly contextWindowTokens: number; @@ -108,6 +117,7 @@ export class SessionManager { constructor(options: SessionManagerOptions) { this.sessionId = options.sessionId; this.baseDir = options.baseDir; + this.agentId = options.agentId; // Compaction mode (default: summary with LLM-based summarization) this.compactionMode = options.compactionMode ?? "summary"; @@ -174,11 +184,11 @@ export class SessionManager { } loadEntries(): SessionEntry[] { - return readEntries(this.sessionId, { baseDir: this.baseDir }); + return readEntries(this.sessionId, this.getStorageOptions()); } async repairIfNeeded(warn?: (message: string) => void): Promise { - const filePath = resolveSessionPath(this.sessionId, { baseDir: this.baseDir }); + const filePath = resolveSessionPath(this.sessionId, this.getStorageOptions()); return repairSessionFileIfNeeded({ sessionFile: filePath, ...(warn !== undefined ? { warn } : {}) }); } @@ -240,7 +250,7 @@ export class SessionManager { appendEntry( this.sessionId, { type: "meta", meta, timestamp: Date.now() }, - { baseDir: this.baseDir }, + this.getStorageOptions(), ), ); } @@ -258,7 +268,7 @@ export class SessionManager { contextWindowTokens: this.contextWindowTokens, settings: this.toolResultTruncation, saveArtifact: (toolCallId, content) => - saveToolResultArtifact(this.sessionId, toolCallId, content, { baseDir: this.baseDir }), + saveToolResultArtifact(this.sessionId, toolCallId, content, this.getStorageOptions()), }); if (result.truncated) { persistMessage = result.message; @@ -286,7 +296,7 @@ export class SessionManager { : {}), ...(options?.source !== undefined ? { source: options.source } : {}), }, - { baseDir: this.baseDir }, + this.getStorageOptions(), ), ); } @@ -463,7 +473,7 @@ export class SessionManager { }); await this.enqueue(() => - writeEntries(this.sessionId, entries, { baseDir: this.baseDir }), + writeEntries(this.sessionId, entries, this.getStorageOptions()), ); return result; } @@ -483,4 +493,11 @@ export class SessionManager { }); return this.queue; } + + private getStorageOptions(): SessionStorageOptions { + return { + ...(this.baseDir !== undefined ? { baseDir: this.baseDir } : {}), + ...(this.agentId !== undefined ? { agentId: this.agentId } : {}), + }; + } } diff --git a/packages/core/src/agent/session/storage.test.ts b/packages/core/src/agent/session/storage.test.ts index 37c88bef..a97b4c7e 100644 --- a/packages/core/src/agent/session/storage.test.ts +++ b/packages/core/src/agent/session/storage.test.ts @@ -58,6 +58,11 @@ describe("session/storage", () => { const result = resolveSessionDir("session-123-abc", { baseDir: testBaseDir }); expect(result).toBe(join(testBaseDir, "session-123-abc")); }); + + it("should return hierarchical path when agentId is provided", () => { + const result = resolveSessionDir("conv-1", { baseDir: testBaseDir, agentId: "agent-1" }); + expect(result).toBe(join(testBaseDir, "agent-1", "conv-1")); + }); }); describe("resolveSessionPath", () => { @@ -65,6 +70,11 @@ describe("session/storage", () => { const result = resolveSessionPath("test-session", { baseDir: testBaseDir }); expect(result).toBe(join(testBaseDir, "test-session", "session.jsonl")); }); + + it("should return hierarchical path when agentId is provided", () => { + const result = resolveSessionPath("conv-1", { baseDir: testBaseDir, agentId: "agent-1" }); + expect(result).toBe(join(testBaseDir, "agent-1", "conv-1", "session.jsonl")); + }); }); describe("ensureSessionDir", () => { @@ -84,6 +94,20 @@ describe("session/storage", () => { expect(() => ensureSessionDir(sessionId, { baseDir: testBaseDir })).not.toThrow(); expect(existsSync(dir)).toBe(true); }); + + it("should migrate legacy flat directory into hierarchical path", () => { + const sessionId = "legacy-migrate"; + const legacyDir = join(testBaseDir, sessionId); + mkdirSync(legacyDir, { recursive: true }); + writeFileSync(join(legacyDir, "session.jsonl"), '{"type":"meta","meta":{},"timestamp":1}\n'); + + ensureSessionDir(sessionId, { baseDir: testBaseDir, agentId: "agent-1" }); + + const nextDir = join(testBaseDir, "agent-1", sessionId); + expect(existsSync(nextDir)).toBe(true); + expect(existsSync(join(nextDir, "session.jsonl"))).toBe(true); + expect(existsSync(legacyDir)).toBe(false); + }); }); describe("readEntries", () => { @@ -102,6 +126,21 @@ describe("session/storage", () => { expect(entries).toEqual([]); }); + it("should read legacy flat session when hierarchical path is requested", () => { + const sessionId = "legacy-read"; + const legacyDir = join(testBaseDir, sessionId); + mkdirSync(legacyDir, { recursive: true }); + const entry: SessionEntry = { + type: "message", + message: { role: "user", content: "legacy" } as any, + timestamp: 1000, + }; + writeFileSync(join(legacyDir, "session.jsonl"), `${JSON.stringify(entry)}\n`); + + const entries = readEntries(sessionId, { baseDir: testBaseDir, agentId: "agent-1" }); + expect(entries).toEqual([entry]); + }); + it("should parse valid JSONL entries", () => { const sessionId = "valid-session"; const dir = join(testBaseDir, sessionId); diff --git a/packages/core/src/agent/session/storage.ts b/packages/core/src/agent/session/storage.ts index 51c8245c..823dd01c 100644 --- a/packages/core/src/agent/session/storage.ts +++ b/packages/core/src/agent/session/storage.ts @@ -1,5 +1,5 @@ import { join } from "path"; -import { existsSync, mkdirSync, readFileSync, writeFileSync } from "fs"; +import { existsSync, mkdirSync, readFileSync, renameSync, writeFileSync } from "fs"; import { appendFile, writeFile } from "fs/promises"; import { createHash } from "node:crypto"; import type { SessionEntry } from "./types.js"; @@ -8,6 +8,8 @@ import { acquireSessionWriteLock } from "./session-write-lock.js"; export type SessionStorageOptions = { baseDir?: string | undefined; + /** Owner agent ID. When provided, sessions are stored under agent/conversation hierarchy. */ + agentId?: string | undefined; }; /** Minimum base64 data length to externalize (32KB decoded ≈ 43KB base64) */ @@ -17,10 +19,35 @@ export function resolveBaseDir(options?: SessionStorageOptions) { return options?.baseDir ?? join(DATA_DIR, "sessions"); } -export function resolveSessionDir(sessionId: string, options?: SessionStorageOptions) { +function normalizeId(value: string | undefined): string | undefined { + const normalized = (value ?? "").trim(); + return normalized.length > 0 ? normalized : undefined; +} + +function resolveLegacySessionDir(sessionId: string, options?: SessionStorageOptions): string { return join(resolveBaseDir(options), sessionId); } +function resolvePreferredSessionDir(sessionId: string, options?: SessionStorageOptions): string { + const normalizedAgentId = normalizeId(options?.agentId); + if (!normalizedAgentId) { + return resolveLegacySessionDir(sessionId, options); + } + return join(resolveBaseDir(options), normalizedAgentId, sessionId); +} + +function resolveExistingSessionDir(sessionId: string, options?: SessionStorageOptions): string { + const preferred = resolvePreferredSessionDir(sessionId, options); + if (existsSync(preferred)) return preferred; + const legacy = resolveLegacySessionDir(sessionId, options); + if (legacy !== preferred && existsSync(legacy)) return legacy; + return preferred; +} + +export function resolveSessionDir(sessionId: string, options?: SessionStorageOptions) { + return resolvePreferredSessionDir(sessionId, options); +} + export function resolveSessionPath(sessionId: string, options?: SessionStorageOptions) { return join(resolveSessionDir(sessionId, options), "session.jsonl"); } @@ -30,6 +57,19 @@ export function resolveMediaDir(sessionId: string, options?: SessionStorageOptio } export function ensureSessionDir(sessionId: string, options?: SessionStorageOptions) { + const preferredDir = resolvePreferredSessionDir(sessionId, options); + const legacyDir = resolveLegacySessionDir(sessionId, options); + + if (preferredDir !== legacyDir && existsSync(legacyDir) && !existsSync(preferredDir)) { + try { + mkdirSync(join(preferredDir, ".."), { recursive: true }); + renameSync(legacyDir, preferredDir); + return; + } catch { + // Fall through to normal mkdir flow below. + } + } + const dir = resolveSessionDir(sessionId, options); // mkdirSync with recursive is idempotent (no-op if dir exists), // so skip the existsSync check to avoid a TOCTOU race. @@ -127,7 +167,7 @@ function internalizeBlock( // Format A ref: { type: "image", $ref: "media/.bin" } if (typeof block.$ref === "string") { - const filePath = join(resolveSessionDir(sessionId, options), block.$ref); + const filePath = join(resolveExistingSessionDir(sessionId, options), block.$ref); try { const buffer = readFileSync(filePath); const data = buffer.toString("base64"); @@ -140,7 +180,7 @@ function internalizeBlock( // Format B ref: { type: "image", source: { type: "$ref", path: "media/.bin" } } if (block.source && typeof block.source === "object" && block.source.type === "$ref") { - const filePath = join(resolveSessionDir(sessionId, options), block.source.path); + const filePath = join(resolveExistingSessionDir(sessionId, options), block.source.path); try { const buffer = readFileSync(filePath); const data = buffer.toString("base64"); @@ -240,7 +280,7 @@ function internalizeImages( // ─── Public API ───────────────────────────────────────────────────────────── export function readEntries(sessionId: string, options?: SessionStorageOptions): SessionEntry[] { - const filePath = resolveSessionPath(sessionId, options); + const filePath = join(resolveExistingSessionDir(sessionId, options), "session.jsonl"); if (!existsSync(filePath)) return []; const content = readFileSync(filePath, "utf8"); const lines = content.split("\n").filter(Boolean); diff --git a/packages/core/src/agent/types.ts b/packages/core/src/agent/types.ts index 7663bba6..bc895d1a 100644 --- a/packages/core/src/agent/types.ts +++ b/packages/core/src/agent/types.ts @@ -41,6 +41,8 @@ export type AgentOptions = { /** Command execution directory */ cwd?: string | undefined; sessionId?: string | undefined; + /** Logical owner agent ID for hierarchical session storage (agent/conversation). */ + ownerAgentId?: string | undefined; logger?: AgentLogger | undefined; // === Context Window Guard Configuration === diff --git a/packages/core/src/hub/hub.ts b/packages/core/src/hub/hub.ts index 5b648091..00862897 100644 --- a/packages/core/src/hub/hub.ts +++ b/packages/core/src/hub/hub.ts @@ -620,6 +620,7 @@ export class Hub { const channels = this.channelManager.listChannelInfos(); const agent = new AsyncAgent({ sessionId: conversationId, + ownerAgentId: targetAgentId, profileId, onExecApprovalNeeded, onChannelSendFile,