multica/src/agent/session/session-transcript-repair.ts
yushen 37e5b2ce7f feat(session): add write lock, file repair, and transcript repair modules
Port from OpenClaw:
- session-write-lock: file-level write lock with atomic creation, reference
  counting, stale lock detection, and process cleanup handlers
- session-file-repair: auto-detect and repair malformed JSONL lines with
  backup and atomic rename
- session-transcript-repair: fix tool call/result pairing issues including
  displaced results, duplicates, orphans, and missing inputs

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-03 17:21:15 +08:00

295 lines
7.5 KiB
TypeScript

import type { AgentMessage } from "@mariozechner/pi-agent-core";
type ToolCallLike = {
id: string;
name?: string;
};
const TOOL_CALL_TYPES = new Set(["toolCall", "toolUse", "functionCall"]);
type ToolCallBlock = {
type?: unknown;
id?: unknown;
name?: unknown;
input?: unknown;
arguments?: unknown;
};
function extractToolCallsFromAssistant(
msg: Extract<AgentMessage, { role: "assistant" }>,
): ToolCallLike[] {
const content = msg.content;
if (!Array.isArray(content)) {
return [];
}
const toolCalls: ToolCallLike[] = [];
for (const block of content) {
if (!block || typeof block !== "object") {
continue;
}
const rec = block as { type?: unknown; id?: unknown; name?: unknown };
if (typeof rec.id !== "string" || !rec.id) {
continue;
}
if (rec.type === "toolCall" || rec.type === "toolUse" || rec.type === "functionCall") {
toolCalls.push({
id: rec.id,
name: typeof rec.name === "string" ? rec.name : undefined,
});
}
}
return toolCalls;
}
function isToolCallBlock(block: unknown): block is ToolCallBlock {
if (!block || typeof block !== "object") {
return false;
}
const type = (block as { type?: unknown }).type;
return typeof type === "string" && TOOL_CALL_TYPES.has(type);
}
function hasToolCallInput(block: ToolCallBlock): boolean {
const hasInput = "input" in block ? block.input !== undefined && block.input !== null : false;
const hasArguments =
"arguments" in block ? block.arguments !== undefined && block.arguments !== null : false;
return hasInput || hasArguments;
}
function extractToolResultId(msg: Extract<AgentMessage, { role: "toolResult" }>): string | null {
const toolCallId = (msg as { toolCallId?: unknown }).toolCallId;
if (typeof toolCallId === "string" && toolCallId) {
return toolCallId;
}
const toolUseId = (msg as { toolUseId?: unknown }).toolUseId;
if (typeof toolUseId === "string" && toolUseId) {
return toolUseId;
}
return null;
}
function makeMissingToolResult(params: {
toolCallId: string;
toolName?: string;
}): Extract<AgentMessage, { role: "toolResult" }> {
return {
role: "toolResult",
toolCallId: params.toolCallId,
toolName: params.toolName ?? "unknown",
content: [
{
type: "text",
text: "[multica] missing tool result in session history; inserted synthetic error result for transcript repair.",
},
],
isError: true,
timestamp: Date.now(),
} as Extract<AgentMessage, { role: "toolResult" }>;
}
export { makeMissingToolResult };
export type ToolCallInputRepairReport = {
messages: AgentMessage[];
droppedToolCalls: number;
droppedAssistantMessages: number;
};
export function repairToolCallInputs(messages: AgentMessage[]): ToolCallInputRepairReport {
let droppedToolCalls = 0;
let droppedAssistantMessages = 0;
let changed = false;
const out: AgentMessage[] = [];
for (const msg of messages) {
if (!msg || typeof msg !== "object") {
out.push(msg);
continue;
}
if (msg.role !== "assistant" || !Array.isArray(msg.content)) {
out.push(msg);
continue;
}
const nextContent = [];
let droppedInMessage = 0;
for (const block of msg.content) {
if (isToolCallBlock(block) && !hasToolCallInput(block)) {
droppedToolCalls += 1;
droppedInMessage += 1;
changed = true;
continue;
}
nextContent.push(block);
}
if (droppedInMessage > 0) {
if (nextContent.length === 0) {
droppedAssistantMessages += 1;
changed = true;
continue;
}
out.push({ ...msg, content: nextContent });
continue;
}
out.push(msg);
}
return {
messages: changed ? out : messages,
droppedToolCalls,
droppedAssistantMessages,
};
}
export function sanitizeToolCallInputs(messages: AgentMessage[]): AgentMessage[] {
return repairToolCallInputs(messages).messages;
}
export function sanitizeToolUseResultPairing(messages: AgentMessage[]): AgentMessage[] {
return repairToolUseResultPairing(messages).messages;
}
export type ToolUseRepairReport = {
messages: AgentMessage[];
added: Array<Extract<AgentMessage, { role: "toolResult" }>>;
droppedDuplicateCount: number;
droppedOrphanCount: number;
moved: boolean;
};
export function repairToolUseResultPairing(messages: AgentMessage[]): ToolUseRepairReport {
const out: AgentMessage[] = [];
const added: Array<Extract<AgentMessage, { role: "toolResult" }>> = [];
const seenToolResultIds = new Set<string>();
let droppedDuplicateCount = 0;
let droppedOrphanCount = 0;
let moved = false;
let changed = false;
const pushToolResult = (msg: Extract<AgentMessage, { role: "toolResult" }>) => {
const id = extractToolResultId(msg);
if (id && seenToolResultIds.has(id)) {
droppedDuplicateCount += 1;
changed = true;
return;
}
if (id) {
seenToolResultIds.add(id);
}
out.push(msg);
};
for (let i = 0; i < messages.length; i += 1) {
const msg = messages[i];
if (!msg || typeof msg !== "object") {
out.push(msg);
continue;
}
const role = (msg as { role?: unknown }).role;
if (role !== "assistant") {
if (role !== "toolResult") {
out.push(msg);
} else {
droppedOrphanCount += 1;
changed = true;
}
continue;
}
const assistant = msg as Extract<AgentMessage, { role: "assistant" }>;
const toolCalls = extractToolCallsFromAssistant(assistant);
if (toolCalls.length === 0) {
out.push(msg);
continue;
}
const toolCallIds = new Set(toolCalls.map((t) => t.id));
const spanResultsById = new Map<string, Extract<AgentMessage, { role: "toolResult" }>>();
const remainder: AgentMessage[] = [];
let j = i + 1;
for (; j < messages.length; j += 1) {
const next = messages[j];
if (!next || typeof next !== "object") {
remainder.push(next);
continue;
}
const nextRole = (next as { role?: unknown }).role;
if (nextRole === "assistant") {
break;
}
if (nextRole === "toolResult") {
const toolResult = next as Extract<AgentMessage, { role: "toolResult" }>;
const id = extractToolResultId(toolResult);
if (id && toolCallIds.has(id)) {
if (seenToolResultIds.has(id)) {
droppedDuplicateCount += 1;
changed = true;
continue;
}
if (!spanResultsById.has(id)) {
spanResultsById.set(id, toolResult);
}
continue;
}
}
if (nextRole !== "toolResult") {
remainder.push(next);
} else {
droppedOrphanCount += 1;
changed = true;
}
}
out.push(msg);
if (spanResultsById.size > 0 && remainder.length > 0) {
moved = true;
changed = true;
}
for (const call of toolCalls) {
const existing = spanResultsById.get(call.id);
if (existing) {
pushToolResult(existing);
} else {
const missing = makeMissingToolResult({
toolCallId: call.id,
toolName: call.name,
});
added.push(missing);
changed = true;
pushToolResult(missing);
}
}
for (const rem of remainder) {
if (!rem || typeof rem !== "object") {
out.push(rem);
continue;
}
out.push(rem);
}
i = j - 1;
}
const changedOrMoved = changed || moved;
return {
messages: changedOrMoved ? out : messages,
added,
droppedDuplicateCount,
droppedOrphanCount,
moved: changedOrMoved,
};
}