refactor(session): add agent/conversation hierarchical storage

This commit is contained in:
Jiayuan Zhang 2026-02-17 03:46:30 +08:00
parent a0bb88e7b7
commit 4de89943f2
7 changed files with 123 additions and 24 deletions

View file

@ -71,7 +71,7 @@
import { join } from "path";
import { mkdirSync } from "fs";
import { appendFile } from "fs/promises";
import { resolveBaseDir, type SessionStorageOptions } from "./session/storage.js";
import { ensureSessionDir, resolveSessionDir, type SessionStorageOptions } from "./session/storage.js";
export interface RunLog {
log(event: string, data?: Record<string, unknown>): void;
@ -85,16 +85,10 @@ class FileRunLog implements RunLog {
private flushScheduled = false;
constructor(sessionId: string, options?: SessionStorageOptions) {
const sessionDir = join(resolveBaseDir(options), sessionId);
try {
mkdirSync(sessionDir, { recursive: true });
} catch (err) {
if ((err as NodeJS.ErrnoException).code === "ENOENT") {
mkdirSync(sessionDir, { recursive: true });
} else {
throw err;
}
}
ensureSessionDir(sessionId, options);
const sessionDir = resolveSessionDir(sessionId, options);
// keep an extra guard for run-log-only writes when session dir is cleaned externally
mkdirSync(sessionDir, { recursive: true });
this.filePath = join(sessionDir, "run-log.jsonl");
}

View file

@ -400,12 +400,17 @@ export class Agent {
// Load session metadata early so stored provider/model can inform defaults
this.sessionId = options.sessionId ?? uuidv7();
this.guardedExecApproval = this.createGuardedExecApprovalCallback(options.onExecApprovalNeeded);
const storageAgentId = options.ownerAgentId;
this.runLog = createRunLog(
options.enableRunLog ?? !!process.env.MULTICA_RUN_LOG,
this.sessionId,
storageAgentId ? { agentId: storageAgentId } : undefined,
);
const storedMeta = (() => {
const tempSession = new SessionManager({ sessionId: this.sessionId });
const tempSession = new SessionManager({
sessionId: this.sessionId,
...(storageAgentId ? { agentId: storageAgentId } : {}),
});
return tempSession.getMeta();
})();
@ -554,6 +559,7 @@ export class Agent {
// 创建 SessionManager带 context window 配置)
this.session = new SessionManager({
sessionId: this.sessionId,
...(storageAgentId ? { agentId: storageAgentId } : {}),
compactionMode,
// Token 模式参数
contextWindowTokens: this.contextWindowGuard.tokens,

View file

@ -1,7 +1,13 @@
import type { AgentMessage } from "@mariozechner/pi-agent-core";
import { getModel, type Model, type UserMessage } from "@mariozechner/pi-ai";
import type { SessionEntry, SessionMeta } from "./types.js";
import { appendEntry, readEntries, resolveSessionPath, writeEntries } from "./storage.js";
import {
appendEntry,
readEntries,
resolveSessionPath,
writeEntries,
type SessionStorageOptions,
} from "./storage.js";
import { compactMessages, compactMessagesAsync, type CompactionResult } from "./compaction.js";
import { estimateTokenUsage, estimateMessagesTokens, shouldCompact as shouldCompactTokens } from "../context-window/index.js";
import { credentialManager } from "../credentials.js";
@ -36,6 +42,8 @@ function getSummaryApiKey(): string | undefined {
export type SessionManagerOptions = {
sessionId: string;
baseDir?: string | undefined;
/** Logical owner agent ID for hierarchical session storage. */
agentId?: string | undefined;
// Compaction mode configuration
/** Compaction mode: "tokens" uses token awareness, "summary" uses LLM summary (default) */
@ -81,6 +89,7 @@ export type SessionManagerOptions = {
export class SessionManager {
private readonly sessionId: string;
private readonly baseDir: string | undefined;
private readonly agentId: string | undefined;
private readonly compactionMode: "tokens" | "summary";
// Token mode
private readonly contextWindowTokens: number;
@ -108,6 +117,7 @@ export class SessionManager {
constructor(options: SessionManagerOptions) {
this.sessionId = options.sessionId;
this.baseDir = options.baseDir;
this.agentId = options.agentId;
// Compaction mode (default: summary with LLM-based summarization)
this.compactionMode = options.compactionMode ?? "summary";
@ -174,11 +184,11 @@ export class SessionManager {
}
loadEntries(): SessionEntry[] {
return readEntries(this.sessionId, { baseDir: this.baseDir });
return readEntries(this.sessionId, this.getStorageOptions());
}
async repairIfNeeded(warn?: (message: string) => void): Promise<RepairReport> {
const filePath = resolveSessionPath(this.sessionId, { baseDir: this.baseDir });
const filePath = resolveSessionPath(this.sessionId, this.getStorageOptions());
return repairSessionFileIfNeeded({ sessionFile: filePath, ...(warn !== undefined ? { warn } : {}) });
}
@ -240,7 +250,7 @@ export class SessionManager {
appendEntry(
this.sessionId,
{ type: "meta", meta, timestamp: Date.now() },
{ baseDir: this.baseDir },
this.getStorageOptions(),
),
);
}
@ -258,7 +268,7 @@ export class SessionManager {
contextWindowTokens: this.contextWindowTokens,
settings: this.toolResultTruncation,
saveArtifact: (toolCallId, content) =>
saveToolResultArtifact(this.sessionId, toolCallId, content, { baseDir: this.baseDir }),
saveToolResultArtifact(this.sessionId, toolCallId, content, this.getStorageOptions()),
});
if (result.truncated) {
persistMessage = result.message;
@ -286,7 +296,7 @@ export class SessionManager {
: {}),
...(options?.source !== undefined ? { source: options.source } : {}),
},
{ baseDir: this.baseDir },
this.getStorageOptions(),
),
);
}
@ -463,7 +473,7 @@ export class SessionManager {
});
await this.enqueue(() =>
writeEntries(this.sessionId, entries, { baseDir: this.baseDir }),
writeEntries(this.sessionId, entries, this.getStorageOptions()),
);
return result;
}
@ -483,4 +493,11 @@ export class SessionManager {
});
return this.queue;
}
private getStorageOptions(): SessionStorageOptions {
return {
...(this.baseDir !== undefined ? { baseDir: this.baseDir } : {}),
...(this.agentId !== undefined ? { agentId: this.agentId } : {}),
};
}
}

View file

@ -58,6 +58,11 @@ describe("session/storage", () => {
const result = resolveSessionDir("session-123-abc", { baseDir: testBaseDir });
expect(result).toBe(join(testBaseDir, "session-123-abc"));
});
it("should return hierarchical path when agentId is provided", () => {
const result = resolveSessionDir("conv-1", { baseDir: testBaseDir, agentId: "agent-1" });
expect(result).toBe(join(testBaseDir, "agent-1", "conv-1"));
});
});
describe("resolveSessionPath", () => {
@ -65,6 +70,11 @@ describe("session/storage", () => {
const result = resolveSessionPath("test-session", { baseDir: testBaseDir });
expect(result).toBe(join(testBaseDir, "test-session", "session.jsonl"));
});
it("should return hierarchical path when agentId is provided", () => {
const result = resolveSessionPath("conv-1", { baseDir: testBaseDir, agentId: "agent-1" });
expect(result).toBe(join(testBaseDir, "agent-1", "conv-1", "session.jsonl"));
});
});
describe("ensureSessionDir", () => {
@ -84,6 +94,20 @@ describe("session/storage", () => {
expect(() => ensureSessionDir(sessionId, { baseDir: testBaseDir })).not.toThrow();
expect(existsSync(dir)).toBe(true);
});
it("should migrate legacy flat directory into hierarchical path", () => {
const sessionId = "legacy-migrate";
const legacyDir = join(testBaseDir, sessionId);
mkdirSync(legacyDir, { recursive: true });
writeFileSync(join(legacyDir, "session.jsonl"), '{"type":"meta","meta":{},"timestamp":1}\n');
ensureSessionDir(sessionId, { baseDir: testBaseDir, agentId: "agent-1" });
const nextDir = join(testBaseDir, "agent-1", sessionId);
expect(existsSync(nextDir)).toBe(true);
expect(existsSync(join(nextDir, "session.jsonl"))).toBe(true);
expect(existsSync(legacyDir)).toBe(false);
});
});
describe("readEntries", () => {
@ -102,6 +126,21 @@ describe("session/storage", () => {
expect(entries).toEqual([]);
});
it("should read legacy flat session when hierarchical path is requested", () => {
const sessionId = "legacy-read";
const legacyDir = join(testBaseDir, sessionId);
mkdirSync(legacyDir, { recursive: true });
const entry: SessionEntry = {
type: "message",
message: { role: "user", content: "legacy" } as any,
timestamp: 1000,
};
writeFileSync(join(legacyDir, "session.jsonl"), `${JSON.stringify(entry)}\n`);
const entries = readEntries(sessionId, { baseDir: testBaseDir, agentId: "agent-1" });
expect(entries).toEqual([entry]);
});
it("should parse valid JSONL entries", () => {
const sessionId = "valid-session";
const dir = join(testBaseDir, sessionId);

View file

@ -1,5 +1,5 @@
import { join } from "path";
import { existsSync, mkdirSync, readFileSync, writeFileSync } from "fs";
import { existsSync, mkdirSync, readFileSync, renameSync, writeFileSync } from "fs";
import { appendFile, writeFile } from "fs/promises";
import { createHash } from "node:crypto";
import type { SessionEntry } from "./types.js";
@ -8,6 +8,8 @@ import { acquireSessionWriteLock } from "./session-write-lock.js";
export type SessionStorageOptions = {
baseDir?: string | undefined;
/** Owner agent ID. When provided, sessions are stored under agent/conversation hierarchy. */
agentId?: string | undefined;
};
/** Minimum base64 data length to externalize (32KB decoded ≈ 43KB base64) */
@ -17,10 +19,35 @@ export function resolveBaseDir(options?: SessionStorageOptions) {
return options?.baseDir ?? join(DATA_DIR, "sessions");
}
export function resolveSessionDir(sessionId: string, options?: SessionStorageOptions) {
function normalizeId(value: string | undefined): string | undefined {
const normalized = (value ?? "").trim();
return normalized.length > 0 ? normalized : undefined;
}
function resolveLegacySessionDir(sessionId: string, options?: SessionStorageOptions): string {
return join(resolveBaseDir(options), sessionId);
}
function resolvePreferredSessionDir(sessionId: string, options?: SessionStorageOptions): string {
const normalizedAgentId = normalizeId(options?.agentId);
if (!normalizedAgentId) {
return resolveLegacySessionDir(sessionId, options);
}
return join(resolveBaseDir(options), normalizedAgentId, sessionId);
}
function resolveExistingSessionDir(sessionId: string, options?: SessionStorageOptions): string {
const preferred = resolvePreferredSessionDir(sessionId, options);
if (existsSync(preferred)) return preferred;
const legacy = resolveLegacySessionDir(sessionId, options);
if (legacy !== preferred && existsSync(legacy)) return legacy;
return preferred;
}
export function resolveSessionDir(sessionId: string, options?: SessionStorageOptions) {
return resolvePreferredSessionDir(sessionId, options);
}
export function resolveSessionPath(sessionId: string, options?: SessionStorageOptions) {
return join(resolveSessionDir(sessionId, options), "session.jsonl");
}
@ -30,6 +57,19 @@ export function resolveMediaDir(sessionId: string, options?: SessionStorageOptio
}
export function ensureSessionDir(sessionId: string, options?: SessionStorageOptions) {
const preferredDir = resolvePreferredSessionDir(sessionId, options);
const legacyDir = resolveLegacySessionDir(sessionId, options);
if (preferredDir !== legacyDir && existsSync(legacyDir) && !existsSync(preferredDir)) {
try {
mkdirSync(join(preferredDir, ".."), { recursive: true });
renameSync(legacyDir, preferredDir);
return;
} catch {
// Fall through to normal mkdir flow below.
}
}
const dir = resolveSessionDir(sessionId, options);
// mkdirSync with recursive is idempotent (no-op if dir exists),
// so skip the existsSync check to avoid a TOCTOU race.
@ -127,7 +167,7 @@ function internalizeBlock(
// Format A ref: { type: "image", $ref: "media/<hash>.bin" }
if (typeof block.$ref === "string") {
const filePath = join(resolveSessionDir(sessionId, options), block.$ref);
const filePath = join(resolveExistingSessionDir(sessionId, options), block.$ref);
try {
const buffer = readFileSync(filePath);
const data = buffer.toString("base64");
@ -140,7 +180,7 @@ function internalizeBlock(
// Format B ref: { type: "image", source: { type: "$ref", path: "media/<hash>.bin" } }
if (block.source && typeof block.source === "object" && block.source.type === "$ref") {
const filePath = join(resolveSessionDir(sessionId, options), block.source.path);
const filePath = join(resolveExistingSessionDir(sessionId, options), block.source.path);
try {
const buffer = readFileSync(filePath);
const data = buffer.toString("base64");
@ -240,7 +280,7 @@ function internalizeImages(
// ─── Public API ─────────────────────────────────────────────────────────────
export function readEntries(sessionId: string, options?: SessionStorageOptions): SessionEntry[] {
const filePath = resolveSessionPath(sessionId, options);
const filePath = join(resolveExistingSessionDir(sessionId, options), "session.jsonl");
if (!existsSync(filePath)) return [];
const content = readFileSync(filePath, "utf8");
const lines = content.split("\n").filter(Boolean);

View file

@ -41,6 +41,8 @@ export type AgentOptions = {
/** Command execution directory */
cwd?: string | undefined;
sessionId?: string | undefined;
/** Logical owner agent ID for hierarchical session storage (agent/conversation). */
ownerAgentId?: string | undefined;
logger?: AgentLogger | undefined;
// === Context Window Guard Configuration ===

View file

@ -620,6 +620,7 @@ export class Hub {
const channels = this.channelManager.listChannelInfos();
const agent = new AsyncAgent({
sessionId: conversationId,
ownerAgentId: targetAgentId,
profileId,
onExecApprovalNeeded,
onChannelSendFile,