feat(session): add write lock, file repair, and transcript repair modules
Port from OpenClaw: - session-write-lock: file-level write lock with atomic creation, reference counting, stale lock detection, and process cleanup handlers - session-file-repair: auto-detect and repair malformed JSONL lines with backup and atomic rename - session-transcript-repair: fix tool call/result pairing issues including displaced results, duplicates, orphans, and missing inputs Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
09f7e93496
commit
37e5b2ce7f
6 changed files with 989 additions and 0 deletions
80
src/agent/session/session-file-repair.test.ts
Normal file
80
src/agent/session/session-file-repair.test.ts
Normal file
|
|
@ -0,0 +1,80 @@
|
|||
import fs from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import { repairSessionFileIfNeeded } from "./session-file-repair.js";
|
||||
|
||||
describe("repairSessionFileIfNeeded", () => {
|
||||
it("rewrites session files that contain malformed lines", async () => {
|
||||
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "multica-session-repair-"));
|
||||
const file = path.join(dir, "session.jsonl");
|
||||
const meta = {
|
||||
type: "meta",
|
||||
meta: { provider: "kimi", model: "moonshot-v1-128k" },
|
||||
timestamp: Date.now(),
|
||||
};
|
||||
const message = {
|
||||
type: "message",
|
||||
message: { role: "user", content: "hello" },
|
||||
timestamp: Date.now(),
|
||||
};
|
||||
|
||||
const content = `${JSON.stringify(meta)}\n${JSON.stringify(message)}\n{"type":"message"`;
|
||||
await fs.writeFile(file, content, "utf-8");
|
||||
|
||||
const result = await repairSessionFileIfNeeded({ sessionFile: file });
|
||||
expect(result.repaired).toBe(true);
|
||||
expect(result.droppedLines).toBe(1);
|
||||
expect(result.backupPath).toBeTruthy();
|
||||
|
||||
const repaired = await fs.readFile(file, "utf-8");
|
||||
expect(repaired.trim().split("\n")).toHaveLength(2);
|
||||
|
||||
if (result.backupPath) {
|
||||
const backup = await fs.readFile(result.backupPath, "utf-8");
|
||||
expect(backup).toBe(content);
|
||||
}
|
||||
});
|
||||
|
||||
it("does not drop CRLF-terminated JSONL lines", async () => {
|
||||
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "multica-session-repair-"));
|
||||
const file = path.join(dir, "session.jsonl");
|
||||
const meta = {
|
||||
type: "meta",
|
||||
meta: { provider: "kimi", model: "moonshot-v1-128k" },
|
||||
timestamp: Date.now(),
|
||||
};
|
||||
const message = {
|
||||
type: "message",
|
||||
message: { role: "user", content: "hello" },
|
||||
timestamp: Date.now(),
|
||||
};
|
||||
const content = `${JSON.stringify(meta)}\r\n${JSON.stringify(message)}\r\n`;
|
||||
await fs.writeFile(file, content, "utf-8");
|
||||
|
||||
const result = await repairSessionFileIfNeeded({ sessionFile: file });
|
||||
expect(result.repaired).toBe(false);
|
||||
expect(result.droppedLines).toBe(0);
|
||||
});
|
||||
|
||||
it("returns reason when file is empty after dropping all lines", async () => {
|
||||
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "multica-session-repair-"));
|
||||
const file = path.join(dir, "session.jsonl");
|
||||
await fs.writeFile(file, "{broken\n{also broken\n", "utf-8");
|
||||
|
||||
const result = await repairSessionFileIfNeeded({ sessionFile: file });
|
||||
expect(result.repaired).toBe(false);
|
||||
expect(result.reason).toBe("empty session file");
|
||||
});
|
||||
|
||||
it("returns a detailed reason when read errors are not ENOENT", async () => {
|
||||
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "multica-session-repair-"));
|
||||
const warn = vi.fn();
|
||||
|
||||
const result = await repairSessionFileIfNeeded({ sessionFile: dir, warn });
|
||||
|
||||
expect(result.repaired).toBe(false);
|
||||
expect(result.reason).toContain("failed to read session file");
|
||||
expect(warn).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
});
|
||||
96
src/agent/session/session-file-repair.ts
Normal file
96
src/agent/session/session-file-repair.ts
Normal file
|
|
@ -0,0 +1,96 @@
|
|||
import fs from "node:fs/promises";
|
||||
import path from "node:path";
|
||||
|
||||
type RepairReport = {
|
||||
repaired: boolean;
|
||||
droppedLines: number;
|
||||
backupPath?: string;
|
||||
reason?: string;
|
||||
};
|
||||
|
||||
export type { RepairReport };
|
||||
|
||||
export async function repairSessionFileIfNeeded(params: {
|
||||
sessionFile: string;
|
||||
warn?: (message: string) => void;
|
||||
}): Promise<RepairReport> {
|
||||
const sessionFile = params.sessionFile.trim();
|
||||
if (!sessionFile) {
|
||||
return { repaired: false, droppedLines: 0, reason: "missing session file" };
|
||||
}
|
||||
|
||||
let content: string;
|
||||
try {
|
||||
content = await fs.readFile(sessionFile, "utf-8");
|
||||
} catch (err) {
|
||||
const code = (err as { code?: unknown } | undefined)?.code;
|
||||
if (code === "ENOENT") {
|
||||
return { repaired: false, droppedLines: 0, reason: "missing session file" };
|
||||
}
|
||||
const reason = `failed to read session file: ${err instanceof Error ? err.message : "unknown error"}`;
|
||||
params.warn?.(`session file repair skipped: ${reason} (${path.basename(sessionFile)})`);
|
||||
return { repaired: false, droppedLines: 0, reason };
|
||||
}
|
||||
|
||||
const lines = content.split(/\r?\n/);
|
||||
const entries: unknown[] = [];
|
||||
let droppedLines = 0;
|
||||
|
||||
for (const line of lines) {
|
||||
if (!line.trim()) {
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
const entry = JSON.parse(line);
|
||||
entries.push(entry);
|
||||
} catch {
|
||||
droppedLines += 1;
|
||||
}
|
||||
}
|
||||
|
||||
if (entries.length === 0) {
|
||||
return { repaired: false, droppedLines, reason: "empty session file" };
|
||||
}
|
||||
|
||||
if (droppedLines === 0) {
|
||||
return { repaired: false, droppedLines: 0 };
|
||||
}
|
||||
|
||||
const cleaned = `${entries.map((entry) => JSON.stringify(entry)).join("\n")}\n`;
|
||||
const backupPath = `${sessionFile}.bak-${process.pid}-${Date.now()}`;
|
||||
const tmpPath = `${sessionFile}.repair-${process.pid}-${Date.now()}.tmp`;
|
||||
try {
|
||||
const stat = await fs.stat(sessionFile).catch(() => null);
|
||||
await fs.writeFile(backupPath, content, "utf-8");
|
||||
if (stat) {
|
||||
await fs.chmod(backupPath, stat.mode);
|
||||
}
|
||||
await fs.writeFile(tmpPath, cleaned, "utf-8");
|
||||
if (stat) {
|
||||
await fs.chmod(tmpPath, stat.mode);
|
||||
}
|
||||
await fs.rename(tmpPath, sessionFile);
|
||||
} catch (err) {
|
||||
try {
|
||||
await fs.unlink(tmpPath);
|
||||
} catch (cleanupErr) {
|
||||
params.warn?.(
|
||||
`session file repair cleanup failed: ${cleanupErr instanceof Error ? cleanupErr.message : "unknown error"} (${path.basename(
|
||||
tmpPath,
|
||||
)})`,
|
||||
);
|
||||
}
|
||||
return {
|
||||
repaired: false,
|
||||
droppedLines,
|
||||
reason: `repair failed: ${err instanceof Error ? err.message : "unknown error"}`,
|
||||
};
|
||||
}
|
||||
|
||||
params.warn?.(
|
||||
`session file repaired: dropped ${droppedLines} malformed line(s) (${path.basename(
|
||||
sessionFile,
|
||||
)})`,
|
||||
);
|
||||
return { repaired: true, droppedLines, backupPath };
|
||||
}
|
||||
150
src/agent/session/session-transcript-repair.test.ts
Normal file
150
src/agent/session/session-transcript-repair.test.ts
Normal file
|
|
@ -0,0 +1,150 @@
|
|||
import type { AgentMessage } from "@mariozechner/pi-agent-core";
|
||||
import { describe, expect, it } from "vitest";
|
||||
import {
|
||||
sanitizeToolCallInputs,
|
||||
sanitizeToolUseResultPairing,
|
||||
} from "./session-transcript-repair.js";
|
||||
|
||||
describe("sanitizeToolUseResultPairing", () => {
|
||||
it("moves tool results directly after tool calls and inserts missing results", () => {
|
||||
const input = [
|
||||
{
|
||||
role: "assistant",
|
||||
content: [
|
||||
{ type: "toolCall", id: "call_1", name: "read", arguments: {} },
|
||||
{ type: "toolCall", id: "call_2", name: "exec", arguments: {} },
|
||||
],
|
||||
},
|
||||
{ role: "user", content: "user message that should come after tool use" },
|
||||
{
|
||||
role: "toolResult",
|
||||
toolCallId: "call_2",
|
||||
toolName: "exec",
|
||||
content: [{ type: "text", text: "ok" }],
|
||||
isError: false,
|
||||
},
|
||||
] satisfies AgentMessage[];
|
||||
|
||||
const out = sanitizeToolUseResultPairing(input);
|
||||
expect(out[0]?.role).toBe("assistant");
|
||||
expect(out[1]?.role).toBe("toolResult");
|
||||
expect((out[1] as { toolCallId?: string }).toolCallId).toBe("call_1");
|
||||
expect(out[2]?.role).toBe("toolResult");
|
||||
expect((out[2] as { toolCallId?: string }).toolCallId).toBe("call_2");
|
||||
expect(out[3]?.role).toBe("user");
|
||||
});
|
||||
|
||||
it("drops duplicate tool results for the same id within a span", () => {
|
||||
const input = [
|
||||
{
|
||||
role: "assistant",
|
||||
content: [{ type: "toolCall", id: "call_1", name: "read", arguments: {} }],
|
||||
},
|
||||
{
|
||||
role: "toolResult",
|
||||
toolCallId: "call_1",
|
||||
toolName: "read",
|
||||
content: [{ type: "text", text: "first" }],
|
||||
isError: false,
|
||||
},
|
||||
{
|
||||
role: "toolResult",
|
||||
toolCallId: "call_1",
|
||||
toolName: "read",
|
||||
content: [{ type: "text", text: "second" }],
|
||||
isError: false,
|
||||
},
|
||||
{ role: "user", content: "ok" },
|
||||
] satisfies AgentMessage[];
|
||||
|
||||
const out = sanitizeToolUseResultPairing(input);
|
||||
expect(out.filter((m) => m.role === "toolResult")).toHaveLength(1);
|
||||
});
|
||||
|
||||
it("drops duplicate tool results for the same id across the transcript", () => {
|
||||
const input = [
|
||||
{
|
||||
role: "assistant",
|
||||
content: [{ type: "toolCall", id: "call_1", name: "read", arguments: {} }],
|
||||
},
|
||||
{
|
||||
role: "toolResult",
|
||||
toolCallId: "call_1",
|
||||
toolName: "read",
|
||||
content: [{ type: "text", text: "first" }],
|
||||
isError: false,
|
||||
},
|
||||
{ role: "assistant", content: [{ type: "text", text: "ok" }] },
|
||||
{
|
||||
role: "toolResult",
|
||||
toolCallId: "call_1",
|
||||
toolName: "read",
|
||||
content: [{ type: "text", text: "second (duplicate)" }],
|
||||
isError: false,
|
||||
},
|
||||
] satisfies AgentMessage[];
|
||||
|
||||
const out = sanitizeToolUseResultPairing(input);
|
||||
const results = out.filter((m) => m.role === "toolResult") as Array<{
|
||||
toolCallId?: string;
|
||||
}>;
|
||||
expect(results).toHaveLength(1);
|
||||
expect(results[0]?.toolCallId).toBe("call_1");
|
||||
});
|
||||
|
||||
it("drops orphan tool results that do not match any tool call", () => {
|
||||
const input = [
|
||||
{ role: "user", content: "hello" },
|
||||
{
|
||||
role: "toolResult",
|
||||
toolCallId: "call_orphan",
|
||||
toolName: "read",
|
||||
content: [{ type: "text", text: "orphan" }],
|
||||
isError: false,
|
||||
},
|
||||
{
|
||||
role: "assistant",
|
||||
content: [{ type: "text", text: "ok" }],
|
||||
},
|
||||
] satisfies AgentMessage[];
|
||||
|
||||
const out = sanitizeToolUseResultPairing(input);
|
||||
expect(out.some((m) => m.role === "toolResult")).toBe(false);
|
||||
expect(out.map((m) => m.role)).toEqual(["user", "assistant"]);
|
||||
});
|
||||
});
|
||||
|
||||
describe("sanitizeToolCallInputs", () => {
|
||||
it("drops tool calls missing input or arguments", () => {
|
||||
const input: AgentMessage[] = [
|
||||
{
|
||||
role: "assistant",
|
||||
content: [{ type: "toolCall", id: "call_1", name: "read" }],
|
||||
},
|
||||
{ role: "user", content: "hello" },
|
||||
];
|
||||
|
||||
const out = sanitizeToolCallInputs(input);
|
||||
expect(out.map((m) => m.role)).toEqual(["user"]);
|
||||
});
|
||||
|
||||
it("keeps valid tool calls and preserves text blocks", () => {
|
||||
const input: AgentMessage[] = [
|
||||
{
|
||||
role: "assistant",
|
||||
content: [
|
||||
{ type: "text", text: "before" },
|
||||
{ type: "toolUse", id: "call_ok", name: "read", input: { path: "a" } },
|
||||
{ type: "toolCall", id: "call_drop", name: "read" },
|
||||
],
|
||||
},
|
||||
];
|
||||
|
||||
const out = sanitizeToolCallInputs(input);
|
||||
const assistant = out[0] as Extract<AgentMessage, { role: "assistant" }>;
|
||||
const types = Array.isArray(assistant.content)
|
||||
? assistant.content.map((block) => (block as { type?: unknown }).type)
|
||||
: [];
|
||||
expect(types).toEqual(["text", "toolUse"]);
|
||||
});
|
||||
});
|
||||
295
src/agent/session/session-transcript-repair.ts
Normal file
295
src/agent/session/session-transcript-repair.ts
Normal file
|
|
@ -0,0 +1,295 @@
|
|||
import type { AgentMessage } from "@mariozechner/pi-agent-core";
|
||||
|
||||
type ToolCallLike = {
|
||||
id: string;
|
||||
name?: string;
|
||||
};
|
||||
|
||||
const TOOL_CALL_TYPES = new Set(["toolCall", "toolUse", "functionCall"]);
|
||||
|
||||
type ToolCallBlock = {
|
||||
type?: unknown;
|
||||
id?: unknown;
|
||||
name?: unknown;
|
||||
input?: unknown;
|
||||
arguments?: unknown;
|
||||
};
|
||||
|
||||
function extractToolCallsFromAssistant(
|
||||
msg: Extract<AgentMessage, { role: "assistant" }>,
|
||||
): ToolCallLike[] {
|
||||
const content = msg.content;
|
||||
if (!Array.isArray(content)) {
|
||||
return [];
|
||||
}
|
||||
|
||||
const toolCalls: ToolCallLike[] = [];
|
||||
for (const block of content) {
|
||||
if (!block || typeof block !== "object") {
|
||||
continue;
|
||||
}
|
||||
const rec = block as { type?: unknown; id?: unknown; name?: unknown };
|
||||
if (typeof rec.id !== "string" || !rec.id) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (rec.type === "toolCall" || rec.type === "toolUse" || rec.type === "functionCall") {
|
||||
toolCalls.push({
|
||||
id: rec.id,
|
||||
name: typeof rec.name === "string" ? rec.name : undefined,
|
||||
});
|
||||
}
|
||||
}
|
||||
return toolCalls;
|
||||
}
|
||||
|
||||
function isToolCallBlock(block: unknown): block is ToolCallBlock {
|
||||
if (!block || typeof block !== "object") {
|
||||
return false;
|
||||
}
|
||||
const type = (block as { type?: unknown }).type;
|
||||
return typeof type === "string" && TOOL_CALL_TYPES.has(type);
|
||||
}
|
||||
|
||||
function hasToolCallInput(block: ToolCallBlock): boolean {
|
||||
const hasInput = "input" in block ? block.input !== undefined && block.input !== null : false;
|
||||
const hasArguments =
|
||||
"arguments" in block ? block.arguments !== undefined && block.arguments !== null : false;
|
||||
return hasInput || hasArguments;
|
||||
}
|
||||
|
||||
function extractToolResultId(msg: Extract<AgentMessage, { role: "toolResult" }>): string | null {
|
||||
const toolCallId = (msg as { toolCallId?: unknown }).toolCallId;
|
||||
if (typeof toolCallId === "string" && toolCallId) {
|
||||
return toolCallId;
|
||||
}
|
||||
const toolUseId = (msg as { toolUseId?: unknown }).toolUseId;
|
||||
if (typeof toolUseId === "string" && toolUseId) {
|
||||
return toolUseId;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
function makeMissingToolResult(params: {
|
||||
toolCallId: string;
|
||||
toolName?: string;
|
||||
}): Extract<AgentMessage, { role: "toolResult" }> {
|
||||
return {
|
||||
role: "toolResult",
|
||||
toolCallId: params.toolCallId,
|
||||
toolName: params.toolName ?? "unknown",
|
||||
content: [
|
||||
{
|
||||
type: "text",
|
||||
text: "[multica] missing tool result in session history; inserted synthetic error result for transcript repair.",
|
||||
},
|
||||
],
|
||||
isError: true,
|
||||
timestamp: Date.now(),
|
||||
} as Extract<AgentMessage, { role: "toolResult" }>;
|
||||
}
|
||||
|
||||
export { makeMissingToolResult };
|
||||
|
||||
export type ToolCallInputRepairReport = {
|
||||
messages: AgentMessage[];
|
||||
droppedToolCalls: number;
|
||||
droppedAssistantMessages: number;
|
||||
};
|
||||
|
||||
export function repairToolCallInputs(messages: AgentMessage[]): ToolCallInputRepairReport {
|
||||
let droppedToolCalls = 0;
|
||||
let droppedAssistantMessages = 0;
|
||||
let changed = false;
|
||||
const out: AgentMessage[] = [];
|
||||
|
||||
for (const msg of messages) {
|
||||
if (!msg || typeof msg !== "object") {
|
||||
out.push(msg);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (msg.role !== "assistant" || !Array.isArray(msg.content)) {
|
||||
out.push(msg);
|
||||
continue;
|
||||
}
|
||||
|
||||
const nextContent = [];
|
||||
let droppedInMessage = 0;
|
||||
|
||||
for (const block of msg.content) {
|
||||
if (isToolCallBlock(block) && !hasToolCallInput(block)) {
|
||||
droppedToolCalls += 1;
|
||||
droppedInMessage += 1;
|
||||
changed = true;
|
||||
continue;
|
||||
}
|
||||
nextContent.push(block);
|
||||
}
|
||||
|
||||
if (droppedInMessage > 0) {
|
||||
if (nextContent.length === 0) {
|
||||
droppedAssistantMessages += 1;
|
||||
changed = true;
|
||||
continue;
|
||||
}
|
||||
out.push({ ...msg, content: nextContent });
|
||||
continue;
|
||||
}
|
||||
|
||||
out.push(msg);
|
||||
}
|
||||
|
||||
return {
|
||||
messages: changed ? out : messages,
|
||||
droppedToolCalls,
|
||||
droppedAssistantMessages,
|
||||
};
|
||||
}
|
||||
|
||||
export function sanitizeToolCallInputs(messages: AgentMessage[]): AgentMessage[] {
|
||||
return repairToolCallInputs(messages).messages;
|
||||
}
|
||||
|
||||
export function sanitizeToolUseResultPairing(messages: AgentMessage[]): AgentMessage[] {
|
||||
return repairToolUseResultPairing(messages).messages;
|
||||
}
|
||||
|
||||
export type ToolUseRepairReport = {
|
||||
messages: AgentMessage[];
|
||||
added: Array<Extract<AgentMessage, { role: "toolResult" }>>;
|
||||
droppedDuplicateCount: number;
|
||||
droppedOrphanCount: number;
|
||||
moved: boolean;
|
||||
};
|
||||
|
||||
export function repairToolUseResultPairing(messages: AgentMessage[]): ToolUseRepairReport {
|
||||
const out: AgentMessage[] = [];
|
||||
const added: Array<Extract<AgentMessage, { role: "toolResult" }>> = [];
|
||||
const seenToolResultIds = new Set<string>();
|
||||
let droppedDuplicateCount = 0;
|
||||
let droppedOrphanCount = 0;
|
||||
let moved = false;
|
||||
let changed = false;
|
||||
|
||||
const pushToolResult = (msg: Extract<AgentMessage, { role: "toolResult" }>) => {
|
||||
const id = extractToolResultId(msg);
|
||||
if (id && seenToolResultIds.has(id)) {
|
||||
droppedDuplicateCount += 1;
|
||||
changed = true;
|
||||
return;
|
||||
}
|
||||
if (id) {
|
||||
seenToolResultIds.add(id);
|
||||
}
|
||||
out.push(msg);
|
||||
};
|
||||
|
||||
for (let i = 0; i < messages.length; i += 1) {
|
||||
const msg = messages[i];
|
||||
if (!msg || typeof msg !== "object") {
|
||||
out.push(msg);
|
||||
continue;
|
||||
}
|
||||
|
||||
const role = (msg as { role?: unknown }).role;
|
||||
if (role !== "assistant") {
|
||||
if (role !== "toolResult") {
|
||||
out.push(msg);
|
||||
} else {
|
||||
droppedOrphanCount += 1;
|
||||
changed = true;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
const assistant = msg as Extract<AgentMessage, { role: "assistant" }>;
|
||||
const toolCalls = extractToolCallsFromAssistant(assistant);
|
||||
if (toolCalls.length === 0) {
|
||||
out.push(msg);
|
||||
continue;
|
||||
}
|
||||
|
||||
const toolCallIds = new Set(toolCalls.map((t) => t.id));
|
||||
|
||||
const spanResultsById = new Map<string, Extract<AgentMessage, { role: "toolResult" }>>();
|
||||
const remainder: AgentMessage[] = [];
|
||||
|
||||
let j = i + 1;
|
||||
for (; j < messages.length; j += 1) {
|
||||
const next = messages[j];
|
||||
if (!next || typeof next !== "object") {
|
||||
remainder.push(next);
|
||||
continue;
|
||||
}
|
||||
|
||||
const nextRole = (next as { role?: unknown }).role;
|
||||
if (nextRole === "assistant") {
|
||||
break;
|
||||
}
|
||||
|
||||
if (nextRole === "toolResult") {
|
||||
const toolResult = next as Extract<AgentMessage, { role: "toolResult" }>;
|
||||
const id = extractToolResultId(toolResult);
|
||||
if (id && toolCallIds.has(id)) {
|
||||
if (seenToolResultIds.has(id)) {
|
||||
droppedDuplicateCount += 1;
|
||||
changed = true;
|
||||
continue;
|
||||
}
|
||||
if (!spanResultsById.has(id)) {
|
||||
spanResultsById.set(id, toolResult);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if (nextRole !== "toolResult") {
|
||||
remainder.push(next);
|
||||
} else {
|
||||
droppedOrphanCount += 1;
|
||||
changed = true;
|
||||
}
|
||||
}
|
||||
|
||||
out.push(msg);
|
||||
|
||||
if (spanResultsById.size > 0 && remainder.length > 0) {
|
||||
moved = true;
|
||||
changed = true;
|
||||
}
|
||||
|
||||
for (const call of toolCalls) {
|
||||
const existing = spanResultsById.get(call.id);
|
||||
if (existing) {
|
||||
pushToolResult(existing);
|
||||
} else {
|
||||
const missing = makeMissingToolResult({
|
||||
toolCallId: call.id,
|
||||
toolName: call.name,
|
||||
});
|
||||
added.push(missing);
|
||||
changed = true;
|
||||
pushToolResult(missing);
|
||||
}
|
||||
}
|
||||
|
||||
for (const rem of remainder) {
|
||||
if (!rem || typeof rem !== "object") {
|
||||
out.push(rem);
|
||||
continue;
|
||||
}
|
||||
out.push(rem);
|
||||
}
|
||||
i = j - 1;
|
||||
}
|
||||
|
||||
const changedOrMoved = changed || moved;
|
||||
return {
|
||||
messages: changedOrMoved ? out : messages,
|
||||
added,
|
||||
droppedDuplicateCount,
|
||||
droppedOrphanCount,
|
||||
moved: changedOrMoved,
|
||||
};
|
||||
}
|
||||
160
src/agent/session/session-write-lock.test.ts
Normal file
160
src/agent/session/session-write-lock.test.ts
Normal file
|
|
@ -0,0 +1,160 @@
|
|||
import fs from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { __testing, acquireSessionWriteLock } from "./session-write-lock.js";
|
||||
|
||||
describe("acquireSessionWriteLock", () => {
|
||||
it("reuses locks across symlinked session paths", async () => {
|
||||
if (process.platform === "win32") {
|
||||
expect(true).toBe(true);
|
||||
return;
|
||||
}
|
||||
|
||||
const root = await fs.mkdtemp(path.join(os.tmpdir(), "multica-lock-"));
|
||||
try {
|
||||
const realDir = path.join(root, "real");
|
||||
const linkDir = path.join(root, "link");
|
||||
await fs.mkdir(realDir, { recursive: true });
|
||||
await fs.symlink(realDir, linkDir);
|
||||
|
||||
const sessionReal = path.join(realDir, "sessions.json");
|
||||
const sessionLink = path.join(linkDir, "sessions.json");
|
||||
|
||||
const lockA = await acquireSessionWriteLock({ sessionFile: sessionReal, timeoutMs: 500 });
|
||||
const lockB = await acquireSessionWriteLock({ sessionFile: sessionLink, timeoutMs: 500 });
|
||||
|
||||
await lockB.release();
|
||||
await lockA.release();
|
||||
} finally {
|
||||
await fs.rm(root, { recursive: true, force: true });
|
||||
}
|
||||
});
|
||||
|
||||
it("keeps the lock file until the last release", async () => {
|
||||
const root = await fs.mkdtemp(path.join(os.tmpdir(), "multica-lock-"));
|
||||
try {
|
||||
const sessionFile = path.join(root, "sessions.json");
|
||||
const lockPath = `${sessionFile}.lock`;
|
||||
|
||||
const lockA = await acquireSessionWriteLock({ sessionFile, timeoutMs: 500 });
|
||||
const lockB = await acquireSessionWriteLock({ sessionFile, timeoutMs: 500 });
|
||||
|
||||
await expect(fs.access(lockPath)).resolves.toBeUndefined();
|
||||
await lockA.release();
|
||||
await expect(fs.access(lockPath)).resolves.toBeUndefined();
|
||||
await lockB.release();
|
||||
await expect(fs.access(lockPath)).rejects.toThrow();
|
||||
} finally {
|
||||
await fs.rm(root, { recursive: true, force: true });
|
||||
}
|
||||
});
|
||||
|
||||
it("reclaims stale lock files", async () => {
|
||||
const root = await fs.mkdtemp(path.join(os.tmpdir(), "multica-lock-"));
|
||||
try {
|
||||
const sessionFile = path.join(root, "sessions.json");
|
||||
const lockPath = `${sessionFile}.lock`;
|
||||
await fs.writeFile(
|
||||
lockPath,
|
||||
JSON.stringify({ pid: 123456, createdAt: new Date(Date.now() - 60_000).toISOString() }),
|
||||
"utf8",
|
||||
);
|
||||
|
||||
const lock = await acquireSessionWriteLock({ sessionFile, timeoutMs: 500, staleMs: 10 });
|
||||
const raw = await fs.readFile(lockPath, "utf8");
|
||||
const payload = JSON.parse(raw) as { pid: number };
|
||||
|
||||
expect(payload.pid).toBe(process.pid);
|
||||
await lock.release();
|
||||
} finally {
|
||||
await fs.rm(root, { recursive: true, force: true });
|
||||
}
|
||||
});
|
||||
|
||||
it("removes held locks on termination signals", async () => {
|
||||
const signals = ["SIGINT", "SIGTERM", "SIGQUIT", "SIGABRT"] as const;
|
||||
for (const signal of signals) {
|
||||
const root = await fs.mkdtemp(path.join(os.tmpdir(), "multica-lock-cleanup-"));
|
||||
// Prevent the signal from actually killing the vitest worker
|
||||
const keepAlive = () => {};
|
||||
process.on(signal, keepAlive);
|
||||
try {
|
||||
const sessionFile = path.join(root, "sessions.json");
|
||||
const lockPath = `${sessionFile}.lock`;
|
||||
await acquireSessionWriteLock({ sessionFile, timeoutMs: 500 });
|
||||
|
||||
__testing.handleTerminationSignal(signal);
|
||||
|
||||
await expect(fs.stat(lockPath)).rejects.toThrow();
|
||||
} finally {
|
||||
process.off(signal, keepAlive);
|
||||
await fs.rm(root, { recursive: true, force: true });
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
it("registers cleanup for SIGQUIT and SIGABRT", () => {
|
||||
expect(__testing.cleanupSignals).toContain("SIGQUIT");
|
||||
expect(__testing.cleanupSignals).toContain("SIGABRT");
|
||||
});
|
||||
|
||||
it("cleans up locks on SIGINT without removing other handlers", async () => {
|
||||
const root = await fs.mkdtemp(path.join(os.tmpdir(), "multica-lock-"));
|
||||
const originalKill = process.kill.bind(process) as typeof process.kill;
|
||||
const killCalls: Array<NodeJS.Signals | undefined> = [];
|
||||
let otherHandlerCalled = false;
|
||||
|
||||
process.kill = ((pid: number, signal?: NodeJS.Signals) => {
|
||||
killCalls.push(signal);
|
||||
return true;
|
||||
}) as typeof process.kill;
|
||||
|
||||
const otherHandler = () => {
|
||||
otherHandlerCalled = true;
|
||||
};
|
||||
|
||||
process.on("SIGINT", otherHandler);
|
||||
|
||||
try {
|
||||
const sessionFile = path.join(root, "sessions.json");
|
||||
const lockPath = `${sessionFile}.lock`;
|
||||
await acquireSessionWriteLock({ sessionFile, timeoutMs: 500 });
|
||||
|
||||
process.emit("SIGINT");
|
||||
|
||||
await expect(fs.access(lockPath)).rejects.toThrow();
|
||||
expect(otherHandlerCalled).toBe(true);
|
||||
expect(killCalls).toEqual([]);
|
||||
} finally {
|
||||
process.off("SIGINT", otherHandler);
|
||||
process.kill = originalKill;
|
||||
await fs.rm(root, { recursive: true, force: true });
|
||||
}
|
||||
});
|
||||
|
||||
it("cleans up locks via releaseAllLocksSync", async () => {
|
||||
const root = await fs.mkdtemp(path.join(os.tmpdir(), "multica-lock-"));
|
||||
try {
|
||||
const sessionFile = path.join(root, "sessions.json");
|
||||
const lockPath = `${sessionFile}.lock`;
|
||||
await acquireSessionWriteLock({ sessionFile, timeoutMs: 500 });
|
||||
|
||||
__testing.releaseAllLocksSync();
|
||||
|
||||
await expect(fs.access(lockPath)).rejects.toThrow();
|
||||
} finally {
|
||||
await fs.rm(root, { recursive: true, force: true });
|
||||
}
|
||||
});
|
||||
|
||||
it("keeps other signal listeners registered", () => {
|
||||
const keepAlive = () => {};
|
||||
process.on("SIGINT", keepAlive);
|
||||
|
||||
__testing.handleTerminationSignal("SIGINT");
|
||||
|
||||
expect(process.listeners("SIGINT")).toContain(keepAlive);
|
||||
process.off("SIGINT", keepAlive);
|
||||
});
|
||||
});
|
||||
208
src/agent/session/session-write-lock.ts
Normal file
208
src/agent/session/session-write-lock.ts
Normal file
|
|
@ -0,0 +1,208 @@
|
|||
import fsSync from "node:fs";
|
||||
import fs from "node:fs/promises";
|
||||
import path from "node:path";
|
||||
|
||||
type LockFilePayload = {
|
||||
pid: number;
|
||||
createdAt: string;
|
||||
};
|
||||
|
||||
type HeldLock = {
|
||||
count: number;
|
||||
handle: fs.FileHandle;
|
||||
lockPath: string;
|
||||
};
|
||||
|
||||
const HELD_LOCKS = new Map<string, HeldLock>();
|
||||
const CLEANUP_SIGNALS = ["SIGINT", "SIGTERM", "SIGQUIT", "SIGABRT"] as const;
|
||||
type CleanupSignal = (typeof CLEANUP_SIGNALS)[number];
|
||||
const cleanupHandlers = new Map<CleanupSignal, () => void>();
|
||||
|
||||
function isAlive(pid: number): boolean {
|
||||
if (!Number.isFinite(pid) || pid <= 0) {
|
||||
return false;
|
||||
}
|
||||
try {
|
||||
process.kill(pid, 0);
|
||||
return true;
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Synchronously release all held locks.
|
||||
* Used during process exit when async operations aren't reliable.
|
||||
*/
|
||||
function releaseAllLocksSync(): void {
|
||||
for (const [sessionFile, held] of HELD_LOCKS) {
|
||||
try {
|
||||
if (typeof held.handle.close === "function") {
|
||||
void held.handle.close().catch(() => {});
|
||||
}
|
||||
} catch {
|
||||
// Ignore errors during cleanup - best effort
|
||||
}
|
||||
try {
|
||||
fsSync.rmSync(held.lockPath, { force: true });
|
||||
} catch {
|
||||
// Ignore errors during cleanup - best effort
|
||||
}
|
||||
HELD_LOCKS.delete(sessionFile);
|
||||
}
|
||||
}
|
||||
|
||||
let cleanupRegistered = false;
|
||||
|
||||
function handleTerminationSignal(signal: CleanupSignal): void {
|
||||
releaseAllLocksSync();
|
||||
const shouldReraise = process.listenerCount(signal) === 1;
|
||||
if (shouldReraise) {
|
||||
const handler = cleanupHandlers.get(signal);
|
||||
if (handler) {
|
||||
process.off(signal, handler);
|
||||
}
|
||||
try {
|
||||
process.kill(process.pid, signal);
|
||||
} catch {
|
||||
// Ignore errors during shutdown
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function registerCleanupHandlers(): void {
|
||||
if (cleanupRegistered) {
|
||||
return;
|
||||
}
|
||||
cleanupRegistered = true;
|
||||
|
||||
// Cleanup on normal exit and process.exit() calls
|
||||
process.on("exit", () => {
|
||||
releaseAllLocksSync();
|
||||
});
|
||||
|
||||
// Handle termination signals
|
||||
for (const signal of CLEANUP_SIGNALS) {
|
||||
try {
|
||||
const handler = () => handleTerminationSignal(signal);
|
||||
cleanupHandlers.set(signal, handler);
|
||||
process.on(signal, handler);
|
||||
} catch {
|
||||
// Ignore unsupported signals on this platform.
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async function readLockPayload(lockPath: string): Promise<LockFilePayload | null> {
|
||||
try {
|
||||
const raw = await fs.readFile(lockPath, "utf8");
|
||||
const parsed = JSON.parse(raw) as Partial<LockFilePayload>;
|
||||
if (typeof parsed.pid !== "number") {
|
||||
return null;
|
||||
}
|
||||
if (typeof parsed.createdAt !== "string") {
|
||||
return null;
|
||||
}
|
||||
return { pid: parsed.pid, createdAt: parsed.createdAt };
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
export async function acquireSessionWriteLock(params: {
|
||||
sessionFile: string;
|
||||
timeoutMs?: number;
|
||||
staleMs?: number;
|
||||
}): Promise<{
|
||||
release: () => Promise<void>;
|
||||
}> {
|
||||
registerCleanupHandlers();
|
||||
const timeoutMs = params.timeoutMs ?? 10_000;
|
||||
const staleMs = params.staleMs ?? 30 * 60 * 1000;
|
||||
const sessionFile = path.resolve(params.sessionFile);
|
||||
const sessionDir = path.dirname(sessionFile);
|
||||
await fs.mkdir(sessionDir, { recursive: true });
|
||||
let normalizedDir = sessionDir;
|
||||
try {
|
||||
normalizedDir = await fs.realpath(sessionDir);
|
||||
} catch {
|
||||
// Fall back to the resolved path if realpath fails (permissions, transient FS).
|
||||
}
|
||||
const normalizedSessionFile = path.join(normalizedDir, path.basename(sessionFile));
|
||||
const lockPath = `${normalizedSessionFile}.lock`;
|
||||
|
||||
const held = HELD_LOCKS.get(normalizedSessionFile);
|
||||
if (held) {
|
||||
held.count += 1;
|
||||
return {
|
||||
release: async () => {
|
||||
const current = HELD_LOCKS.get(normalizedSessionFile);
|
||||
if (!current) {
|
||||
return;
|
||||
}
|
||||
current.count -= 1;
|
||||
if (current.count > 0) {
|
||||
return;
|
||||
}
|
||||
HELD_LOCKS.delete(normalizedSessionFile);
|
||||
await current.handle.close();
|
||||
await fs.rm(current.lockPath, { force: true });
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
const startedAt = Date.now();
|
||||
let attempt = 0;
|
||||
while (Date.now() - startedAt < timeoutMs) {
|
||||
attempt += 1;
|
||||
try {
|
||||
const handle = await fs.open(lockPath, "wx");
|
||||
await handle.writeFile(
|
||||
JSON.stringify({ pid: process.pid, createdAt: new Date().toISOString() }, null, 2),
|
||||
"utf8",
|
||||
);
|
||||
HELD_LOCKS.set(normalizedSessionFile, { count: 1, handle, lockPath });
|
||||
return {
|
||||
release: async () => {
|
||||
const current = HELD_LOCKS.get(normalizedSessionFile);
|
||||
if (!current) {
|
||||
return;
|
||||
}
|
||||
current.count -= 1;
|
||||
if (current.count > 0) {
|
||||
return;
|
||||
}
|
||||
HELD_LOCKS.delete(normalizedSessionFile);
|
||||
await current.handle.close();
|
||||
await fs.rm(current.lockPath, { force: true });
|
||||
},
|
||||
};
|
||||
} catch (err) {
|
||||
const code = (err as { code?: unknown }).code;
|
||||
if (code !== "EEXIST") {
|
||||
throw err;
|
||||
}
|
||||
const payload = await readLockPayload(lockPath);
|
||||
const createdAt = payload?.createdAt ? Date.parse(payload.createdAt) : NaN;
|
||||
const stale = !Number.isFinite(createdAt) || Date.now() - createdAt > staleMs;
|
||||
const alive = payload?.pid ? isAlive(payload.pid) : false;
|
||||
if (stale || !alive) {
|
||||
await fs.rm(lockPath, { force: true });
|
||||
continue;
|
||||
}
|
||||
|
||||
const delay = Math.min(1000, 50 * attempt);
|
||||
await new Promise((r) => setTimeout(r, delay));
|
||||
}
|
||||
}
|
||||
|
||||
const payload = await readLockPayload(lockPath);
|
||||
const owner = payload?.pid ? `pid=${payload.pid}` : "unknown";
|
||||
throw new Error(`session file locked (timeout ${timeoutMs}ms): ${owner} ${lockPath}`);
|
||||
}
|
||||
|
||||
export const __testing = {
|
||||
cleanupSignals: [...CLEANUP_SIGNALS],
|
||||
handleTerminationSignal,
|
||||
releaseAllLocksSync,
|
||||
};
|
||||
Loading…
Add table
Add a link
Reference in a new issue