feat(session): integrate write lock and auto-repair into session flow

- storage.ts: acquire/release write lock in appendEntry and writeEntries
- session-manager.ts: add repairIfNeeded() method, apply transcript
  sanitization in loadMessages()
- runner.ts: move message loading to lazy async init in run(), call
  repair before loading messages (following OpenClaw's pattern)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
yushen 2026-02-03 17:21:26 +08:00
parent 37e5b2ce7f
commit 0c549f76e6
3 changed files with 55 additions and 30 deletions

View file

@ -23,6 +23,7 @@ export class Agent {
private readonly skillManager?: SkillManager;
private readonly contextWindowGuard: ContextWindowGuardResult;
private readonly debug: boolean;
private initialized = false;
/** Current session ID */
readonly sessionId: string;
@ -172,31 +173,6 @@ export class Agent {
}
this.agent.setTools(tools);
const restoredMessages = this.session.loadMessages();
if (restoredMessages.length > 0) {
if (this.debug) {
console.error(`[debug] Restoring ${restoredMessages.length} messages from session`);
for (const msg of restoredMessages) {
const msgAny = msg as any;
const content = Array.isArray(msgAny.content)
? msgAny.content.map((c: any) => c.type || "text").join(", ")
: typeof msgAny.content;
console.error(`[debug] ${msg.role}: ${content}`);
if (Array.isArray(msgAny.content)) {
for (const block of msgAny.content) {
if (block.type === "tool_use") {
console.error(`[debug] tool_use id: ${block.id}, name: ${block.name}`);
}
if (block.type === "tool_result") {
console.error(`[debug] tool_result tool_use_id: ${block.tool_use_id}`);
}
}
}
}
}
this.agent.replaceMessages(restoredMessages);
}
this.session.saveMeta({
provider: this.agent.state.model?.provider,
model: this.agent.state.model?.id,
@ -216,6 +192,34 @@ export class Agent {
}
async run(prompt: string): Promise<AgentRunResult> {
if (!this.initialized) {
await this.session.repairIfNeeded((msg) => console.error(msg));
const restoredMessages = this.session.loadMessages();
if (restoredMessages.length > 0) {
if (this.debug) {
console.error(`[debug] Restoring ${restoredMessages.length} messages from session`);
for (const msg of restoredMessages) {
const msgAny = msg as any;
const content = Array.isArray(msgAny.content)
? msgAny.content.map((c: any) => c.type || "text").join(", ")
: typeof msgAny.content;
console.error(`[debug] ${msg.role}: ${content}`);
if (Array.isArray(msgAny.content)) {
for (const block of msgAny.content) {
if (block.type === "tool_use") {
console.error(`[debug] tool_use id: ${block.id}, name: ${block.name}`);
}
if (block.type === "tool_result") {
console.error(`[debug] tool_result tool_use_id: ${block.tool_use_id}`);
}
}
}
}
}
this.agent.replaceMessages(restoredMessages);
}
this.initialized = true;
}
this.output.state.lastAssistantText = "";
await this.agent.prompt(prompt);
return { text: this.output.state.lastAssistantText, error: this.agent.state.error };

View file

@ -1,9 +1,11 @@
import type { AgentMessage } from "@mariozechner/pi-agent-core";
import { getModel, type Model } from "@mariozechner/pi-ai";
import type { SessionEntry, SessionMeta } from "./types.js";
import { appendEntry, readEntries, writeEntries } from "./storage.js";
import { appendEntry, readEntries, resolveSessionPath, writeEntries } from "./storage.js";
import { compactMessages, compactMessagesAsync } from "./compaction.js";
import { credentialManager } from "../credentials.js";
import { repairSessionFileIfNeeded, type RepairReport } from "./session-file-repair.js";
import { sanitizeToolCallInputs, sanitizeToolUseResultPairing } from "./session-transcript-repair.js";
/** Get Kimi model for summarization (use a cheaper model than k2-thinking) */
function getSummaryModel(): Model<any> {
@ -140,11 +142,19 @@ export class SessionManager {
return readEntries(this.sessionId, { baseDir: this.baseDir });
}
async repairIfNeeded(warn?: (message: string) => void): Promise<RepairReport> {
const filePath = resolveSessionPath(this.sessionId, { baseDir: this.baseDir });
return repairSessionFileIfNeeded({ sessionFile: filePath, warn });
}
loadMessages(): AgentMessage[] {
const entries = this.loadEntries();
return entries
let messages = entries
.filter((entry) => entry.type === "message")
.map((entry) => entry.message);
messages = sanitizeToolCallInputs(messages);
messages = sanitizeToolUseResultPairing(messages);
return messages;
}
loadMeta(): SessionMeta | undefined {

View file

@ -3,6 +3,7 @@ import { existsSync, mkdirSync, readFileSync } from "fs";
import { appendFile, writeFile } from "fs/promises";
import type { SessionEntry } from "./types.js";
import { DATA_DIR } from "../../shared/index.js";
import { acquireSessionWriteLock } from "./session-write-lock.js";
export type SessionStorageOptions = {
baseDir?: string | undefined;
@ -50,7 +51,12 @@ export async function appendEntry(
) {
ensureSessionDir(sessionId, options);
const filePath = resolveSessionPath(sessionId, options);
await appendFile(filePath, `${JSON.stringify(entry)}\n`, "utf8");
const lock = await acquireSessionWriteLock({ sessionFile: filePath });
try {
await appendFile(filePath, `${JSON.stringify(entry)}\n`, "utf8");
} finally {
await lock.release();
}
}
export async function writeEntries(
@ -60,6 +66,11 @@ export async function writeEntries(
) {
ensureSessionDir(sessionId, options);
const filePath = resolveSessionPath(sessionId, options);
const content = entries.map((entry) => JSON.stringify(entry)).join("\n");
await writeFile(filePath, content ? `${content}\n` : "", "utf8");
const lock = await acquireSessionWriteLock({ sessionFile: filePath });
try {
const content = entries.map((entry) => JSON.stringify(entry)).join("\n");
await writeFile(filePath, content ? `${content}\n` : "", "utf8");
} finally {
await lock.release();
}
}