From 37e5b2ce7f700f7a89e7cda184a114e7e53035fe Mon Sep 17 00:00:00 2001 From: yushen Date: Tue, 3 Feb 2026 17:21:15 +0800 Subject: [PATCH] feat(session): add write lock, file repair, and transcript repair modules Port from OpenClaw: - session-write-lock: file-level write lock with atomic creation, reference counting, stale lock detection, and process cleanup handlers - session-file-repair: auto-detect and repair malformed JSONL lines with backup and atomic rename - session-transcript-repair: fix tool call/result pairing issues including displaced results, duplicates, orphans, and missing inputs Co-Authored-By: Claude Opus 4.5 --- src/agent/session/session-file-repair.test.ts | 80 +++++ src/agent/session/session-file-repair.ts | 96 ++++++ .../session/session-transcript-repair.test.ts | 150 +++++++++ .../session/session-transcript-repair.ts | 295 ++++++++++++++++++ src/agent/session/session-write-lock.test.ts | 160 ++++++++++ src/agent/session/session-write-lock.ts | 208 ++++++++++++ 6 files changed, 989 insertions(+) create mode 100644 src/agent/session/session-file-repair.test.ts create mode 100644 src/agent/session/session-file-repair.ts create mode 100644 src/agent/session/session-transcript-repair.test.ts create mode 100644 src/agent/session/session-transcript-repair.ts create mode 100644 src/agent/session/session-write-lock.test.ts create mode 100644 src/agent/session/session-write-lock.ts diff --git a/src/agent/session/session-file-repair.test.ts b/src/agent/session/session-file-repair.test.ts new file mode 100644 index 00000000..caeebe3e --- /dev/null +++ b/src/agent/session/session-file-repair.test.ts @@ -0,0 +1,80 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { describe, expect, it, vi } from "vitest"; +import { repairSessionFileIfNeeded } from "./session-file-repair.js"; + +describe("repairSessionFileIfNeeded", () => { + it("rewrites session files that contain malformed lines", async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "multica-session-repair-")); + const file = path.join(dir, "session.jsonl"); + const meta = { + type: "meta", + meta: { provider: "kimi", model: "moonshot-v1-128k" }, + timestamp: Date.now(), + }; + const message = { + type: "message", + message: { role: "user", content: "hello" }, + timestamp: Date.now(), + }; + + const content = `${JSON.stringify(meta)}\n${JSON.stringify(message)}\n{"type":"message"`; + await fs.writeFile(file, content, "utf-8"); + + const result = await repairSessionFileIfNeeded({ sessionFile: file }); + expect(result.repaired).toBe(true); + expect(result.droppedLines).toBe(1); + expect(result.backupPath).toBeTruthy(); + + const repaired = await fs.readFile(file, "utf-8"); + expect(repaired.trim().split("\n")).toHaveLength(2); + + if (result.backupPath) { + const backup = await fs.readFile(result.backupPath, "utf-8"); + expect(backup).toBe(content); + } + }); + + it("does not drop CRLF-terminated JSONL lines", async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "multica-session-repair-")); + const file = path.join(dir, "session.jsonl"); + const meta = { + type: "meta", + meta: { provider: "kimi", model: "moonshot-v1-128k" }, + timestamp: Date.now(), + }; + const message = { + type: "message", + message: { role: "user", content: "hello" }, + timestamp: Date.now(), + }; + const content = `${JSON.stringify(meta)}\r\n${JSON.stringify(message)}\r\n`; + await fs.writeFile(file, content, "utf-8"); + + const result = await repairSessionFileIfNeeded({ sessionFile: file }); + expect(result.repaired).toBe(false); + expect(result.droppedLines).toBe(0); + }); + + it("returns reason when file is empty after dropping all lines", async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "multica-session-repair-")); + const file = path.join(dir, "session.jsonl"); + await fs.writeFile(file, "{broken\n{also broken\n", "utf-8"); + + const result = await repairSessionFileIfNeeded({ sessionFile: file }); + expect(result.repaired).toBe(false); + expect(result.reason).toBe("empty session file"); + }); + + it("returns a detailed reason when read errors are not ENOENT", async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "multica-session-repair-")); + const warn = vi.fn(); + + const result = await repairSessionFileIfNeeded({ sessionFile: dir, warn }); + + expect(result.repaired).toBe(false); + expect(result.reason).toContain("failed to read session file"); + expect(warn).toHaveBeenCalledTimes(1); + }); +}); diff --git a/src/agent/session/session-file-repair.ts b/src/agent/session/session-file-repair.ts new file mode 100644 index 00000000..cb284f23 --- /dev/null +++ b/src/agent/session/session-file-repair.ts @@ -0,0 +1,96 @@ +import fs from "node:fs/promises"; +import path from "node:path"; + +type RepairReport = { + repaired: boolean; + droppedLines: number; + backupPath?: string; + reason?: string; +}; + +export type { RepairReport }; + +export async function repairSessionFileIfNeeded(params: { + sessionFile: string; + warn?: (message: string) => void; +}): Promise { + const sessionFile = params.sessionFile.trim(); + if (!sessionFile) { + return { repaired: false, droppedLines: 0, reason: "missing session file" }; + } + + let content: string; + try { + content = await fs.readFile(sessionFile, "utf-8"); + } catch (err) { + const code = (err as { code?: unknown } | undefined)?.code; + if (code === "ENOENT") { + return { repaired: false, droppedLines: 0, reason: "missing session file" }; + } + const reason = `failed to read session file: ${err instanceof Error ? err.message : "unknown error"}`; + params.warn?.(`session file repair skipped: ${reason} (${path.basename(sessionFile)})`); + return { repaired: false, droppedLines: 0, reason }; + } + + const lines = content.split(/\r?\n/); + const entries: unknown[] = []; + let droppedLines = 0; + + for (const line of lines) { + if (!line.trim()) { + continue; + } + try { + const entry = JSON.parse(line); + entries.push(entry); + } catch { + droppedLines += 1; + } + } + + if (entries.length === 0) { + return { repaired: false, droppedLines, reason: "empty session file" }; + } + + if (droppedLines === 0) { + return { repaired: false, droppedLines: 0 }; + } + + const cleaned = `${entries.map((entry) => JSON.stringify(entry)).join("\n")}\n`; + const backupPath = `${sessionFile}.bak-${process.pid}-${Date.now()}`; + const tmpPath = `${sessionFile}.repair-${process.pid}-${Date.now()}.tmp`; + try { + const stat = await fs.stat(sessionFile).catch(() => null); + await fs.writeFile(backupPath, content, "utf-8"); + if (stat) { + await fs.chmod(backupPath, stat.mode); + } + await fs.writeFile(tmpPath, cleaned, "utf-8"); + if (stat) { + await fs.chmod(tmpPath, stat.mode); + } + await fs.rename(tmpPath, sessionFile); + } catch (err) { + try { + await fs.unlink(tmpPath); + } catch (cleanupErr) { + params.warn?.( + `session file repair cleanup failed: ${cleanupErr instanceof Error ? cleanupErr.message : "unknown error"} (${path.basename( + tmpPath, + )})`, + ); + } + return { + repaired: false, + droppedLines, + reason: `repair failed: ${err instanceof Error ? err.message : "unknown error"}`, + }; + } + + params.warn?.( + `session file repaired: dropped ${droppedLines} malformed line(s) (${path.basename( + sessionFile, + )})`, + ); + return { repaired: true, droppedLines, backupPath }; +} diff --git a/src/agent/session/session-transcript-repair.test.ts b/src/agent/session/session-transcript-repair.test.ts new file mode 100644 index 00000000..7607f86f --- /dev/null +++ b/src/agent/session/session-transcript-repair.test.ts @@ -0,0 +1,150 @@ +import type { AgentMessage } from "@mariozechner/pi-agent-core"; +import { describe, expect, it } from "vitest"; +import { + sanitizeToolCallInputs, + sanitizeToolUseResultPairing, +} from "./session-transcript-repair.js"; + +describe("sanitizeToolUseResultPairing", () => { + it("moves tool results directly after tool calls and inserts missing results", () => { + const input = [ + { + role: "assistant", + content: [ + { type: "toolCall", id: "call_1", name: "read", arguments: {} }, + { type: "toolCall", id: "call_2", name: "exec", arguments: {} }, + ], + }, + { role: "user", content: "user message that should come after tool use" }, + { + role: "toolResult", + toolCallId: "call_2", + toolName: "exec", + content: [{ type: "text", text: "ok" }], + isError: false, + }, + ] satisfies AgentMessage[]; + + const out = sanitizeToolUseResultPairing(input); + expect(out[0]?.role).toBe("assistant"); + expect(out[1]?.role).toBe("toolResult"); + expect((out[1] as { toolCallId?: string }).toolCallId).toBe("call_1"); + expect(out[2]?.role).toBe("toolResult"); + expect((out[2] as { toolCallId?: string }).toolCallId).toBe("call_2"); + expect(out[3]?.role).toBe("user"); + }); + + it("drops duplicate tool results for the same id within a span", () => { + const input = [ + { + role: "assistant", + content: [{ type: "toolCall", id: "call_1", name: "read", arguments: {} }], + }, + { + role: "toolResult", + toolCallId: "call_1", + toolName: "read", + content: [{ type: "text", text: "first" }], + isError: false, + }, + { + role: "toolResult", + toolCallId: "call_1", + toolName: "read", + content: [{ type: "text", text: "second" }], + isError: false, + }, + { role: "user", content: "ok" }, + ] satisfies AgentMessage[]; + + const out = sanitizeToolUseResultPairing(input); + expect(out.filter((m) => m.role === "toolResult")).toHaveLength(1); + }); + + it("drops duplicate tool results for the same id across the transcript", () => { + const input = [ + { + role: "assistant", + content: [{ type: "toolCall", id: "call_1", name: "read", arguments: {} }], + }, + { + role: "toolResult", + toolCallId: "call_1", + toolName: "read", + content: [{ type: "text", text: "first" }], + isError: false, + }, + { role: "assistant", content: [{ type: "text", text: "ok" }] }, + { + role: "toolResult", + toolCallId: "call_1", + toolName: "read", + content: [{ type: "text", text: "second (duplicate)" }], + isError: false, + }, + ] satisfies AgentMessage[]; + + const out = sanitizeToolUseResultPairing(input); + const results = out.filter((m) => m.role === "toolResult") as Array<{ + toolCallId?: string; + }>; + expect(results).toHaveLength(1); + expect(results[0]?.toolCallId).toBe("call_1"); + }); + + it("drops orphan tool results that do not match any tool call", () => { + const input = [ + { role: "user", content: "hello" }, + { + role: "toolResult", + toolCallId: "call_orphan", + toolName: "read", + content: [{ type: "text", text: "orphan" }], + isError: false, + }, + { + role: "assistant", + content: [{ type: "text", text: "ok" }], + }, + ] satisfies AgentMessage[]; + + const out = sanitizeToolUseResultPairing(input); + expect(out.some((m) => m.role === "toolResult")).toBe(false); + expect(out.map((m) => m.role)).toEqual(["user", "assistant"]); + }); +}); + +describe("sanitizeToolCallInputs", () => { + it("drops tool calls missing input or arguments", () => { + const input: AgentMessage[] = [ + { + role: "assistant", + content: [{ type: "toolCall", id: "call_1", name: "read" }], + }, + { role: "user", content: "hello" }, + ]; + + const out = sanitizeToolCallInputs(input); + expect(out.map((m) => m.role)).toEqual(["user"]); + }); + + it("keeps valid tool calls and preserves text blocks", () => { + const input: AgentMessage[] = [ + { + role: "assistant", + content: [ + { type: "text", text: "before" }, + { type: "toolUse", id: "call_ok", name: "read", input: { path: "a" } }, + { type: "toolCall", id: "call_drop", name: "read" }, + ], + }, + ]; + + const out = sanitizeToolCallInputs(input); + const assistant = out[0] as Extract; + const types = Array.isArray(assistant.content) + ? assistant.content.map((block) => (block as { type?: unknown }).type) + : []; + expect(types).toEqual(["text", "toolUse"]); + }); +}); diff --git a/src/agent/session/session-transcript-repair.ts b/src/agent/session/session-transcript-repair.ts new file mode 100644 index 00000000..aa43071e --- /dev/null +++ b/src/agent/session/session-transcript-repair.ts @@ -0,0 +1,295 @@ +import type { AgentMessage } from "@mariozechner/pi-agent-core"; + +type ToolCallLike = { + id: string; + name?: string; +}; + +const TOOL_CALL_TYPES = new Set(["toolCall", "toolUse", "functionCall"]); + +type ToolCallBlock = { + type?: unknown; + id?: unknown; + name?: unknown; + input?: unknown; + arguments?: unknown; +}; + +function extractToolCallsFromAssistant( + msg: Extract, +): ToolCallLike[] { + const content = msg.content; + if (!Array.isArray(content)) { + return []; + } + + const toolCalls: ToolCallLike[] = []; + for (const block of content) { + if (!block || typeof block !== "object") { + continue; + } + const rec = block as { type?: unknown; id?: unknown; name?: unknown }; + if (typeof rec.id !== "string" || !rec.id) { + continue; + } + + if (rec.type === "toolCall" || rec.type === "toolUse" || rec.type === "functionCall") { + toolCalls.push({ + id: rec.id, + name: typeof rec.name === "string" ? rec.name : undefined, + }); + } + } + return toolCalls; +} + +function isToolCallBlock(block: unknown): block is ToolCallBlock { + if (!block || typeof block !== "object") { + return false; + } + const type = (block as { type?: unknown }).type; + return typeof type === "string" && TOOL_CALL_TYPES.has(type); +} + +function hasToolCallInput(block: ToolCallBlock): boolean { + const hasInput = "input" in block ? block.input !== undefined && block.input !== null : false; + const hasArguments = + "arguments" in block ? block.arguments !== undefined && block.arguments !== null : false; + return hasInput || hasArguments; +} + +function extractToolResultId(msg: Extract): string | null { + const toolCallId = (msg as { toolCallId?: unknown }).toolCallId; + if (typeof toolCallId === "string" && toolCallId) { + return toolCallId; + } + const toolUseId = (msg as { toolUseId?: unknown }).toolUseId; + if (typeof toolUseId === "string" && toolUseId) { + return toolUseId; + } + return null; +} + +function makeMissingToolResult(params: { + toolCallId: string; + toolName?: string; +}): Extract { + return { + role: "toolResult", + toolCallId: params.toolCallId, + toolName: params.toolName ?? "unknown", + content: [ + { + type: "text", + text: "[multica] missing tool result in session history; inserted synthetic error result for transcript repair.", + }, + ], + isError: true, + timestamp: Date.now(), + } as Extract; +} + +export { makeMissingToolResult }; + +export type ToolCallInputRepairReport = { + messages: AgentMessage[]; + droppedToolCalls: number; + droppedAssistantMessages: number; +}; + +export function repairToolCallInputs(messages: AgentMessage[]): ToolCallInputRepairReport { + let droppedToolCalls = 0; + let droppedAssistantMessages = 0; + let changed = false; + const out: AgentMessage[] = []; + + for (const msg of messages) { + if (!msg || typeof msg !== "object") { + out.push(msg); + continue; + } + + if (msg.role !== "assistant" || !Array.isArray(msg.content)) { + out.push(msg); + continue; + } + + const nextContent = []; + let droppedInMessage = 0; + + for (const block of msg.content) { + if (isToolCallBlock(block) && !hasToolCallInput(block)) { + droppedToolCalls += 1; + droppedInMessage += 1; + changed = true; + continue; + } + nextContent.push(block); + } + + if (droppedInMessage > 0) { + if (nextContent.length === 0) { + droppedAssistantMessages += 1; + changed = true; + continue; + } + out.push({ ...msg, content: nextContent }); + continue; + } + + out.push(msg); + } + + return { + messages: changed ? out : messages, + droppedToolCalls, + droppedAssistantMessages, + }; +} + +export function sanitizeToolCallInputs(messages: AgentMessage[]): AgentMessage[] { + return repairToolCallInputs(messages).messages; +} + +export function sanitizeToolUseResultPairing(messages: AgentMessage[]): AgentMessage[] { + return repairToolUseResultPairing(messages).messages; +} + +export type ToolUseRepairReport = { + messages: AgentMessage[]; + added: Array>; + droppedDuplicateCount: number; + droppedOrphanCount: number; + moved: boolean; +}; + +export function repairToolUseResultPairing(messages: AgentMessage[]): ToolUseRepairReport { + const out: AgentMessage[] = []; + const added: Array> = []; + const seenToolResultIds = new Set(); + let droppedDuplicateCount = 0; + let droppedOrphanCount = 0; + let moved = false; + let changed = false; + + const pushToolResult = (msg: Extract) => { + const id = extractToolResultId(msg); + if (id && seenToolResultIds.has(id)) { + droppedDuplicateCount += 1; + changed = true; + return; + } + if (id) { + seenToolResultIds.add(id); + } + out.push(msg); + }; + + for (let i = 0; i < messages.length; i += 1) { + const msg = messages[i]; + if (!msg || typeof msg !== "object") { + out.push(msg); + continue; + } + + const role = (msg as { role?: unknown }).role; + if (role !== "assistant") { + if (role !== "toolResult") { + out.push(msg); + } else { + droppedOrphanCount += 1; + changed = true; + } + continue; + } + + const assistant = msg as Extract; + const toolCalls = extractToolCallsFromAssistant(assistant); + if (toolCalls.length === 0) { + out.push(msg); + continue; + } + + const toolCallIds = new Set(toolCalls.map((t) => t.id)); + + const spanResultsById = new Map>(); + const remainder: AgentMessage[] = []; + + let j = i + 1; + for (; j < messages.length; j += 1) { + const next = messages[j]; + if (!next || typeof next !== "object") { + remainder.push(next); + continue; + } + + const nextRole = (next as { role?: unknown }).role; + if (nextRole === "assistant") { + break; + } + + if (nextRole === "toolResult") { + const toolResult = next as Extract; + const id = extractToolResultId(toolResult); + if (id && toolCallIds.has(id)) { + if (seenToolResultIds.has(id)) { + droppedDuplicateCount += 1; + changed = true; + continue; + } + if (!spanResultsById.has(id)) { + spanResultsById.set(id, toolResult); + } + continue; + } + } + + if (nextRole !== "toolResult") { + remainder.push(next); + } else { + droppedOrphanCount += 1; + changed = true; + } + } + + out.push(msg); + + if (spanResultsById.size > 0 && remainder.length > 0) { + moved = true; + changed = true; + } + + for (const call of toolCalls) { + const existing = spanResultsById.get(call.id); + if (existing) { + pushToolResult(existing); + } else { + const missing = makeMissingToolResult({ + toolCallId: call.id, + toolName: call.name, + }); + added.push(missing); + changed = true; + pushToolResult(missing); + } + } + + for (const rem of remainder) { + if (!rem || typeof rem !== "object") { + out.push(rem); + continue; + } + out.push(rem); + } + i = j - 1; + } + + const changedOrMoved = changed || moved; + return { + messages: changedOrMoved ? out : messages, + added, + droppedDuplicateCount, + droppedOrphanCount, + moved: changedOrMoved, + }; +} diff --git a/src/agent/session/session-write-lock.test.ts b/src/agent/session/session-write-lock.test.ts new file mode 100644 index 00000000..1eb3f024 --- /dev/null +++ b/src/agent/session/session-write-lock.test.ts @@ -0,0 +1,160 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { describe, expect, it } from "vitest"; +import { __testing, acquireSessionWriteLock } from "./session-write-lock.js"; + +describe("acquireSessionWriteLock", () => { + it("reuses locks across symlinked session paths", async () => { + if (process.platform === "win32") { + expect(true).toBe(true); + return; + } + + const root = await fs.mkdtemp(path.join(os.tmpdir(), "multica-lock-")); + try { + const realDir = path.join(root, "real"); + const linkDir = path.join(root, "link"); + await fs.mkdir(realDir, { recursive: true }); + await fs.symlink(realDir, linkDir); + + const sessionReal = path.join(realDir, "sessions.json"); + const sessionLink = path.join(linkDir, "sessions.json"); + + const lockA = await acquireSessionWriteLock({ sessionFile: sessionReal, timeoutMs: 500 }); + const lockB = await acquireSessionWriteLock({ sessionFile: sessionLink, timeoutMs: 500 }); + + await lockB.release(); + await lockA.release(); + } finally { + await fs.rm(root, { recursive: true, force: true }); + } + }); + + it("keeps the lock file until the last release", async () => { + const root = await fs.mkdtemp(path.join(os.tmpdir(), "multica-lock-")); + try { + const sessionFile = path.join(root, "sessions.json"); + const lockPath = `${sessionFile}.lock`; + + const lockA = await acquireSessionWriteLock({ sessionFile, timeoutMs: 500 }); + const lockB = await acquireSessionWriteLock({ sessionFile, timeoutMs: 500 }); + + await expect(fs.access(lockPath)).resolves.toBeUndefined(); + await lockA.release(); + await expect(fs.access(lockPath)).resolves.toBeUndefined(); + await lockB.release(); + await expect(fs.access(lockPath)).rejects.toThrow(); + } finally { + await fs.rm(root, { recursive: true, force: true }); + } + }); + + it("reclaims stale lock files", async () => { + const root = await fs.mkdtemp(path.join(os.tmpdir(), "multica-lock-")); + try { + const sessionFile = path.join(root, "sessions.json"); + const lockPath = `${sessionFile}.lock`; + await fs.writeFile( + lockPath, + JSON.stringify({ pid: 123456, createdAt: new Date(Date.now() - 60_000).toISOString() }), + "utf8", + ); + + const lock = await acquireSessionWriteLock({ sessionFile, timeoutMs: 500, staleMs: 10 }); + const raw = await fs.readFile(lockPath, "utf8"); + const payload = JSON.parse(raw) as { pid: number }; + + expect(payload.pid).toBe(process.pid); + await lock.release(); + } finally { + await fs.rm(root, { recursive: true, force: true }); + } + }); + + it("removes held locks on termination signals", async () => { + const signals = ["SIGINT", "SIGTERM", "SIGQUIT", "SIGABRT"] as const; + for (const signal of signals) { + const root = await fs.mkdtemp(path.join(os.tmpdir(), "multica-lock-cleanup-")); + // Prevent the signal from actually killing the vitest worker + const keepAlive = () => {}; + process.on(signal, keepAlive); + try { + const sessionFile = path.join(root, "sessions.json"); + const lockPath = `${sessionFile}.lock`; + await acquireSessionWriteLock({ sessionFile, timeoutMs: 500 }); + + __testing.handleTerminationSignal(signal); + + await expect(fs.stat(lockPath)).rejects.toThrow(); + } finally { + process.off(signal, keepAlive); + await fs.rm(root, { recursive: true, force: true }); + } + } + }); + + it("registers cleanup for SIGQUIT and SIGABRT", () => { + expect(__testing.cleanupSignals).toContain("SIGQUIT"); + expect(__testing.cleanupSignals).toContain("SIGABRT"); + }); + + it("cleans up locks on SIGINT without removing other handlers", async () => { + const root = await fs.mkdtemp(path.join(os.tmpdir(), "multica-lock-")); + const originalKill = process.kill.bind(process) as typeof process.kill; + const killCalls: Array = []; + let otherHandlerCalled = false; + + process.kill = ((pid: number, signal?: NodeJS.Signals) => { + killCalls.push(signal); + return true; + }) as typeof process.kill; + + const otherHandler = () => { + otherHandlerCalled = true; + }; + + process.on("SIGINT", otherHandler); + + try { + const sessionFile = path.join(root, "sessions.json"); + const lockPath = `${sessionFile}.lock`; + await acquireSessionWriteLock({ sessionFile, timeoutMs: 500 }); + + process.emit("SIGINT"); + + await expect(fs.access(lockPath)).rejects.toThrow(); + expect(otherHandlerCalled).toBe(true); + expect(killCalls).toEqual([]); + } finally { + process.off("SIGINT", otherHandler); + process.kill = originalKill; + await fs.rm(root, { recursive: true, force: true }); + } + }); + + it("cleans up locks via releaseAllLocksSync", async () => { + const root = await fs.mkdtemp(path.join(os.tmpdir(), "multica-lock-")); + try { + const sessionFile = path.join(root, "sessions.json"); + const lockPath = `${sessionFile}.lock`; + await acquireSessionWriteLock({ sessionFile, timeoutMs: 500 }); + + __testing.releaseAllLocksSync(); + + await expect(fs.access(lockPath)).rejects.toThrow(); + } finally { + await fs.rm(root, { recursive: true, force: true }); + } + }); + + it("keeps other signal listeners registered", () => { + const keepAlive = () => {}; + process.on("SIGINT", keepAlive); + + __testing.handleTerminationSignal("SIGINT"); + + expect(process.listeners("SIGINT")).toContain(keepAlive); + process.off("SIGINT", keepAlive); + }); +}); diff --git a/src/agent/session/session-write-lock.ts b/src/agent/session/session-write-lock.ts new file mode 100644 index 00000000..7335abaf --- /dev/null +++ b/src/agent/session/session-write-lock.ts @@ -0,0 +1,208 @@ +import fsSync from "node:fs"; +import fs from "node:fs/promises"; +import path from "node:path"; + +type LockFilePayload = { + pid: number; + createdAt: string; +}; + +type HeldLock = { + count: number; + handle: fs.FileHandle; + lockPath: string; +}; + +const HELD_LOCKS = new Map(); +const CLEANUP_SIGNALS = ["SIGINT", "SIGTERM", "SIGQUIT", "SIGABRT"] as const; +type CleanupSignal = (typeof CLEANUP_SIGNALS)[number]; +const cleanupHandlers = new Map void>(); + +function isAlive(pid: number): boolean { + if (!Number.isFinite(pid) || pid <= 0) { + return false; + } + try { + process.kill(pid, 0); + return true; + } catch { + return false; + } +} + +/** + * Synchronously release all held locks. + * Used during process exit when async operations aren't reliable. + */ +function releaseAllLocksSync(): void { + for (const [sessionFile, held] of HELD_LOCKS) { + try { + if (typeof held.handle.close === "function") { + void held.handle.close().catch(() => {}); + } + } catch { + // Ignore errors during cleanup - best effort + } + try { + fsSync.rmSync(held.lockPath, { force: true }); + } catch { + // Ignore errors during cleanup - best effort + } + HELD_LOCKS.delete(sessionFile); + } +} + +let cleanupRegistered = false; + +function handleTerminationSignal(signal: CleanupSignal): void { + releaseAllLocksSync(); + const shouldReraise = process.listenerCount(signal) === 1; + if (shouldReraise) { + const handler = cleanupHandlers.get(signal); + if (handler) { + process.off(signal, handler); + } + try { + process.kill(process.pid, signal); + } catch { + // Ignore errors during shutdown + } + } +} + +function registerCleanupHandlers(): void { + if (cleanupRegistered) { + return; + } + cleanupRegistered = true; + + // Cleanup on normal exit and process.exit() calls + process.on("exit", () => { + releaseAllLocksSync(); + }); + + // Handle termination signals + for (const signal of CLEANUP_SIGNALS) { + try { + const handler = () => handleTerminationSignal(signal); + cleanupHandlers.set(signal, handler); + process.on(signal, handler); + } catch { + // Ignore unsupported signals on this platform. + } + } +} + +async function readLockPayload(lockPath: string): Promise { + try { + const raw = await fs.readFile(lockPath, "utf8"); + const parsed = JSON.parse(raw) as Partial; + if (typeof parsed.pid !== "number") { + return null; + } + if (typeof parsed.createdAt !== "string") { + return null; + } + return { pid: parsed.pid, createdAt: parsed.createdAt }; + } catch { + return null; + } +} + +export async function acquireSessionWriteLock(params: { + sessionFile: string; + timeoutMs?: number; + staleMs?: number; +}): Promise<{ + release: () => Promise; +}> { + registerCleanupHandlers(); + const timeoutMs = params.timeoutMs ?? 10_000; + const staleMs = params.staleMs ?? 30 * 60 * 1000; + const sessionFile = path.resolve(params.sessionFile); + const sessionDir = path.dirname(sessionFile); + await fs.mkdir(sessionDir, { recursive: true }); + let normalizedDir = sessionDir; + try { + normalizedDir = await fs.realpath(sessionDir); + } catch { + // Fall back to the resolved path if realpath fails (permissions, transient FS). + } + const normalizedSessionFile = path.join(normalizedDir, path.basename(sessionFile)); + const lockPath = `${normalizedSessionFile}.lock`; + + const held = HELD_LOCKS.get(normalizedSessionFile); + if (held) { + held.count += 1; + return { + release: async () => { + const current = HELD_LOCKS.get(normalizedSessionFile); + if (!current) { + return; + } + current.count -= 1; + if (current.count > 0) { + return; + } + HELD_LOCKS.delete(normalizedSessionFile); + await current.handle.close(); + await fs.rm(current.lockPath, { force: true }); + }, + }; + } + + const startedAt = Date.now(); + let attempt = 0; + while (Date.now() - startedAt < timeoutMs) { + attempt += 1; + try { + const handle = await fs.open(lockPath, "wx"); + await handle.writeFile( + JSON.stringify({ pid: process.pid, createdAt: new Date().toISOString() }, null, 2), + "utf8", + ); + HELD_LOCKS.set(normalizedSessionFile, { count: 1, handle, lockPath }); + return { + release: async () => { + const current = HELD_LOCKS.get(normalizedSessionFile); + if (!current) { + return; + } + current.count -= 1; + if (current.count > 0) { + return; + } + HELD_LOCKS.delete(normalizedSessionFile); + await current.handle.close(); + await fs.rm(current.lockPath, { force: true }); + }, + }; + } catch (err) { + const code = (err as { code?: unknown }).code; + if (code !== "EEXIST") { + throw err; + } + const payload = await readLockPayload(lockPath); + const createdAt = payload?.createdAt ? Date.parse(payload.createdAt) : NaN; + const stale = !Number.isFinite(createdAt) || Date.now() - createdAt > staleMs; + const alive = payload?.pid ? isAlive(payload.pid) : false; + if (stale || !alive) { + await fs.rm(lockPath, { force: true }); + continue; + } + + const delay = Math.min(1000, 50 * attempt); + await new Promise((r) => setTimeout(r, delay)); + } + } + + const payload = await readLockPayload(lockPath); + const owner = payload?.pid ? `pid=${payload.pid}` : "unknown"; + throw new Error(`session file locked (timeout ${timeoutMs}ms): ${owner} ${lockPath}`); +} + +export const __testing = { + cleanupSignals: [...CLEANUP_SIGNALS], + handleTerminationSignal, + releaseAllLocksSync, +};