fix(session): harden write lock and repair
This commit is contained in:
parent
f1334349ca
commit
f2ca86aa1f
4 changed files with 149 additions and 70 deletions
|
|
@ -3,6 +3,17 @@ import os from "node:os";
|
|||
import path from "node:path";
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import { repairSessionFileIfNeeded } from "./session-file-repair.js";
|
||||
import { acquireSessionWriteLock } from "./session-write-lock.js";
|
||||
|
||||
// Replace the lock module with a pass-through copy: every export keeps its real
// implementation, but acquireSessionWriteLock is wrapped in a spy so tests can
// observe how often it is called.
vi.mock("./session-write-lock.js", async () => {
  const realModule = await vi.importActual<typeof import("./session-write-lock.js")>(
    "./session-write-lock.js",
  );
  const acquireSpy = vi.fn(realModule.acquireSessionWriteLock);
  return { ...realModule, acquireSessionWriteLock: acquireSpy };
});
|
||||
|
||||
describe("repairSessionFileIfNeeded", () => {
|
||||
it("rewrites session files that contain malformed lines", async () => {
|
||||
|
|
@ -77,4 +88,14 @@ describe("repairSessionFileIfNeeded", () => {
|
|||
expect(result.reason).toContain("failed to read session file");
|
||||
expect(warn).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
// Verifies that one repair pass takes the session write lock exactly once.
it("acquires a write lock while repairing", async () => {
  const dir = await fs.mkdtemp(path.join(os.tmpdir(), "multica-session-repair-"));
  try {
    const file = path.join(dir, "session.jsonl");
    await fs.writeFile(file, "{broken\n{also broken\n", "utf-8");

    // The spy is created once per module and shared by every test in this file.
    // Drop its recorded calls so the count below reflects only this repair,
    // independent of test order or the runner's clearMocks setting.
    vi.mocked(acquireSessionWriteLock).mockClear();

    await repairSessionFileIfNeeded({ sessionFile: file });

    expect(vi.mocked(acquireSessionWriteLock)).toHaveBeenCalledTimes(1);
  } finally {
    // Remove the temp directory even when an assertion above fails.
    await fs.rm(dir, { recursive: true, force: true });
  }
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
import fs from "node:fs/promises";
|
||||
import path from "node:path";
|
||||
import { acquireSessionWriteLock } from "./session-write-lock.js";
|
||||
|
||||
type RepairReport = {
|
||||
repaired: boolean;
|
||||
|
|
@ -19,78 +20,83 @@ export async function repairSessionFileIfNeeded(params: {
|
|||
return { repaired: false, droppedLines: 0, reason: "missing session file" };
|
||||
}
|
||||
|
||||
let content: string;
|
||||
const lock = await acquireSessionWriteLock({ sessionFile });
|
||||
try {
|
||||
content = await fs.readFile(sessionFile, "utf-8");
|
||||
} catch (err) {
|
||||
const code = (err as { code?: unknown } | undefined)?.code;
|
||||
if (code === "ENOENT") {
|
||||
return { repaired: false, droppedLines: 0, reason: "missing session file" };
|
||||
}
|
||||
const reason = `failed to read session file: ${err instanceof Error ? err.message : "unknown error"}`;
|
||||
params.warn?.(`session file repair skipped: ${reason} (${path.basename(sessionFile)})`);
|
||||
return { repaired: false, droppedLines: 0, reason };
|
||||
}
|
||||
|
||||
const lines = content.split(/\r?\n/);
|
||||
const entries: unknown[] = [];
|
||||
let droppedLines = 0;
|
||||
|
||||
for (const line of lines) {
|
||||
if (!line.trim()) {
|
||||
continue;
|
||||
}
|
||||
let content: string;
|
||||
try {
|
||||
const entry = JSON.parse(line);
|
||||
entries.push(entry);
|
||||
} catch {
|
||||
droppedLines += 1;
|
||||
content = await fs.readFile(sessionFile, "utf-8");
|
||||
} catch (err) {
|
||||
const code = (err as { code?: unknown } | undefined)?.code;
|
||||
if (code === "ENOENT") {
|
||||
return { repaired: false, droppedLines: 0, reason: "missing session file" };
|
||||
}
|
||||
const reason = `failed to read session file: ${err instanceof Error ? err.message : "unknown error"}`;
|
||||
params.warn?.(`session file repair skipped: ${reason} (${path.basename(sessionFile)})`);
|
||||
return { repaired: false, droppedLines: 0, reason };
|
||||
}
|
||||
}
|
||||
|
||||
if (entries.length === 0) {
|
||||
return { repaired: false, droppedLines, reason: "empty session file" };
|
||||
}
|
||||
const lines = content.split(/\r?\n/);
|
||||
const entries: unknown[] = [];
|
||||
let droppedLines = 0;
|
||||
|
||||
if (droppedLines === 0) {
|
||||
return { repaired: false, droppedLines: 0 };
|
||||
}
|
||||
|
||||
const cleaned = `${entries.map((entry) => JSON.stringify(entry)).join("\n")}\n`;
|
||||
const backupPath = `${sessionFile}.bak-${process.pid}-${Date.now()}`;
|
||||
const tmpPath = `${sessionFile}.repair-${process.pid}-${Date.now()}.tmp`;
|
||||
try {
|
||||
const stat = await fs.stat(sessionFile).catch(() => null);
|
||||
await fs.writeFile(backupPath, content, "utf-8");
|
||||
if (stat) {
|
||||
await fs.chmod(backupPath, stat.mode);
|
||||
for (const line of lines) {
|
||||
if (!line.trim()) {
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
const entry = JSON.parse(line);
|
||||
entries.push(entry);
|
||||
} catch {
|
||||
droppedLines += 1;
|
||||
}
|
||||
}
|
||||
await fs.writeFile(tmpPath, cleaned, "utf-8");
|
||||
if (stat) {
|
||||
await fs.chmod(tmpPath, stat.mode);
|
||||
|
||||
if (entries.length === 0) {
|
||||
return { repaired: false, droppedLines, reason: "empty session file" };
|
||||
}
|
||||
await fs.rename(tmpPath, sessionFile);
|
||||
} catch (err) {
|
||||
|
||||
if (droppedLines === 0) {
|
||||
return { repaired: false, droppedLines: 0 };
|
||||
}
|
||||
|
||||
const cleaned = `${entries.map((entry) => JSON.stringify(entry)).join("\n")}\n`;
|
||||
const backupPath = `${sessionFile}.bak-${process.pid}-${Date.now()}`;
|
||||
const tmpPath = `${sessionFile}.repair-${process.pid}-${Date.now()}.tmp`;
|
||||
try {
|
||||
await fs.unlink(tmpPath);
|
||||
} catch (cleanupErr) {
|
||||
params.warn?.(
|
||||
`session file repair cleanup failed: ${cleanupErr instanceof Error ? cleanupErr.message : "unknown error"} (${path.basename(
|
||||
tmpPath,
|
||||
)})`,
|
||||
);
|
||||
const stat = await fs.stat(sessionFile).catch(() => null);
|
||||
await fs.writeFile(backupPath, content, "utf-8");
|
||||
if (stat) {
|
||||
await fs.chmod(backupPath, stat.mode);
|
||||
}
|
||||
await fs.writeFile(tmpPath, cleaned, "utf-8");
|
||||
if (stat) {
|
||||
await fs.chmod(tmpPath, stat.mode);
|
||||
}
|
||||
await fs.rename(tmpPath, sessionFile);
|
||||
} catch (err) {
|
||||
try {
|
||||
await fs.unlink(tmpPath);
|
||||
} catch (cleanupErr) {
|
||||
params.warn?.(
|
||||
`session file repair cleanup failed: ${cleanupErr instanceof Error ? cleanupErr.message : "unknown error"} (${path.basename(
|
||||
tmpPath,
|
||||
)})`,
|
||||
);
|
||||
}
|
||||
return {
|
||||
repaired: false,
|
||||
droppedLines,
|
||||
reason: `repair failed: ${err instanceof Error ? err.message : "unknown error"}`,
|
||||
};
|
||||
}
|
||||
return {
|
||||
repaired: false,
|
||||
droppedLines,
|
||||
reason: `repair failed: ${err instanceof Error ? err.message : "unknown error"}`,
|
||||
};
|
||||
}
|
||||
|
||||
params.warn?.(
|
||||
`session file repaired: dropped ${droppedLines} malformed line(s) (${path.basename(
|
||||
sessionFile,
|
||||
)})`,
|
||||
);
|
||||
return { repaired: true, droppedLines, backupPath };
|
||||
params.warn?.(
|
||||
`session file repaired: dropped ${droppedLines} malformed line(s) (${path.basename(
|
||||
sessionFile,
|
||||
)})`,
|
||||
);
|
||||
return { repaired: true, droppedLines, backupPath };
|
||||
} finally {
|
||||
await lock.release();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -72,6 +72,40 @@ describe("acquireSessionWriteLock", () => {
|
|||
}
|
||||
});
|
||||
|
||||
// A lock file whose payload cannot be parsed but whose mtime is recent must be
// left in place: acquisition times out and the lock file still exists afterwards.
it("does not delete recent lock files with invalid payloads", async () => {
  const tmpRoot = await fs.mkdtemp(path.join(os.tmpdir(), "multica-lock-"));
  try {
    const sessionFile = path.join(tmpRoot, "sessions.json");
    const lockPath = `${sessionFile}.lock`;
    // "{" is deliberately truncated JSON.
    await fs.writeFile(lockPath, "{", "utf8");

    const attempt = acquireSessionWriteLock({ sessionFile, timeoutMs: 200, staleMs: 60_000 });
    await expect(attempt).rejects.toThrow(/timeout/);

    const lockStillThere = fs.access(lockPath);
    await expect(lockStillThere).resolves.toBeUndefined();
  } finally {
    await fs.rm(tmpRoot, { recursive: true, force: true });
  }
});
|
||||
|
||||
// A lock file with an unparseable payload is reclaimed once its mtime is older
// than staleMs: acquisition succeeds instead of timing out.
it("reclaims invalid lock files when mtime is stale", async () => {
  const tmpRoot = await fs.mkdtemp(path.join(os.tmpdir(), "multica-lock-"));
  try {
    const sessionFile = path.join(tmpRoot, "sessions.json");
    const lockPath = `${sessionFile}.lock`;
    await fs.writeFile(lockPath, "{", "utf8");

    // Backdate the lock file by a minute so it is far beyond the 10 ms stale window.
    const backdated = new Date(Date.now() - 60_000);
    await fs.utimes(lockPath, backdated, backdated);

    const reclaimed = await acquireSessionWriteLock({
      sessionFile,
      timeoutMs: 500,
      staleMs: 10,
    });

    await reclaimed.release();
  } finally {
    await fs.rm(tmpRoot, { recursive: true, force: true });
  }
});
|
||||
|
||||
it("removes held locks on termination signals", async () => {
|
||||
const signals = ["SIGINT", "SIGTERM", "SIGQUIT", "SIGABRT"] as const;
|
||||
for (const signal of signals) {
|
||||
|
|
|
|||
|
|
@ -109,6 +109,15 @@ async function readLockPayload(lockPath: string): Promise<LockFilePayload | null
|
|||
}
|
||||
}
|
||||
|
||||
/**
 * Measures how long ago the lock file was last modified.
 *
 * @param lockPath - Path of the lock file to inspect.
 * @returns Milliseconds between the file's mtime and now, or `null` when the
 *   file cannot be stat'ed for any reason.
 */
async function getLockAgeMs(lockPath: string): Promise<number | null> {
  // Any stat failure is folded into null; callers only need "age unknown".
  const stats = await fs.stat(lockPath).catch(() => null);
  return stats === null ? null : Date.now() - stats.mtimeMs;
}
|
||||
|
||||
export async function acquireSessionWriteLock(params: {
|
||||
sessionFile: string;
|
||||
timeoutMs?: number;
|
||||
|
|
@ -183,12 +192,21 @@ export async function acquireSessionWriteLock(params: {
|
|||
throw err;
|
||||
}
|
||||
const payload = await readLockPayload(lockPath);
|
||||
const createdAt = payload?.createdAt ? Date.parse(payload.createdAt) : NaN;
|
||||
const stale = !Number.isFinite(createdAt) || Date.now() - createdAt > staleMs;
|
||||
const alive = payload?.pid ? isAlive(payload.pid) : false;
|
||||
if (stale || !alive) {
|
||||
await fs.rm(lockPath, { force: true });
|
||||
continue;
|
||||
if (payload) {
|
||||
const createdAt = payload.createdAt ? Date.parse(payload.createdAt) : NaN;
|
||||
const stale = !Number.isFinite(createdAt) || Date.now() - createdAt > staleMs;
|
||||
const alive = payload.pid ? isAlive(payload.pid) : false;
|
||||
if (stale || !alive) {
|
||||
await fs.rm(lockPath, { force: true });
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
const ageMs = await getLockAgeMs(lockPath);
|
||||
const stale = ageMs !== null && ageMs > staleMs;
|
||||
if (stale) {
|
||||
await fs.rm(lockPath, { force: true });
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
const delay = Math.min(1000, 50 * attempt);
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue