From f2ca86aa1f08ac92a22c3df919f05d697c9bbaf3 Mon Sep 17 00:00:00 2001 From: yushen Date: Tue, 3 Feb 2026 17:47:41 +0800 Subject: [PATCH] fix(session): harden write lock and repair --- src/agent/session/session-file-repair.test.ts | 21 +++ src/agent/session/session-file-repair.ts | 134 +++++++++--------- src/agent/session/session-write-lock.test.ts | 34 +++++ src/agent/session/session-write-lock.ts | 30 +++- 4 files changed, 149 insertions(+), 70 deletions(-) diff --git a/src/agent/session/session-file-repair.test.ts b/src/agent/session/session-file-repair.test.ts index caeebe3e..d190dd52 100644 --- a/src/agent/session/session-file-repair.test.ts +++ b/src/agent/session/session-file-repair.test.ts @@ -3,6 +3,17 @@ import os from "node:os"; import path from "node:path"; import { describe, expect, it, vi } from "vitest"; import { repairSessionFileIfNeeded } from "./session-file-repair.js"; +import { acquireSessionWriteLock } from "./session-write-lock.js"; + +vi.mock("./session-write-lock.js", async () => { + const actual = await vi.importActual( + "./session-write-lock.js", + ); + return { + ...actual, + acquireSessionWriteLock: vi.fn(actual.acquireSessionWriteLock), + }; +}); describe("repairSessionFileIfNeeded", () => { it("rewrites session files that contain malformed lines", async () => { @@ -77,4 +88,14 @@ describe("repairSessionFileIfNeeded", () => { expect(result.reason).toContain("failed to read session file"); expect(warn).toHaveBeenCalledTimes(1); }); + + it("acquires a write lock while repairing", async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "multica-session-repair-")); + const file = path.join(dir, "session.jsonl"); + await fs.writeFile(file, "{broken\n{also broken\n", "utf-8"); + + await repairSessionFileIfNeeded({ sessionFile: file }); + + expect(vi.mocked(acquireSessionWriteLock)).toHaveBeenCalledTimes(1); + }); }); diff --git a/src/agent/session/session-file-repair.ts b/src/agent/session/session-file-repair.ts index cb284f23..6e978f7e 100644 --- a/src/agent/session/session-file-repair.ts +++ b/src/agent/session/session-file-repair.ts @@ -1,5 +1,6 @@ import fs from "node:fs/promises"; import path from "node:path"; +import { acquireSessionWriteLock } from "./session-write-lock.js"; type RepairReport = { repaired: boolean; @@ -19,78 +20,83 @@ export async function repairSessionFileIfNeeded(params: { return { repaired: false, droppedLines: 0, reason: "missing session file" }; } - let content: string; + const lock = await acquireSessionWriteLock({ sessionFile }); try { - content = await fs.readFile(sessionFile, "utf-8"); - } catch (err) { - const code = (err as { code?: unknown } | undefined)?.code; - if (code === "ENOENT") { - return { repaired: false, droppedLines: 0, reason: "missing session file" }; - } - const reason = `failed to read session file: ${err instanceof Error ? err.message : "unknown error"}`; - params.warn?.(`session file repair skipped: ${reason} (${path.basename(sessionFile)})`); - return { repaired: false, droppedLines: 0, reason }; - } - - const lines = content.split(/\r?\n/); - const entries: unknown[] = []; - let droppedLines = 0; - - for (const line of lines) { - if (!line.trim()) { - continue; - } + let content: string; try { - const entry = JSON.parse(line); - entries.push(entry); - } catch { - droppedLines += 1; + content = await fs.readFile(sessionFile, "utf-8"); + } catch (err) { + const code = (err as { code?: unknown } | undefined)?.code; + if (code === "ENOENT") { + return { repaired: false, droppedLines: 0, reason: "missing session file" }; + } + const reason = `failed to read session file: ${err instanceof Error ? err.message : "unknown error"}`; + params.warn?.(`session file repair skipped: ${reason} (${path.basename(sessionFile)})`); + return { repaired: false, droppedLines: 0, reason }; } - } - if (entries.length === 0) { - return { repaired: false, droppedLines, reason: "empty session file" }; - } + const lines = content.split(/\r?\n/); + const entries: unknown[] = []; + let droppedLines = 0; - if (droppedLines === 0) { - return { repaired: false, droppedLines: 0 }; - } - - const cleaned = `${entries.map((entry) => JSON.stringify(entry)).join("\n")}\n`; - const backupPath = `${sessionFile}.bak-${process.pid}-${Date.now()}`; - const tmpPath = `${sessionFile}.repair-${process.pid}-${Date.now()}.tmp`; - try { - const stat = await fs.stat(sessionFile).catch(() => null); - await fs.writeFile(backupPath, content, "utf-8"); - if (stat) { - await fs.chmod(backupPath, stat.mode); + for (const line of lines) { + if (!line.trim()) { + continue; + } + try { + const entry = JSON.parse(line); + entries.push(entry); + } catch { + droppedLines += 1; + } } - await fs.writeFile(tmpPath, cleaned, "utf-8"); - if (stat) { - await fs.chmod(tmpPath, stat.mode); + + if (entries.length === 0) { + return { repaired: false, droppedLines, reason: "empty session file" }; } - await fs.rename(tmpPath, sessionFile); - } catch (err) { + + if (droppedLines === 0) { + return { repaired: false, droppedLines: 0 }; + } + + const cleaned = `${entries.map((entry) => JSON.stringify(entry)).join("\n")}\n`; + const backupPath = `${sessionFile}.bak-${process.pid}-${Date.now()}`; + const tmpPath = `${sessionFile}.repair-${process.pid}-${Date.now()}.tmp`; try { - await fs.unlink(tmpPath); - } catch (cleanupErr) { - params.warn?.( - `session file repair cleanup failed: ${cleanupErr instanceof Error ? cleanupErr.message : "unknown error"} (${path.basename( - tmpPath, - )})`, - ); + const stat = await fs.stat(sessionFile).catch(() => null); + await fs.writeFile(backupPath, content, "utf-8"); + if (stat) { + await fs.chmod(backupPath, stat.mode); + } + await fs.writeFile(tmpPath, cleaned, "utf-8"); + if (stat) { + await fs.chmod(tmpPath, stat.mode); + } + await fs.rename(tmpPath, sessionFile); + } catch (err) { + try { + await fs.unlink(tmpPath); + } catch (cleanupErr) { + params.warn?.( + `session file repair cleanup failed: ${cleanupErr instanceof Error ? cleanupErr.message : "unknown error"} (${path.basename( + tmpPath, + )})`, + ); + } + return { + repaired: false, + droppedLines, + reason: `repair failed: ${err instanceof Error ? err.message : "unknown error"}`, + }; } - return { - repaired: false, - droppedLines, - reason: `repair failed: ${err instanceof Error ? err.message : "unknown error"}`, - }; - } - params.warn?.( - `session file repaired: dropped ${droppedLines} malformed line(s) (${path.basename( - sessionFile, - )})`, - ); - return { repaired: true, droppedLines, backupPath }; + params.warn?.( + `session file repaired: dropped ${droppedLines} malformed line(s) (${path.basename( + sessionFile, + )})`, + ); + return { repaired: true, droppedLines, backupPath }; + } finally { + await lock.release(); + } } diff --git a/src/agent/session/session-write-lock.test.ts b/src/agent/session/session-write-lock.test.ts index 1eb3f024..06c4f587 100644 --- a/src/agent/session/session-write-lock.test.ts +++ b/src/agent/session/session-write-lock.test.ts @@ -72,6 +72,40 @@ describe("acquireSessionWriteLock", () => { } }); + it("does not delete recent lock files with invalid payloads", async () => { + const root = await fs.mkdtemp(path.join(os.tmpdir(), "multica-lock-")); + try { + const sessionFile = path.join(root, "sessions.json"); + const lockPath = `${sessionFile}.lock`; + await fs.writeFile(lockPath, "{", "utf8"); + + await expect( + acquireSessionWriteLock({ sessionFile, timeoutMs: 200, staleMs: 60_000 }), + ).rejects.toThrow(/timeout/); + + await expect(fs.access(lockPath)).resolves.toBeUndefined(); + } finally { + await fs.rm(root, { recursive: true, force: true }); + } + }); + + it("reclaims invalid lock files when mtime is stale", async () => { + const root = await fs.mkdtemp(path.join(os.tmpdir(), "multica-lock-")); + try { + const sessionFile = path.join(root, "sessions.json"); + const lockPath = `${sessionFile}.lock`; + await fs.writeFile(lockPath, "{", "utf8"); + const old = new Date(Date.now() - 60_000); + await fs.utimes(lockPath, old, old); + + const lock = await acquireSessionWriteLock({ sessionFile, timeoutMs: 500, staleMs: 10 }); + + await lock.release(); + } finally { + await fs.rm(root, { recursive: true, force: true }); + } + }); + it("removes held locks on termination signals", async () => { const signals = ["SIGINT", "SIGTERM", "SIGQUIT", "SIGABRT"] as const; for (const signal of signals) { diff --git a/src/agent/session/session-write-lock.ts b/src/agent/session/session-write-lock.ts index 7335abaf..c6364e32 100644 --- a/src/agent/session/session-write-lock.ts +++ b/src/agent/session/session-write-lock.ts @@ -109,6 +109,15 @@ async function readLockPayload(lockPath: string): Promise { + try { + const stat = await fs.stat(lockPath); + return Date.now() - stat.mtimeMs; + } catch { + return null; + } +} + export async function acquireSessionWriteLock(params: { sessionFile: string; timeoutMs?: number; @@ -183,12 +192,21 @@ export async function acquireSessionWriteLock(params: { throw err; } const payload = await readLockPayload(lockPath); - const createdAt = payload?.createdAt ? Date.parse(payload.createdAt) : NaN; - const stale = !Number.isFinite(createdAt) || Date.now() - createdAt > staleMs; - const alive = payload?.pid ? isAlive(payload.pid) : false; - if (stale || !alive) { - await fs.rm(lockPath, { force: true }); - continue; + if (payload) { + const createdAt = payload.createdAt ? Date.parse(payload.createdAt) : NaN; + const stale = !Number.isFinite(createdAt) || Date.now() - createdAt > staleMs; + const alive = payload.pid ? isAlive(payload.pid) : false; + if (stale || !alive) { + await fs.rm(lockPath, { force: true }); + continue; + } + } else { + const ageMs = await getLockAgeMs(lockPath); + const stale = ageMs !== null && ageMs > staleMs; + if (stale) { + await fs.rm(lockPath, { force: true }); + continue; + } } const delay = Math.min(1000, 50 * attempt);