diff --git a/src/agent/runner.ts b/src/agent/runner.ts index 90f5416a..9ae9bf27 100644 --- a/src/agent/runner.ts +++ b/src/agent/runner.ts @@ -23,6 +23,7 @@ export class Agent { private readonly skillManager?: SkillManager; private readonly contextWindowGuard: ContextWindowGuardResult; private readonly debug: boolean; + private initialized = false; /** Current session ID */ readonly sessionId: string; @@ -172,31 +173,6 @@ export class Agent { } this.agent.setTools(tools); - const restoredMessages = this.session.loadMessages(); - if (restoredMessages.length > 0) { - if (this.debug) { - console.error(`[debug] Restoring ${restoredMessages.length} messages from session`); - for (const msg of restoredMessages) { - const msgAny = msg as any; - const content = Array.isArray(msgAny.content) - ? msgAny.content.map((c: any) => c.type || "text").join(", ") - : typeof msgAny.content; - console.error(`[debug] ${msg.role}: ${content}`); - if (Array.isArray(msgAny.content)) { - for (const block of msgAny.content) { - if (block.type === "tool_use") { - console.error(`[debug] tool_use id: ${block.id}, name: ${block.name}`); - } - if (block.type === "tool_result") { - console.error(`[debug] tool_result tool_use_id: ${block.tool_use_id}`); - } - } - } - } - } - this.agent.replaceMessages(restoredMessages); - } - this.session.saveMeta({ provider: this.agent.state.model?.provider, model: this.agent.state.model?.id, @@ -216,6 +192,34 @@ export class Agent { } async run(prompt: string): Promise { + if (!this.initialized) { + await this.session.repairIfNeeded((msg) => console.error(msg)); + const restoredMessages = this.session.loadMessages(); + if (restoredMessages.length > 0) { + if (this.debug) { + console.error(`[debug] Restoring ${restoredMessages.length} messages from session`); + for (const msg of restoredMessages) { + const msgAny = msg as any; + const content = Array.isArray(msgAny.content) + ? msgAny.content.map((c: any) => c.type || "text").join(", ") + : typeof msgAny.content; + console.error(`[debug] ${msg.role}: ${content}`); + if (Array.isArray(msgAny.content)) { + for (const block of msgAny.content) { + if (block.type === "tool_use") { + console.error(`[debug] tool_use id: ${block.id}, name: ${block.name}`); + } + if (block.type === "tool_result") { + console.error(`[debug] tool_result tool_use_id: ${block.tool_use_id}`); + } + } + } + } + } + this.agent.replaceMessages(restoredMessages); + } + this.initialized = true; + } this.output.state.lastAssistantText = ""; await this.agent.prompt(prompt); return { text: this.output.state.lastAssistantText, error: this.agent.state.error }; diff --git a/src/agent/session/session-manager.ts b/src/agent/session/session-manager.ts index e6793b36..a820072a 100644 --- a/src/agent/session/session-manager.ts +++ b/src/agent/session/session-manager.ts @@ -1,9 +1,11 @@ import type { AgentMessage } from "@mariozechner/pi-agent-core"; import { getModel, type Model } from "@mariozechner/pi-ai"; import type { SessionEntry, SessionMeta } from "./types.js"; -import { appendEntry, readEntries, writeEntries } from "./storage.js"; +import { appendEntry, readEntries, resolveSessionPath, writeEntries } from "./storage.js"; import { compactMessages, compactMessagesAsync } from "./compaction.js"; import { credentialManager } from "../credentials.js"; +import { repairSessionFileIfNeeded, type RepairReport } from "./session-file-repair.js"; +import { sanitizeToolCallInputs, sanitizeToolUseResultPairing } from "./session-transcript-repair.js"; /** Get Kimi model for summarization (use a cheaper model than k2-thinking) */ function getSummaryModel(): Model { @@ -140,11 +142,19 @@ export class SessionManager { return readEntries(this.sessionId, { baseDir: this.baseDir }); } + async repairIfNeeded(warn?: (message: string) => void): Promise { + const filePath = resolveSessionPath(this.sessionId, { baseDir: this.baseDir }); + return repairSessionFileIfNeeded({ sessionFile: filePath, warn }); + } + loadMessages(): AgentMessage[] { const entries = this.loadEntries(); - return entries + let messages = entries .filter((entry) => entry.type === "message") .map((entry) => entry.message); + messages = sanitizeToolCallInputs(messages); + messages = sanitizeToolUseResultPairing(messages); + return messages; } loadMeta(): SessionMeta | undefined { diff --git a/src/agent/session/storage.ts b/src/agent/session/storage.ts index b219fd95..63915783 100644 --- a/src/agent/session/storage.ts +++ b/src/agent/session/storage.ts @@ -3,6 +3,7 @@ import { existsSync, mkdirSync, readFileSync } from "fs"; import { appendFile, writeFile } from "fs/promises"; import type { SessionEntry } from "./types.js"; import { DATA_DIR } from "../../shared/index.js"; +import { acquireSessionWriteLock } from "./session-write-lock.js"; export type SessionStorageOptions = { baseDir?: string | undefined; @@ -50,7 +51,12 @@ export async function appendEntry( ) { ensureSessionDir(sessionId, options); const filePath = resolveSessionPath(sessionId, options); - await appendFile(filePath, `${JSON.stringify(entry)}\n`, "utf8"); + const lock = await acquireSessionWriteLock({ sessionFile: filePath }); + try { + await appendFile(filePath, `${JSON.stringify(entry)}\n`, "utf8"); + } finally { + await lock.release(); + } } export async function writeEntries( @@ -60,6 +66,11 @@ export async function writeEntries( ) { ensureSessionDir(sessionId, options); const filePath = resolveSessionPath(sessionId, options); - const content = entries.map((entry) => JSON.stringify(entry)).join("\n"); - await writeFile(filePath, content ? `${content}\n` : "", "utf8"); + const lock = await acquireSessionWriteLock({ sessionFile: filePath }); + try { + const content = entries.map((entry) => JSON.stringify(entry)).join("\n"); + await writeFile(filePath, content ? `${content}\n` : "", "utf8"); + } finally { + await lock.release(); + } }