merge: resolve conflicts with main branch

- Merge auth-profiles feature from main into runner.ts
- Merge closeCallbacks feature from main into async-agent.ts
- Regenerate pnpm-lock.yaml with new dependencies

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Jiang Bohan 2026-02-03 18:57:05 +08:00
commit dafbf856ac
83 changed files with 12239 additions and 258 deletions

View file

@ -1,18 +1,20 @@
import { v7 as uuidv7 } from "uuid";
import type { AgentEvent } from "@mariozechner/pi-agent-core";
import { Agent } from "./runner.js";
import { Channel } from "./channel.js";
import { extractText } from "./extract-text.js";
import type { AgentOptions, Message } from "./types.js";
import type { StreamPayload } from "@multica/sdk";
const devNull = { write: () => true } as NodeJS.WritableStream;
/** Discriminated union of legacy Message (error fallback) and raw AgentEvent */
export type ChannelItem = Message | AgentEvent;
export class AsyncAgent {
private readonly agent: Agent;
private readonly channel = new Channel<Message>();
private readonly channel = new Channel<ChannelItem>();
private _closed = false;
private queue: Promise<void> = Promise.resolve();
private streamCallback?: (payload: StreamPayload) => void;
private closeCallbacks: Array<() => void> = [];
readonly sessionId: string;
constructor(options?: AgentOptions) {
@ -21,18 +23,17 @@ export class AsyncAgent {
logger: { stdout: devNull, stderr: devNull },
});
this.sessionId = this.agent.sessionId;
this.setupStreamEvents();
// Forward raw AgentEvent into the channel
this.agent.subscribe((event: AgentEvent) => {
this.channel.send(event);
});
}
get closed(): boolean {
return this._closed;
}
/** Register callback for streaming events */
onStream(cb: (payload: StreamPayload) => void): void {
this.streamCallback = cb;
}
/** Write message to agent (non-blocking, serialized queue) */
write(content: string): void {
if (this._closed) throw new Error("Agent is closed");
@ -41,15 +42,9 @@ export class AsyncAgent {
.then(async () => {
if (this._closed) return;
const result = await this.agent.run(content);
// Only send final message via channel if no stream callback
// (stream callback already sent the final content)
if (!this.streamCallback) {
if (result.text) {
this.channel.send({ id: uuidv7(), content: result.text });
}
if (result.error) {
this.channel.send({ id: uuidv7(), content: `[error] ${result.error}` });
}
// Normal text is delivered via message_end event; only handle errors here
if (result.error) {
this.channel.send({ id: uuidv7(), content: `[error] ${result.error}` });
}
})
.catch((err) => {
@ -58,16 +53,39 @@ export class AsyncAgent {
});
}
/** Continuously read message stream */
read(): AsyncIterable<Message> {
/** Continuously read channel stream (AgentEvent + error Messages) */
read(): AsyncIterable<ChannelItem> {
return this.channel;
}
/** Close agent, stop all reads */
/** Returns a promise that resolves when the current message queue is drained */
waitForIdle(): Promise<void> {
return this.queue;
}
/** Register a callback to be invoked when the agent is closed */
onClose(callback: () => void): void {
if (this._closed) {
// Already closed, fire immediately
callback();
return;
}
this.closeCallbacks.push(callback);
}
/** Close agent, stop all reads, fire close callbacks */
close(): void {
if (this._closed) return;
this._closed = true;
this.channel.close();
for (const cb of this.closeCallbacks) {
try {
cb();
} catch {
// Don't let callback errors prevent other callbacks
}
}
this.closeCallbacks = [];
}
/** Get current active tool names */
@ -130,50 +148,4 @@ export class AsyncAgent {
getProfileId(): string | undefined {
return this.agent.getProfileId();
}
private setupStreamEvents(): void {
let currentStreamId: string | null = null;
this.agent.subscribe((event) => {
if (!this.streamCallback) return;
switch (event.type) {
case "message_start": {
if (event.message.role === "assistant") {
currentStreamId = uuidv7();
this.streamCallback({
streamId: currentStreamId,
agentId: this.sessionId,
state: "delta",
content: extractText(event.message),
});
}
break;
}
case "message_update": {
if (event.message.role === "assistant" && currentStreamId) {
this.streamCallback({
streamId: currentStreamId,
agentId: this.sessionId,
state: "delta",
content: extractText(event.message),
});
}
break;
}
case "message_end": {
if (event.message.role === "assistant" && currentStreamId) {
this.streamCallback({
streamId: currentStreamId,
agentId: this.sessionId,
state: "final",
content: extractText(event.message),
});
currentStreamId = null;
}
break;
}
}
});
}
}

View file

@ -0,0 +1,45 @@
/**
* Auth Profile Constants
*
* Cooldown timings, store version, and file names.
*/
/** Store format version */
export const AUTH_STORE_VERSION = 1;
/** Runtime store filename (inside ~/.super-multica/) */
export const AUTH_PROFILE_STORE_FILENAME = "auth-profiles.json";
// ============================================================
// Non-billing cooldown (rate_limit, auth, timeout, unknown)
// Progression: 1min -> 5min -> 25min -> 1hr (cap)
// Formula: min(MAX, BASE * FACTOR ^ min(errorCount - 1, 3))
// ============================================================
/** Base cooldown duration in milliseconds (1 minute) */
export const COOLDOWN_BASE_MS = 60_000;
/** Exponential factor for cooldown progression */
export const COOLDOWN_FACTOR = 5;
/** Maximum cooldown duration in milliseconds (1 hour) */
export const COOLDOWN_MAX_MS = 3_600_000;
// ============================================================
// Billing disable (longer backoff for payment/quota issues)
// Progression: 5h -> 10h -> 20h -> 24h (cap)
// Formula: min(MAX_HOURS, BASE_HOURS * 2 ^ (count - 1))
// ============================================================
/** Base billing disable duration in hours */
export const BILLING_BACKOFF_HOURS = 5;
/** Maximum billing disable duration in hours */
export const BILLING_MAX_HOURS = 24;
// ============================================================
// Failure window
// ============================================================
/** Failure window in milliseconds (24 hours) — errors older than this are forgotten */
export const FAILURE_WINDOW_MS = 24 * 60 * 60 * 1000;

View file

@ -0,0 +1,65 @@
import { describe, it, expect } from "vitest";
import { classifyError, isRotatableError } from "../runner.js";
// ============================================================
// classifyError
// ============================================================
describe("classifyError", () => {
it("classifies 401/403/unauthorized as auth", () => {
expect(classifyError(new Error("HTTP 401 Unauthorized"))).toBe("auth");
expect(classifyError(new Error("403 Forbidden"))).toBe("auth");
expect(classifyError(new Error("Invalid API key provided"))).toBe("auth");
expect(classifyError(new Error("Authentication failed"))).toBe("auth");
});
it("classifies 400/malformed as format", () => {
expect(classifyError(new Error("400 Bad Request"))).toBe("format");
expect(classifyError(new Error("Invalid request body"))).toBe("format");
expect(classifyError(new Error("Malformed JSON in request"))).toBe("format");
expect(classifyError(new Error("Schema validation failed"))).toBe("format");
});
it("classifies 429/rate limit as rate_limit", () => {
expect(classifyError(new Error("429 Too Many Requests"))).toBe("rate_limit");
expect(classifyError(new Error("Rate limit exceeded"))).toBe("rate_limit");
expect(classifyError(new Error("rate_limit_error"))).toBe("rate_limit");
});
it("classifies billing/quota as billing", () => {
expect(classifyError(new Error("Billing quota exceeded"))).toBe("billing");
expect(classifyError(new Error("Insufficient credits"))).toBe("billing");
expect(classifyError(new Error("Payment required"))).toBe("billing");
});
it("classifies timeout/connection errors as timeout", () => {
expect(classifyError(new Error("Request timed out"))).toBe("timeout");
expect(classifyError(new Error("ETIMEDOUT"))).toBe("timeout");
expect(classifyError(new Error("ECONNRESET"))).toBe("timeout");
expect(classifyError(new Error("Connection timeout"))).toBe("timeout");
});
it("classifies unknown errors as unknown", () => {
expect(classifyError(new Error("Something went wrong"))).toBe("unknown");
expect(classifyError("string error")).toBe("unknown");
expect(classifyError(42)).toBe("unknown");
});
});
// ============================================================
// isRotatableError
// ============================================================
describe("isRotatableError", () => {
it("considers auth, rate_limit, billing, timeout as rotatable", () => {
expect(isRotatableError("auth")).toBe(true);
expect(isRotatableError("rate_limit")).toBe(true);
expect(isRotatableError("billing")).toBe(true);
expect(isRotatableError("timeout")).toBe(true);
});
it("does not rotate on format or unknown errors", () => {
expect(isRotatableError("format")).toBe(false);
expect(isRotatableError("unknown")).toBe(false);
});
});

View file

@ -0,0 +1,48 @@
/**
* Auth Profiles barrel export
*/
export type {
AuthProfileFailureReason,
AuthProfileStore,
ProfileUsageStats,
ResolvedProfileAuth,
} from "./types.js";
export {
AUTH_STORE_VERSION,
AUTH_PROFILE_STORE_FILENAME,
COOLDOWN_BASE_MS,
COOLDOWN_FACTOR,
COOLDOWN_MAX_MS,
BILLING_BACKOFF_HOURS,
BILLING_MAX_HOURS,
FAILURE_WINDOW_MS,
} from "./constants.js";
export {
resolveAuthStorePath,
coerceStore,
ensureAuthStoreFile,
loadAuthProfileStore,
saveAuthProfileStore,
updateAuthProfileStore,
} from "./store.js";
export {
listProfilesForProvider,
resolveAuthProfileOrder,
type AuthProfileOrderOptions,
} from "./order.js";
export {
isProfileInCooldown,
resolveProfileUnusableUntil,
calculateCooldownMs,
calculateBillingDisableMs,
computeNextProfileUsageStats,
markAuthProfileFailure,
markAuthProfileUsed,
markAuthProfileGood,
clearAuthProfileCooldown,
} from "./usage.js";

View file

@ -0,0 +1,208 @@
import { describe, it, expect, vi, beforeEach } from "vitest";
import { resolveAuthProfileOrder, listProfilesForProvider } from "./order.js";
import type { AuthProfileStore } from "./types.js";
// Track mock profiles for credential validation
let _profiles: Record<string, { apiKey?: string }> = {};
let _order: Record<string, string[]> = {};
// Mock credentialManager
vi.mock("../credentials.js", () => {
return {
credentialManager: {
listProfileIdsForProvider(provider: string): string[] {
return Object.keys(_profiles).filter(
(key) => key === provider || key.startsWith(`${provider}:`),
);
},
getLlmOrder(provider: string): string[] | undefined {
return _order[provider];
},
getLlmProviderConfig(profileId: string): { apiKey?: string } | undefined {
return _profiles[profileId];
},
},
};
});
// Mock providers/registry — all test profiles are API-key based
vi.mock("../providers/registry.js", () => ({
isOAuthProvider: (_provider: string) => false,
}));
// Mock providers/resolver — delegate to our mock profiles
vi.mock("../providers/resolver.js", () => ({
resolveApiKeyForProfile: (profileId: string) => _profiles[profileId]?.apiKey,
}));
function setProfiles(profiles: Record<string, { apiKey?: string }>) {
_profiles = profiles;
}
function setOrder(order: Record<string, string[]>) {
_order = order;
}
beforeEach(() => {
_profiles = {};
_order = {};
});
// ============================================================
// listProfilesForProvider
// ============================================================
describe("listProfilesForProvider", () => {
it("returns profiles matching the provider", () => {
setProfiles({
anthropic: { apiKey: "sk-1" },
"anthropic:backup": { apiKey: "sk-2" },
openai: { apiKey: "sk-3" },
});
expect(listProfilesForProvider("anthropic")).toEqual([
"anthropic",
"anthropic:backup",
]);
});
it("returns empty array when no profiles match", () => {
setProfiles({ openai: { apiKey: "sk-1" } });
expect(listProfilesForProvider("anthropic")).toEqual([]);
});
});
// ============================================================
// resolveAuthProfileOrder
// ============================================================
describe("resolveAuthProfileOrder", () => {
const now = 1_000_000;
it("returns round-robin order by lastUsed when no explicit order", () => {
setProfiles({
"anthropic": { apiKey: "sk-1" },
"anthropic:b": { apiKey: "sk-2" },
"anthropic:c": { apiKey: "sk-3" },
});
const store: AuthProfileStore = {
version: 1,
usageStats: {
"anthropic": { lastUsed: 300 },
"anthropic:b": { lastUsed: 100 },
"anthropic:c": { lastUsed: 200 },
},
};
const order = resolveAuthProfileOrder("anthropic", store, now);
// Sorted by lastUsed ascending: b(100) -> c(200) -> default(300)
expect(order).toEqual(["anthropic:b", "anthropic:c", "anthropic"]);
});
it("respects explicit order from config", () => {
setProfiles({
"anthropic": { apiKey: "sk-1" },
"anthropic:b": { apiKey: "sk-2" },
"anthropic:c": { apiKey: "sk-3" },
});
setOrder({ anthropic: ["anthropic:c", "anthropic", "anthropic:b"] });
const store: AuthProfileStore = { version: 1 };
const order = resolveAuthProfileOrder("anthropic", store, now);
expect(order).toEqual(["anthropic:c", "anthropic", "anthropic:b"]);
});
it("pushes cooldown profiles to the end", () => {
setProfiles({
"anthropic": { apiKey: "sk-1" },
"anthropic:b": { apiKey: "sk-2" },
"anthropic:c": { apiKey: "sk-3" },
});
const store: AuthProfileStore = {
version: 1,
usageStats: {
"anthropic": { lastUsed: 100 },
"anthropic:b": { lastUsed: 200, cooldownUntil: now + 5000 },
"anthropic:c": { lastUsed: 300 },
},
};
const order = resolveAuthProfileOrder("anthropic", store, now);
// anthropic and anthropic:c are available; anthropic:b is in cooldown -> pushed to end
expect(order).toEqual(["anthropic", "anthropic:c", "anthropic:b"]);
});
it("sorts cooldown profiles by earliest recovery", () => {
setProfiles({
"anthropic": { apiKey: "sk-1" },
"anthropic:b": { apiKey: "sk-2" },
"anthropic:c": { apiKey: "sk-3" },
});
const store: AuthProfileStore = {
version: 1,
usageStats: {
"anthropic": { cooldownUntil: now + 10_000 },
"anthropic:b": { cooldownUntil: now + 1_000 },
"anthropic:c": { cooldownUntil: now + 5_000 },
},
};
const order = resolveAuthProfileOrder("anthropic", store, now);
// All in cooldown, sorted by soonest recovery
expect(order).toEqual(["anthropic:b", "anthropic:c", "anthropic"]);
});
it("deduplicates profile IDs", () => {
setProfiles({
"anthropic": { apiKey: "sk-1" },
"anthropic:b": { apiKey: "sk-2" },
});
// Explicit order has duplicate
setOrder({ anthropic: ["anthropic", "anthropic", "anthropic:b"] });
const store: AuthProfileStore = { version: 1 };
const order = resolveAuthProfileOrder("anthropic", store, now);
expect(order).toEqual(["anthropic", "anthropic:b"]);
});
it("appends unlisted profiles to explicit order", () => {
setProfiles({
"anthropic": { apiKey: "sk-1" },
"anthropic:b": { apiKey: "sk-2" },
"anthropic:c": { apiKey: "sk-3" },
});
// Only lists one profile in explicit order
setOrder({ anthropic: ["anthropic:b"] });
const store: AuthProfileStore = { version: 1 };
const order = resolveAuthProfileOrder("anthropic", store, now);
// anthropic:b first (explicit), then the rest
expect(order[0]).toBe("anthropic:b");
expect(order).toHaveLength(3);
expect(order).toContain("anthropic");
expect(order).toContain("anthropic:c");
});
it("filters out profiles with no valid API key", () => {
setProfiles({
"anthropic": { apiKey: "sk-1" },
"anthropic:empty": {}, // no apiKey
"anthropic:c": { apiKey: "sk-3" },
});
const store: AuthProfileStore = { version: 1 };
const order = resolveAuthProfileOrder("anthropic", store, now);
expect(order).toEqual(["anthropic", "anthropic:c"]);
});
it("moves preferredProfile to front", () => {
setProfiles({
"anthropic": { apiKey: "sk-1" },
"anthropic:b": { apiKey: "sk-2" },
"anthropic:c": { apiKey: "sk-3" },
});
const store: AuthProfileStore = { version: 1 };
const order = resolveAuthProfileOrder("anthropic", store, now, {
preferredProfile: "anthropic:c",
});
expect(order[0]).toBe("anthropic:c");
expect(order).toHaveLength(3);
});
});

View file

@ -0,0 +1,147 @@
/**
* Auth Profile Ordering
*
* Determines the order in which auth profiles are tried for a given provider.
* Supports explicit ordering (from credentials.json5) and automatic round-robin
* with two-level sort: credential type priority (OAuth > API key), then lastUsed.
* Profiles in cooldown are pushed to the end.
*/
import { credentialManager } from "../credentials.js";
import { isOAuthProvider } from "../providers/registry.js";
import { resolveApiKeyForProfile } from "../providers/resolver.js";
import type { AuthProfileStore } from "./types.js";
import { isProfileInCooldown, resolveProfileUnusableUntil } from "./usage.js";
// ============================================================
// Profile discovery
// ============================================================
/**
* List all profile IDs from credentials.json5 that belong to a given provider.
* A profile matches if its key equals the provider exactly or starts with "provider:".
*/
export function listProfilesForProvider(provider: string): string[] {
return credentialManager.listProfileIdsForProvider(provider);
}
// ============================================================
// Type priority
// ============================================================
/**
* Get the type-based priority for a profile.
* OAuth providers (e.g. claude-code, openai-codex) get priority 0 (preferred),
* API-key providers get priority 1.
* Lower number = higher priority.
*/
function getProfileTypePriority(profileId: string): number {
// Extract the provider portion from profileId (before ":" if present)
const provider = profileId.includes(":") ? profileId.split(":")[0]! : profileId;
return isOAuthProvider(provider) ? 0 : 1;
}
// ============================================================
// Ordering
// ============================================================
export interface AuthProfileOrderOptions {
/** Preferred profile to put first (used when user or agent selects a profile) */
preferredProfile?: string | undefined;
}
/**
* Resolve the ordered list of profile IDs to try for a given provider.
*
* Strategy:
* 1. If credentials.json5 has `llm.order[provider]`, use that explicit order.
* 2. Otherwise, use round-robin with two-level sort:
* - First by credential type priority (OAuth > API key)
* - Then by `lastUsed` ascending within each type (oldest first)
*
* In both cases:
* - Profiles with invalid/missing credentials are filtered out
* - Profiles currently in cooldown are pushed to the end,
* sorted by earliest cooldown expiry (soonest-to-recover first)
* - If `preferredProfile` is set, it is moved to the front
*/
export function resolveAuthProfileOrder(
provider: string,
store: AuthProfileStore,
now?: number,
options?: AuthProfileOrderOptions,
): string[] {
const ts = now ?? Date.now();
// Gather candidates
const explicitOrder = credentialManager.getLlmOrder(provider);
const allProfiles = listProfilesForProvider(provider);
let candidates: string[];
if (explicitOrder && explicitOrder.length > 0) {
// Use explicit order, filter to only existing profiles
const profileSet = new Set(allProfiles);
candidates = explicitOrder.filter((id) => profileSet.has(id));
// Append any profiles not in the explicit order
for (const id of allProfiles) {
if (!candidates.includes(id)) {
candidates.push(id);
}
}
} else {
// Two-level sort: type priority first, then lastUsed within same type
candidates = [...allProfiles].sort((a, b) => {
const priorityDiff = getProfileTypePriority(a) - getProfileTypePriority(b);
if (priorityDiff !== 0) return priorityDiff;
const statsA = store.usageStats?.[a];
const statsB = store.usageStats?.[b];
return (statsA?.lastUsed ?? 0) - (statsB?.lastUsed ?? 0);
});
}
// Deduplicate
candidates = [...new Set(candidates)];
// Filter out profiles with invalid/missing credentials
candidates = candidates.filter((id) => {
// For OAuth providers, resolveApiKeyForProfile won't find them in credentials.json5
// but they are still valid candidates (resolved at runtime via OAuth flow)
const provider = id.includes(":") ? id.split(":")[0]! : id;
if (isOAuthProvider(provider)) return true;
return resolveApiKeyForProfile(id) !== undefined;
});
// Partition into available and in-cooldown
const available: string[] = [];
const inCooldown: string[] = [];
for (const id of candidates) {
const stats = store.usageStats?.[id];
if (stats && isProfileInCooldown(stats, ts)) {
inCooldown.push(id);
} else {
available.push(id);
}
}
// Sort cooldown profiles by soonest recovery
inCooldown.sort((a, b) => {
const statsA = store.usageStats?.[a] ?? {};
const statsB = store.usageStats?.[b] ?? {};
return resolveProfileUnusableUntil(statsA) - resolveProfileUnusableUntil(statsB);
});
let result = [...available, ...inCooldown];
// Move preferred profile to front if specified
if (options?.preferredProfile && result.includes(options.preferredProfile)) {
result = [
options.preferredProfile,
...result.filter((id) => id !== options.preferredProfile),
];
}
return result;
}

View file

@ -0,0 +1,131 @@
import { describe, it, expect, beforeEach, afterEach } from "vitest";
import { existsSync, mkdirSync, rmSync, writeFileSync } from "node:fs";
import { join } from "node:path";
import { coerceStore, loadAuthProfileStore, saveAuthProfileStore, updateAuthProfileStore } from "./store.js";
import { AUTH_STORE_VERSION } from "./constants.js";
import type { AuthProfileStore } from "./types.js";
// Use a temp directory for tests to avoid touching real store
const TEST_DIR = join(import.meta.dirname ?? ".", "__test_store_tmp__");
const TEST_STORE_PATH = join(TEST_DIR, "auth-profiles.json");
// We need to mock resolveAuthStorePath to point to our test dir
import { vi } from "vitest";
vi.mock("../../shared/paths.js", () => ({
DATA_DIR: join(import.meta.dirname ?? ".", "__test_store_tmp__"),
}));
beforeEach(() => {
if (!existsSync(TEST_DIR)) {
mkdirSync(TEST_DIR, { recursive: true });
}
});
afterEach(() => {
if (existsSync(TEST_DIR)) {
rmSync(TEST_DIR, { recursive: true, force: true });
}
});
// ============================================================
// coerceStore
// ============================================================
describe("coerceStore", () => {
it("returns empty store for null", () => {
const store = coerceStore(null);
expect(store.version).toBe(AUTH_STORE_VERSION);
expect(store.lastGood).toBeUndefined();
expect(store.usageStats).toBeUndefined();
});
it("returns empty store for non-object", () => {
expect(coerceStore("hello").version).toBe(AUTH_STORE_VERSION);
expect(coerceStore(42).version).toBe(AUTH_STORE_VERSION);
expect(coerceStore(undefined).version).toBe(AUTH_STORE_VERSION);
});
it("preserves valid store data", () => {
const raw = {
version: 1,
lastGood: { anthropic: "anthropic:backup" },
usageStats: {
"anthropic": { lastUsed: 1000, errorCount: 0 },
},
};
const store = coerceStore(raw);
expect(store.version).toBe(1);
expect(store.lastGood?.anthropic).toBe("anthropic:backup");
expect(store.usageStats?.anthropic?.lastUsed).toBe(1000);
});
it("defaults version when missing", () => {
const store = coerceStore({ lastGood: {} });
expect(store.version).toBe(AUTH_STORE_VERSION);
});
});
// ============================================================
// loadAuthProfileStore / saveAuthProfileStore
// ============================================================
describe("loadAuthProfileStore / saveAuthProfileStore", () => {
it("returns empty store when file does not exist", () => {
const store = loadAuthProfileStore();
expect(store.version).toBe(AUTH_STORE_VERSION);
});
it("round-trips save and load", () => {
const original: AuthProfileStore = {
version: 1,
lastGood: { anthropic: "anthropic:main" },
usageStats: {
"anthropic:main": { lastUsed: 5000, errorCount: 1 },
},
};
saveAuthProfileStore(original);
const loaded = loadAuthProfileStore();
expect(loaded).toEqual(original);
});
it("handles corrupted JSON gracefully", () => {
writeFileSync(TEST_STORE_PATH, "not valid json{{{", "utf8");
const store = loadAuthProfileStore();
expect(store.version).toBe(AUTH_STORE_VERSION);
});
});
// ============================================================
// updateAuthProfileStore
// ============================================================
describe("updateAuthProfileStore", () => {
it("creates file and applies update when file does not exist", () => {
const result = updateAuthProfileStore((store) => {
if (!store.lastGood) store.lastGood = {};
store.lastGood.openai = "openai:primary";
});
expect(result.lastGood?.openai).toBe("openai:primary");
// Verify persisted
const loaded = loadAuthProfileStore();
expect(loaded.lastGood?.openai).toBe("openai:primary");
});
it("preserves existing data across updates", () => {
saveAuthProfileStore({
version: 1,
lastGood: { anthropic: "anthropic" },
});
updateAuthProfileStore((store) => {
if (!store.usageStats) store.usageStats = {};
store.usageStats["anthropic"] = { lastUsed: 9999 };
});
const loaded = loadAuthProfileStore();
expect(loaded.lastGood?.anthropic).toBe("anthropic");
expect(loaded.usageStats?.anthropic?.lastUsed).toBe(9999);
});
});

View file

@ -0,0 +1,214 @@
/**
* Auth Profile Store
*
* Persistence layer for auth profile runtime state.
* Stores usage stats, cooldowns, and last-good info in ~/.super-multica/auth-profiles.json.
* Uses a custom file lock (exclusive-create based) for safe concurrent access.
*/
import {
existsSync,
readFileSync,
writeFileSync,
mkdirSync,
openSync,
closeSync,
rmSync,
statSync,
constants as fsConstants,
} from "node:fs";
import { join, dirname } from "node:path";
import { DATA_DIR } from "../../shared/paths.js";
import { AUTH_STORE_VERSION, AUTH_PROFILE_STORE_FILENAME } from "./constants.js";
import type { AuthProfileStore } from "./types.js";
// ============================================================
// Custom file lock (synchronous, exclusive-create based)
// ============================================================
const LOCK_STALE_MS = 30_000;
const LOCK_RETRY_COUNT = 10;
const LOCK_RETRY_BASE_MS = 50;
const LOCK_RETRY_MAX_MS = 1_000;
type LockPayload = { pid: number; createdAt: string };
function isProcessAlive(pid: number): boolean {
if (!Number.isFinite(pid) || pid <= 0) return false;
try {
process.kill(pid, 0);
return true;
} catch {
return false;
}
}
function readLockPayloadSync(lockPath: string): LockPayload | null {
try {
const raw = readFileSync(lockPath, "utf8");
const parsed = JSON.parse(raw) as Partial<LockPayload>;
if (typeof parsed.pid !== "number" || typeof parsed.createdAt !== "string") return null;
return { pid: parsed.pid, createdAt: parsed.createdAt };
} catch {
return null;
}
}
function isLockStale(lockPath: string): boolean {
const payload = readLockPayloadSync(lockPath);
if (payload) {
const age = Date.now() - Date.parse(payload.createdAt);
if (!Number.isFinite(age) || age > LOCK_STALE_MS) return true;
return !isProcessAlive(payload.pid);
}
// No payload readable — check file mtime
try {
const stat = statSync(lockPath);
return Date.now() - stat.mtimeMs > LOCK_STALE_MS;
} catch {
return true; // Can't stat — treat as stale
}
}
/**
* Acquire a synchronous exclusive file lock.
* Returns a release function. Throws if lock cannot be acquired after retries.
*/
function acquireLockSync(filePath: string): () => void {
const lockPath = `${filePath}.lock`;
const payload = JSON.stringify(
{ pid: process.pid, createdAt: new Date().toISOString() },
null,
2,
);
for (let attempt = 0; attempt < LOCK_RETRY_COUNT; attempt++) {
try {
// O_WRONLY | O_CREAT | O_EXCL — fails if file already exists
const fd = openSync(lockPath, fsConstants.O_WRONLY | fsConstants.O_CREAT | fsConstants.O_EXCL);
writeFileSync(fd, payload, "utf8");
closeSync(fd);
return () => {
try { rmSync(lockPath, { force: true }); } catch { /* best effort */ }
};
} catch (err) {
const code = (err as { code?: string }).code;
if (code !== "EEXIST") throw err;
// Lock file exists — check if stale
if (isLockStale(lockPath)) {
try { rmSync(lockPath, { force: true }); } catch { /* ignore */ }
continue;
}
// Wait and retry (synchronous busy-wait via Atomics for minimal overhead)
const delay = Math.min(LOCK_RETRY_MAX_MS, LOCK_RETRY_BASE_MS * (attempt + 1));
const buf = new SharedArrayBuffer(4);
Atomics.wait(new Int32Array(buf), 0, 0, delay);
}
}
throw new Error(`Failed to acquire lock after ${LOCK_RETRY_COUNT} retries: ${filePath}`);
}
// ============================================================
// Paths
// ============================================================
/** Resolve the auth profile store file path */
export function resolveAuthStorePath(): string {
return join(DATA_DIR, AUTH_PROFILE_STORE_FILENAME);
}
// ============================================================
// Load / Save
// ============================================================
function createEmptyStore(): AuthProfileStore {
return { version: AUTH_STORE_VERSION };
}
/** Coerce raw JSON into a valid AuthProfileStore, defensive against malformed data */
export function coerceStore(raw: unknown): AuthProfileStore {
if (!raw || typeof raw !== "object") return createEmptyStore();
const obj = raw as Record<string, unknown>;
const store: AuthProfileStore = {
version: typeof obj.version === "number" ? obj.version : AUTH_STORE_VERSION,
};
if (obj.lastGood && typeof obj.lastGood === "object") {
store.lastGood = obj.lastGood as Record<string, string>;
}
if (obj.usageStats && typeof obj.usageStats === "object") {
store.usageStats = obj.usageStats as AuthProfileStore["usageStats"];
}
return store;
}
/** Ensure the store file exists on disk (creates it if missing) */
export function ensureAuthStoreFile(): string {
const storePath = resolveAuthStorePath();
const dir = dirname(storePath);
if (!existsSync(dir)) {
mkdirSync(dir, { recursive: true });
}
if (!existsSync(storePath)) {
writeFileSync(storePath, JSON.stringify(createEmptyStore(), null, 2), "utf8");
}
return storePath;
}
/** Load auth profile store from disk. Returns empty store if file doesn't exist. */
export function loadAuthProfileStore(): AuthProfileStore {
const storePath = resolveAuthStorePath();
if (!existsSync(storePath)) return createEmptyStore();
try {
const raw = readFileSync(storePath, "utf8");
return coerceStore(JSON.parse(raw));
} catch {
return createEmptyStore();
}
}
/** Save auth profile store to disk */
export function saveAuthProfileStore(store: AuthProfileStore): void {
const storePath = resolveAuthStorePath();
const dir = dirname(storePath);
if (!existsSync(dir)) {
mkdirSync(dir, { recursive: true });
}
writeFileSync(storePath, JSON.stringify(store, null, 2), "utf8");
}
/**
* Atomic load-update-save cycle with file locking.
* Acquires a lock on the store file, loads current state, runs the updater,
* and saves. Falls back to unlocked update if the lock cannot be acquired.
* Returns the updated store.
*/
export function updateAuthProfileStore(
updater: (store: AuthProfileStore) => void,
): AuthProfileStore {
const storePath = ensureAuthStoreFile();
try {
const release = acquireLockSync(storePath);
try {
const store = loadAuthProfileStore();
updater(store);
saveAuthProfileStore(store);
return store;
} finally {
release();
}
} catch {
// Fallback: unlocked update (better than losing the write entirely)
const store = loadAuthProfileStore();
updater(store);
saveAuthProfileStore(store);
return store;
}
}

View file

@ -0,0 +1,48 @@
/**
* Auth Profile Types
*
* Type definitions for the auth profile rotation and cooldown system.
*/
/** Reason for an auth profile failure, determines cooldown behavior */
export type AuthProfileFailureReason =
| "auth"
| "format"
| "rate_limit"
| "billing"
| "timeout"
| "unknown";
/** Per-profile usage and cooldown state (persisted in auth-profiles.json) */
export type ProfileUsageStats = {
/** Timestamp of last successful use */
lastUsed?: number | undefined;
/** Cooldown expiry for non-billing failures (rate_limit, auth, timeout, unknown) */
cooldownUntil?: number | undefined;
/** Disable expiry for billing failures (longer backoff) */
disabledUntil?: number | undefined;
/** Reason for the current disable period */
disabledReason?: AuthProfileFailureReason | undefined;
/** Consecutive error count (resets on success or after failure window) */
errorCount?: number | undefined;
/** Per-reason failure counts within the failure window */
failureCounts?: Partial<Record<AuthProfileFailureReason, number>> | undefined;
/** Timestamp of the last failure (used for failure window expiry) */
lastFailureAt?: number | undefined;
};
/** Persisted runtime store for auth profile state */
export type AuthProfileStore = {
version: number;
/** Last known good profile per provider */
lastGood?: Record<string, string> | undefined;
/** Per-profile usage/cooldown stats */
usageStats?: Record<string, ProfileUsageStats> | undefined;
};
/** Resolved auth info returned by profile-aware key resolution */
export type ResolvedProfileAuth = {
apiKey: string;
profileId: string;
provider: string;
};

View file

@ -0,0 +1,154 @@
import { describe, it, expect } from "vitest";
import {
calculateCooldownMs,
calculateBillingDisableMs,
computeNextProfileUsageStats,
isProfileInCooldown,
resolveProfileUnusableUntil,
} from "./usage.js";
import {
COOLDOWN_BASE_MS,
COOLDOWN_MAX_MS,
FAILURE_WINDOW_MS,
} from "./constants.js";
import type { ProfileUsageStats } from "./types.js";
// ============================================================
// calculateCooldownMs
// ============================================================
describe("calculateCooldownMs", () => {
it("applies exponential backoff with a 1h cap", () => {
const max = () => 1; // equal-jitter max
expect(calculateCooldownMs(1, max)).toBe(60_000); // 1 min
expect(calculateCooldownMs(2, max)).toBe(5 * 60_000); // 5 min
expect(calculateCooldownMs(3, max)).toBe(25 * 60_000); // 25 min
expect(calculateCooldownMs(4, max)).toBe(60 * 60_000); // 1 hour (cap)
expect(calculateCooldownMs(5, max)).toBe(60 * 60_000); // 1 hour (cap)
expect(calculateCooldownMs(100, max)).toBe(60 * 60_000); // still capped
});
it("returns 0 for errorCount <= 0", () => {
expect(calculateCooldownMs(0)).toBe(0);
expect(calculateCooldownMs(-1)).toBe(0);
});
it("applies equal jitter with a 50% floor", () => {
const min = () => 0;
expect(calculateCooldownMs(1, min)).toBe(30_000); // 50% of 1 min
});
});
// ============================================================
// calculateBillingDisableMs
// ============================================================
describe("calculateBillingDisableMs", () => {
it("applies exponential backoff with a 24h cap", () => {
const h = 60 * 60 * 1000;
const max = () => 1;
expect(calculateBillingDisableMs(1, max)).toBe(5 * h); // 5h
expect(calculateBillingDisableMs(2, max)).toBe(10 * h); // 10h
expect(calculateBillingDisableMs(3, max)).toBe(20 * h); // 20h
expect(calculateBillingDisableMs(4, max)).toBe(24 * h); // 24h (cap)
expect(calculateBillingDisableMs(5, max)).toBe(24 * h); // still capped
});
it("returns 0 for count <= 0", () => {
expect(calculateBillingDisableMs(0)).toBe(0);
expect(calculateBillingDisableMs(-1)).toBe(0);
});
});
// ============================================================
// isProfileInCooldown / resolveProfileUnusableUntil
// ============================================================
describe("isProfileInCooldown", () => {
const now = 1_000_000;
it("returns false for empty stats", () => {
expect(isProfileInCooldown({}, now)).toBe(false);
});
it("returns true when cooldownUntil is in the future", () => {
expect(isProfileInCooldown({ cooldownUntil: now + 1000 }, now)).toBe(true);
});
it("returns false when cooldownUntil has passed", () => {
expect(isProfileInCooldown({ cooldownUntil: now - 1 }, now)).toBe(false);
});
it("returns true when disabledUntil is in the future", () => {
expect(isProfileInCooldown({ disabledUntil: now + 1000 }, now)).toBe(true);
});
it("uses max of cooldownUntil and disabledUntil", () => {
const stats: ProfileUsageStats = {
cooldownUntil: now - 1,
disabledUntil: now + 5000,
};
expect(isProfileInCooldown(stats, now)).toBe(true);
expect(resolveProfileUnusableUntil(stats)).toBe(now + 5000);
});
});
// ============================================================
// computeNextProfileUsageStats
// ============================================================
describe("computeNextProfileUsageStats", () => {
const now = 1_700_000_000_000;
it("increments errorCount and sets cooldown for non-billing failure", () => {
const next = computeNextProfileUsageStats({}, "rate_limit", now, () => 1);
expect(next.errorCount).toBe(1);
expect(next.lastFailureAt).toBe(now);
expect(next.cooldownUntil).toBe(now + COOLDOWN_BASE_MS);
expect(next.failureCounts?.rate_limit).toBe(1);
expect(next.disabledUntil).toBeUndefined();
});
it("applies exponential backoff on consecutive failures", () => {
const stats: ProfileUsageStats = {
errorCount: 2,
lastFailureAt: now - 1000,
failureCounts: { rate_limit: 2 },
};
const next = computeNextProfileUsageStats(stats, "rate_limit", now, () => 1);
expect(next.errorCount).toBe(3);
// Error 3 -> 25 min cooldown
expect(next.cooldownUntil).toBe(now + 25 * 60_000);
});
it("sets disabledUntil for billing failures (~5h by default)", () => {
const next = computeNextProfileUsageStats({}, "billing", now, () => 1);
expect(next.errorCount).toBe(1);
expect(next.disabledUntil).toBe(now + 5 * 60 * 60 * 1000);
expect(next.disabledReason).toBe("billing");
expect(next.failureCounts?.billing).toBe(1);
});
it("resets counters when lastFailureAt is outside the failure window", () => {
const oldFailure = now - FAILURE_WINDOW_MS - 1000;
const stats: ProfileUsageStats = {
errorCount: 5,
lastFailureAt: oldFailure,
failureCounts: { auth: 3, rate_limit: 2 },
};
const next = computeNextProfileUsageStats(stats, "auth", now, () => 1);
// Counters reset, so this is treated as error #1
expect(next.errorCount).toBe(1);
expect(next.failureCounts?.auth).toBe(1);
expect(next.cooldownUntil).toBe(now + COOLDOWN_BASE_MS);
});
it("caps cooldown at COOLDOWN_MAX_MS", () => {
const stats: ProfileUsageStats = {
errorCount: 10,
lastFailureAt: now - 1000,
};
const next = computeNextProfileUsageStats(stats, "unknown", now, () => 1);
expect(next.cooldownUntil).toBe(now + COOLDOWN_MAX_MS);
});
});

View file

@ -0,0 +1,179 @@
/**
* Auth Profile Usage Tracking
*
* Tracks per-profile usage, computes cooldown durations with exponential backoff,
* and manages failure/success state transitions.
*/
import {
COOLDOWN_BASE_MS,
COOLDOWN_FACTOR,
COOLDOWN_MAX_MS,
BILLING_BACKOFF_HOURS,
BILLING_MAX_HOURS,
FAILURE_WINDOW_MS,
} from "./constants.js";
import { updateAuthProfileStore } from "./store.js";
import type {
AuthProfileFailureReason,
AuthProfileStore,
ProfileUsageStats,
} from "./types.js";
// ============================================================
// Cooldown checks
// ============================================================
/** Returns the timestamp until which a profile is unusable (0 if available) */
export function resolveProfileUnusableUntil(stats: ProfileUsageStats): number {
return Math.max(stats.cooldownUntil ?? 0, stats.disabledUntil ?? 0);
}
/** Check if a profile is currently in cooldown or disabled */
export function isProfileInCooldown(stats: ProfileUsageStats, now?: number): boolean {
return resolveProfileUnusableUntil(stats) > (now ?? Date.now());
}
// ============================================================
// Cooldown duration calculation
// ============================================================
/**
* Calculate non-billing cooldown duration in milliseconds.
* Exponential backoff: 1min -> 5min -> 25min -> 1hr (cap).
*
* Formula: min(COOLDOWN_MAX_MS, COOLDOWN_BASE_MS * COOLDOWN_FACTOR ^ min(errorCount - 1, 3))
*/
function applyEqualJitter(baseMs: number, rng?: () => number): number {
if (baseMs <= 0) return 0;
const rand = Math.min(1, Math.max(0, (rng ?? Math.random)()));
const half = Math.floor(baseMs / 2);
return half + Math.floor(rand * (baseMs - half));
}
export function calculateCooldownMs(errorCount: number, rng?: () => number): number {
if (errorCount <= 0) return 0;
const exponent = Math.min(errorCount - 1, 3);
const base = Math.min(COOLDOWN_MAX_MS, COOLDOWN_BASE_MS * COOLDOWN_FACTOR ** exponent);
return applyEqualJitter(base, rng);
}
/**
* Calculate billing disable duration in milliseconds.
* Exponential backoff: 5h -> 10h -> 20h -> 24h (cap).
*
* Formula: min(BILLING_MAX_HOURS, BILLING_BACKOFF_HOURS * 2 ^ (count - 1)) * hours_to_ms
*/
export function calculateBillingDisableMs(billingFailCount: number, rng?: () => number): number {
if (billingFailCount <= 0) return 0;
const hours = Math.min(
BILLING_MAX_HOURS,
BILLING_BACKOFF_HOURS * 2 ** (billingFailCount - 1),
);
const base = hours * 60 * 60 * 1000;
return applyEqualJitter(base, rng);
}
// ============================================================
// State transitions
// ============================================================
function ensureUsageStats(store: AuthProfileStore, profileId: string): ProfileUsageStats {
if (!store.usageStats) store.usageStats = {};
if (!store.usageStats[profileId]) store.usageStats[profileId] = {};
return store.usageStats[profileId];
}
/**
* Compute updated usage stats after a failure.
* Pure function does not mutate the input stats.
*/
export function computeNextProfileUsageStats(
stats: ProfileUsageStats,
reason: AuthProfileFailureReason,
now?: number,
rng?: () => number,
): ProfileUsageStats {
const ts = now ?? Date.now();
const next = { ...stats };
// Reset counters if last failure is outside the failure window
if (next.lastFailureAt && ts - next.lastFailureAt > FAILURE_WINDOW_MS) {
next.errorCount = 0;
next.failureCounts = {};
}
// Increment counters
next.errorCount = (next.errorCount ?? 0) + 1;
next.lastFailureAt = ts;
if (!next.failureCounts) next.failureCounts = {};
next.failureCounts = {
...next.failureCounts,
[reason]: (next.failureCounts[reason] ?? 0) + 1,
};
// Apply cooldown based on failure reason
if (reason === "billing") {
const billingCount = next.failureCounts.billing ?? 1;
const disableMs = calculateBillingDisableMs(billingCount, rng);
next.disabledUntil = ts + disableMs;
next.disabledReason = "billing";
} else {
const cooldownMs = calculateCooldownMs(next.errorCount, rng);
next.cooldownUntil = ts + cooldownMs;
}
return next;
}
/**
* Mark a profile as having failed. Persists updated stats to disk.
*/
export function markAuthProfileFailure(
profileId: string,
reason: AuthProfileFailureReason,
now?: number,
): void {
updateAuthProfileStore((store) => {
const current = ensureUsageStats(store, profileId);
const next = computeNextProfileUsageStats(current, reason, now);
store.usageStats![profileId] = next;
});
}
/**
* Mark a profile as successfully used. Resets all cooldown/error state.
*/
export function markAuthProfileUsed(profileId: string, now?: number): void {
updateAuthProfileStore((store) => {
const stats = ensureUsageStats(store, profileId);
stats.lastUsed = now ?? Date.now();
stats.errorCount = 0;
stats.cooldownUntil = undefined;
stats.disabledUntil = undefined;
stats.disabledReason = undefined;
stats.failureCounts = undefined;
});
}
/**
* Mark a profile as the last known good for a provider.
*/
export function markAuthProfileGood(provider: string, profileId: string): void {
updateAuthProfileStore((store) => {
if (!store.lastGood) store.lastGood = {};
store.lastGood[provider] = profileId;
});
}
/**
* Clear cooldown for a specific profile.
*/
export function clearAuthProfileCooldown(profileId: string): void {
updateAuthProfileStore((store) => {
const stats = ensureUsageStats(store, profileId);
stats.errorCount = 0;
stats.cooldownUntil = undefined;
});
}

View file

@ -21,6 +21,8 @@ export type CredentialsConfig = {
llm?: {
provider?: string | undefined;
providers?: Record<string, ProviderConfig> | undefined;
/** Explicit profile ordering per provider (e.g. { anthropic: ["anthropic", "anthropic:backup"] }) */
order?: Record<string, string[]> | undefined;
} | undefined;
tools?: Record<string, ToolConfig> | undefined;
};
@ -185,6 +187,30 @@ export class CredentialManager {
return name in process.env;
}
/**
* Get explicit profile order for a provider from credentials.json5 `llm.order`.
* Returns undefined if no explicit order is configured.
*/
getLlmOrder(provider: string): string[] | undefined {
this.loadCore();
return this.coreConfig?.llm?.order?.[provider];
}
/**
* List all profile IDs from `llm.providers` that belong to a given provider.
* A profile matches if its key equals the provider exactly or starts with "provider:".
*/
listProfileIdsForProvider(provider: string): string[] {
this.loadCore();
const providers = this.coreConfig?.llm?.providers;
if (!providers) return [];
const prefix = `${provider}:`;
return Object.keys(providers).filter(
(key) => key === provider || key.startsWith(prefix),
);
}
getResolvedEnvSnapshot(): Record<string, string> {
return { ...this.getResolvedSkillsEnv() };
}

View file

@ -28,6 +28,8 @@ export {
type ProviderConfig,
resolveProviderConfig,
resolveApiKey,
resolveApiKeyForProfile,
resolveApiKeyForProvider,
resolveBaseUrl,
resolveModelId,
resolveModel,

View file

@ -134,6 +134,7 @@ const PROVIDER_REGISTRY: Record<string, ProviderMeta> = {
*/
export const PROVIDER_ALIAS: Record<string, string> = {
"claude-code": "anthropic", // Claude Code OAuth uses anthropic API
"openai-codex": "openai", // Codex OAuth uses OpenAI API
};
// ============================================================

View file

@ -18,6 +18,12 @@ import {
isOAuthProvider,
} from "./registry.js";
import type { AgentOptions } from "../types.js";
import {
loadAuthProfileStore,
resolveAuthProfileOrder,
isProfileInCooldown,
} from "../auth-profiles/index.js";
import type { ResolvedProfileAuth } from "../auth-profiles/index.js";
// ============================================================
// Types
@ -128,6 +134,71 @@ export function resolveModelId(provider: string, explicitModel?: string): string
return credentialManager.getLlmProviderConfig(provider)?.model ?? getDefaultModel(provider);
}
// ============================================================
// Profile-aware API Key Resolution
// ============================================================
/**
* Resolve API key for a specific auth profile ID.
* Profile IDs follow the convention: "provider" or "provider:label".
*/
export function resolveApiKeyForProfile(profileId: string): string | undefined {
const config = credentialManager.getLlmProviderConfig(profileId);
return config?.apiKey;
}
/**
* Resolve API key by iterating auth profiles for a provider.
* Returns the first available (non-cooldown) profile with a valid key.
* Falls back to the legacy single-key resolution if no profiles are configured.
*/
export function resolveApiKeyForProvider(
provider: string,
explicitKey?: string,
): ResolvedProfileAuth | undefined {
if (explicitKey) {
return { apiKey: explicitKey, profileId: provider, provider };
}
// Try OAuth providers first
const providerConfig = resolveProviderConfig(provider);
if (providerConfig?.apiKey || providerConfig?.accessToken) {
const key = providerConfig.apiKey ?? providerConfig.accessToken;
if (key) return { apiKey: key, profileId: provider, provider };
}
// Try auth profiles (multi-key rotation)
const store = loadAuthProfileStore();
const candidates = resolveAuthProfileOrder(provider, store);
if (candidates.length > 0) {
for (const profileId of candidates) {
const stats = store.usageStats?.[profileId];
if (stats && isProfileInCooldown(stats)) continue;
const apiKey = resolveApiKeyForProfile(profileId);
if (apiKey) {
return { apiKey, profileId, provider };
}
}
// All in cooldown — return the first one (will be retried when cooldown expires)
for (const profileId of candidates) {
const apiKey = resolveApiKeyForProfile(profileId);
if (apiKey) {
return { apiKey, profileId, provider };
}
}
}
// Fall back to single-key credentials.json5
const fallbackKey = credentialManager.getLlmProviderConfig(provider)?.apiKey;
if (fallbackKey) {
return { apiKey: fallbackKey, profileId: provider, provider };
}
return undefined;
}
// ============================================================
// Model Resolution
// ============================================================

View file

@ -3,23 +3,64 @@ import { v7 as uuidv7 } from "uuid";
import type { AgentOptions, AgentRunResult } from "./types.js";
import { createAgentOutput } from "./cli/output.js";
import { resolveModel, resolveTools } from "./tools.js";
import {
resolveApiKey,
resolveApiKeyForProfile,
resolveApiKeyForProvider,
resolveBaseUrl,
resolveModelId,
} from "./providers/index.js";
import { SessionManager } from "./session/session-manager.js";
import { ProfileManager } from "./profile/index.js";
import { SkillManager } from "./skills/index.js";
import { credentialManager, getCredentialsPath } from "./credentials.js";
import {
resolveApiKey,
resolveBaseUrl,
resolveModelId,
isOAuthProvider,
getLoginInstructions,
} from "./providers/index.js";
import {
checkContextWindow,
DEFAULT_CONTEXT_TOKENS,
type ContextWindowGuardResult,
} from "./context-window/index.js";
import { mergeToolsConfig, type ToolsConfig } from "./tools/policy.js";
import {
loadAuthProfileStore,
resolveAuthProfileOrder,
isProfileInCooldown,
markAuthProfileFailure,
markAuthProfileUsed,
markAuthProfileGood,
} from "./auth-profiles/index.js";
import type { AuthProfileFailureReason } from "./auth-profiles/index.js";
// ============================================================
// Error classification for auth profile rotation
// ============================================================
/** Classify an error into an auth profile failure reason */
export function classifyError(error: unknown): AuthProfileFailureReason {
const msg = error instanceof Error ? error.message.toLowerCase() : String(error).toLowerCase();
if (msg.includes("401") || msg.includes("403") || msg.includes("unauthorized") || msg.includes("invalid api key") || msg.includes("authentication")) {
return "auth";
}
if (msg.includes("400") || msg.includes("invalid request") || msg.includes("malformed") || msg.includes("bad request") || msg.includes("schema")) {
return "format";
}
if (msg.includes("429") || msg.includes("rate limit") || msg.includes("rate_limit") || msg.includes("too many requests")) {
return "rate_limit";
}
if (msg.includes("billing") || msg.includes("quota") || msg.includes("insufficient") || msg.includes("payment")) {
return "billing";
}
if (msg.includes("timeout") || msg.includes("timed out") || msg.includes("econnreset") || msg.includes("etimedout")) {
return "timeout";
}
return "unknown";
}
/** Check if an error is potentially retryable via profile rotation */
export function isRotatableError(reason: AuthProfileFailureReason): boolean {
// timeout is rotatable because some providers hang on rate limit instead of returning 429
return reason === "auth" || reason === "rate_limit" || reason === "billing" || reason === "timeout";
}
export class Agent {
private readonly agent: PiAgentCore;
@ -31,52 +72,83 @@ export class Agent {
private readonly debug: boolean;
private toolsOptions: AgentOptions;
private readonly originalToolsConfig?: ToolsConfig;
private readonly stderr: NodeJS.WritableStream;
private initialized = false;
// Auth profile rotation state
private readonly resolvedProvider: string;
private currentApiKey: string | undefined;
private currentProfileId: string | undefined;
private profileCandidates: string[];
private profileIndex: number;
private readonly pinnedProfile: boolean;
/** Current session ID */
readonly sessionId: string;
constructor(options: AgentOptions = {}) {
const stdout = options.logger?.stdout ?? process.stdout;
const stderr = options.logger?.stderr ?? process.stderr;
this.output = createAgentOutput({ stdout, stderr });
this.stderr = options.logger?.stderr ?? process.stderr;
this.output = createAgentOutput({ stdout, stderr: this.stderr });
this.debug = options.debug ?? false;
// Resolve provider and model from options > env vars > defaults
const resolvedProvider = options.provider ?? credentialManager.getLlmProvider() ?? "kimi-coding";
const resolvedModel = resolveModelId(resolvedProvider, options.model);
const apiKey = resolveApiKey(resolvedProvider, options.apiKey);
// Validate credentials before proceeding
if (!apiKey) {
if (isOAuthProvider(resolvedProvider)) {
// OAuth provider without valid credentials - show login instructions
const instructions = getLoginInstructions(resolvedProvider);
const defaultProvider = options.provider ?? credentialManager.getLlmProvider() ?? "kimi-coding";
if (options.authProfileId) {
const profileProvider = options.authProfileId.includes(":")
? options.authProfileId.split(":")[0]!
: options.authProfileId;
if (options.provider && options.provider !== profileProvider) {
throw new Error(
`Provider "${resolvedProvider}" requires authentication.\n\n` +
`${instructions}\n\n` +
`After logging in, run: multica --provider ${resolvedProvider}`,
`authProfileId provider mismatch: authProfileId="${options.authProfileId}" ` +
`does not match provider="${options.provider}"`,
);
}
// API Key provider without key - show configuration instructions
throw new Error(
`Provider "${resolvedProvider}" requires an API key.\n\n` +
`Add your API key to: ${getCredentialsPath()}\n\n` +
`Example:\n` +
`{\n` +
` "llm": {\n` +
` "provider": "${resolvedProvider}",\n` +
` "providers": {\n` +
` "${resolvedProvider}": {\n` +
` "apiKey": "your-api-key-here"\n` +
` }\n` +
` }\n` +
` }\n` +
`}`,
);
this.resolvedProvider = profileProvider;
} else {
this.resolvedProvider = defaultProvider;
}
const resolvedModel = resolveModelId(this.resolvedProvider, options.model);
// === Auth profile resolution ===
this.pinnedProfile = !!(options.authProfileId || options.apiKey);
if (options.apiKey) {
// Explicit API key — no rotation
this.currentApiKey = options.apiKey;
this.currentProfileId = this.resolvedProvider;
this.profileCandidates = [];
this.profileIndex = 0;
} else if (options.authProfileId) {
// Pinned profile — no rotation
this.currentApiKey = resolveApiKeyForProfile(options.authProfileId)
?? resolveApiKey(this.resolvedProvider);
this.currentProfileId = options.authProfileId;
this.profileCandidates = [];
this.profileIndex = 0;
} else {
// Profile-aware resolution with rotation support
const resolved = resolveApiKeyForProvider(this.resolvedProvider);
if (resolved) {
this.currentApiKey = resolved.apiKey;
this.currentProfileId = resolved.profileId;
} else {
this.currentApiKey = undefined;
this.currentProfileId = undefined;
}
// Load full candidate list for rotation
const store = loadAuthProfileStore();
this.profileCandidates = resolveAuthProfileOrder(this.resolvedProvider, store);
this.profileIndex = this.currentProfileId
? Math.max(0, this.profileCandidates.indexOf(this.currentProfileId))
: 0;
}
this.agent = new PiAgentCore(
{ getApiKey: (_provider: string) => apiKey },
this.currentApiKey
? { getApiKey: (_provider: string) => this.currentApiKey! }
: {},
);
// Load Agent Profile (if profileId is specified)
@ -124,7 +196,7 @@ export class Agent {
return tempSession.getMeta();
})();
const effectiveProvider = resolvedModel ? resolvedProvider : (options.provider ?? storedMeta?.provider);
const effectiveProvider = resolvedModel ? this.resolvedProvider : (options.provider ?? storedMeta?.provider);
const effectiveModel = resolvedModel ?? options.model ?? storedMeta?.model;
let model = resolveModel({ ...options, provider: effectiveProvider, model: effectiveModel });
@ -150,7 +222,7 @@ export class Agent {
// 警告context window 较小
if (this.contextWindowGuard.shouldWarn) {
stderr.write(
this.stderr.write(
`[Context Window Guard] WARNING: Low context window: ${this.contextWindowGuard.tokens} tokens (source: ${this.contextWindowGuard.source})\n`,
);
}
@ -167,7 +239,9 @@ export class Agent {
const compactionMode = options.compactionMode ?? "tokens"; // 默认使用 token 模式
// 获取 API Key用于 summary 模式)
const summaryApiKey = compactionMode === "summary" ? resolveApiKey(model.provider, options.apiKey) : undefined;
const summaryApiKey = compactionMode === "summary"
? resolveApiKey(this.resolvedProvider, options.apiKey)
: undefined;
// 创建 SessionManager带 context window 配置)
this.session = new SessionManager({
@ -211,31 +285,6 @@ export class Agent {
}
this.agent.setTools(tools);
const restoredMessages = this.session.loadMessages();
if (restoredMessages.length > 0) {
if (this.debug) {
console.error(`[debug] Restoring ${restoredMessages.length} messages from session`);
for (const msg of restoredMessages) {
const msgAny = msg as any;
const content = Array.isArray(msgAny.content)
? msgAny.content.map((c: any) => c.type || "text").join(", ")
: typeof msgAny.content;
console.error(`[debug] ${msg.role}: ${content}`);
if (Array.isArray(msgAny.content)) {
for (const block of msgAny.content) {
if (block.type === "tool_use") {
console.error(`[debug] tool_use id: ${block.id}, name: ${block.name}`);
}
if (block.type === "tool_result") {
console.error(`[debug] tool_result tool_use_id: ${block.tool_use_id}`);
}
}
}
}
}
this.agent.replaceMessages(restoredMessages);
}
this.session.saveMeta({
provider: this.agent.state.model?.provider,
model: this.agent.state.model?.id,
@ -247,19 +296,128 @@ export class Agent {
this.output.handleEvent(event);
this.handleSessionEvent(event);
});
if (this.debug && this.currentProfileId) {
console.error(`[debug] Auth profile: ${this.currentProfileId} (pinned=${this.pinnedProfile}, candidates=${this.profileCandidates.length})`);
}
}
/** Subscribe to agent events (returns unsubscribe function) */
/** Subscribe to raw AgentEvent from the underlying engine */
subscribe(fn: (event: AgentEvent) => void): () => void {
return this.agent.subscribe(fn);
}
async run(prompt: string): Promise<AgentRunResult> {
if (!this.initialized) {
await this.session.repairIfNeeded((msg) => console.error(msg));
const restoredMessages = this.session.loadMessages();
if (restoredMessages.length > 0) {
if (this.debug) {
console.error(`[debug] Restoring ${restoredMessages.length} messages from session`);
for (const msg of restoredMessages) {
const msgAny = msg as any;
const content = Array.isArray(msgAny.content)
? msgAny.content.map((c: any) => c.type || "text").join(", ")
: typeof msgAny.content;
console.error(`[debug] ${msg.role}: ${content}`);
if (Array.isArray(msgAny.content)) {
for (const block of msgAny.content) {
if (block.type === "tool_use") {
console.error(`[debug] tool_use id: ${block.id}, name: ${block.name}`);
}
if (block.type === "tool_result") {
console.error(`[debug] tool_result tool_use_id: ${block.tool_use_id}`);
}
}
}
}
}
this.agent.replaceMessages(restoredMessages);
}
this.initialized = true;
}
this.output.state.lastAssistantText = "";
await this.agent.prompt(prompt);
const canRotate = !this.pinnedProfile && this.profileCandidates.length > 1;
let lastError: unknown;
// Loop to exhaust all candidate profiles on rotatable errors
while (true) {
try {
await this.agent.prompt(prompt);
break; // success — exit loop
} catch (error) {
lastError = error;
const reason = classifyError(error);
if (this.currentProfileId && isRotatableError(reason)) {
markAuthProfileFailure(this.currentProfileId, reason);
}
if (!canRotate || !this.currentProfileId) throw error;
if (!isRotatableError(reason)) throw error;
if (this.debug) {
this.stderr.write(
`[auth-profile] Profile "${this.currentProfileId}" failed (${reason}), attempting rotation...\n`,
);
}
if (!this.advanceAuthProfile()) {
throw lastError; // All profiles exhausted
}
if (this.debug) {
this.stderr.write(
`[auth-profile] Rotated to profile "${this.currentProfileId}"\n`,
);
}
// Reset output for retry
this.output.state.lastAssistantText = "";
// continue loop with new profile
}
}
// Mark success
if (this.currentProfileId) {
markAuthProfileUsed(this.currentProfileId);
markAuthProfileGood(this.resolvedProvider, this.currentProfileId);
}
return { text: this.output.state.lastAssistantText, error: this.agent.state.error };
}
/**
* Advance to the next non-cooldown auth profile.
* Returns true if a new profile was activated, false if exhausted.
*/
private advanceAuthProfile(): boolean {
const store = loadAuthProfileStore();
const startIndex = this.profileIndex;
for (let i = 1; i < this.profileCandidates.length; i++) {
const nextIndex = (startIndex + i) % this.profileCandidates.length;
const candidateId = this.profileCandidates[nextIndex] as string | undefined;
if (!candidateId) continue;
// Skip profiles in cooldown
const stats = store.usageStats?.[candidateId];
if (stats && isProfileInCooldown(stats)) continue;
// Try to resolve API key
const apiKey = resolveApiKeyForProfile(candidateId);
if (!apiKey) continue;
this.currentApiKey = apiKey;
this.currentProfileId = candidateId;
this.profileIndex = nextIndex;
return true;
}
return false;
}
private handleSessionEvent(event: AgentEvent) {
if (event.type === "message_end") {
const message = event.message as AgentMessage;

View file

@ -0,0 +1,105 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { beforeEach, describe, expect, it, vi } from "vitest";
import { repairSessionFileIfNeeded } from "./session-file-repair.js";
import { acquireSessionWriteLock } from "./session-write-lock.js";
vi.mock("./session-write-lock.js", async () => {
const actual = await vi.importActual<typeof import("./session-write-lock.js")>(
"./session-write-lock.js",
);
return {
...actual,
acquireSessionWriteLock: vi.fn(actual.acquireSessionWriteLock),
};
});
describe("repairSessionFileIfNeeded", () => {
beforeEach(() => {
vi.mocked(acquireSessionWriteLock).mockClear();
});
it("rewrites session files that contain malformed lines", async () => {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "multica-session-repair-"));
const file = path.join(dir, "session.jsonl");
const meta = {
type: "meta",
meta: { provider: "kimi", model: "moonshot-v1-128k" },
timestamp: Date.now(),
};
const message = {
type: "message",
message: { role: "user", content: "hello" },
timestamp: Date.now(),
};
const content = `${JSON.stringify(meta)}\n${JSON.stringify(message)}\n{"type":"message"`;
await fs.writeFile(file, content, "utf-8");
const result = await repairSessionFileIfNeeded({ sessionFile: file });
expect(result.repaired).toBe(true);
expect(result.droppedLines).toBe(1);
expect(result.backupPath).toBeTruthy();
const repaired = await fs.readFile(file, "utf-8");
expect(repaired.trim().split("\n")).toHaveLength(2);
if (result.backupPath) {
const backup = await fs.readFile(result.backupPath, "utf-8");
expect(backup).toBe(content);
}
});
it("does not drop CRLF-terminated JSONL lines", async () => {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "multica-session-repair-"));
const file = path.join(dir, "session.jsonl");
const meta = {
type: "meta",
meta: { provider: "kimi", model: "moonshot-v1-128k" },
timestamp: Date.now(),
};
const message = {
type: "message",
message: { role: "user", content: "hello" },
timestamp: Date.now(),
};
const content = `${JSON.stringify(meta)}\r\n${JSON.stringify(message)}\r\n`;
await fs.writeFile(file, content, "utf-8");
const result = await repairSessionFileIfNeeded({ sessionFile: file });
expect(result.repaired).toBe(false);
expect(result.droppedLines).toBe(0);
});
it("returns reason when file is empty after dropping all lines", async () => {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "multica-session-repair-"));
const file = path.join(dir, "session.jsonl");
await fs.writeFile(file, "{broken\n{also broken\n", "utf-8");
const result = await repairSessionFileIfNeeded({ sessionFile: file });
expect(result.repaired).toBe(false);
expect(result.reason).toBe("empty session file");
});
it("returns a detailed reason when read errors are not ENOENT", async () => {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "multica-session-repair-"));
const warn = vi.fn();
const result = await repairSessionFileIfNeeded({ sessionFile: dir, warn });
expect(result.repaired).toBe(false);
expect(result.reason).toContain("failed to read session file");
expect(warn).toHaveBeenCalledTimes(1);
});
it("acquires a write lock while repairing", async () => {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "multica-session-repair-"));
const file = path.join(dir, "session.jsonl");
await fs.writeFile(file, "{broken\n{also broken\n", "utf-8");
await repairSessionFileIfNeeded({ sessionFile: file });
expect(vi.mocked(acquireSessionWriteLock)).toHaveBeenCalledTimes(1);
});
});

View file

@ -0,0 +1,102 @@
import fs from "node:fs/promises";
import path from "node:path";
import { acquireSessionWriteLock } from "./session-write-lock.js";
type RepairReport = {
repaired: boolean;
droppedLines: number;
backupPath?: string;
reason?: string;
};
export type { RepairReport };
export async function repairSessionFileIfNeeded(params: {
sessionFile: string;
warn?: (message: string) => void;
}): Promise<RepairReport> {
const sessionFile = params.sessionFile.trim();
if (!sessionFile) {
return { repaired: false, droppedLines: 0, reason: "missing session file" };
}
const lock = await acquireSessionWriteLock({ sessionFile });
try {
let content: string;
try {
content = await fs.readFile(sessionFile, "utf-8");
} catch (err) {
const code = (err as { code?: unknown } | undefined)?.code;
if (code === "ENOENT") {
return { repaired: false, droppedLines: 0, reason: "missing session file" };
}
const reason = `failed to read session file: ${err instanceof Error ? err.message : "unknown error"}`;
params.warn?.(`session file repair skipped: ${reason} (${path.basename(sessionFile)})`);
return { repaired: false, droppedLines: 0, reason };
}
const lines = content.split(/\r?\n/);
const entries: unknown[] = [];
let droppedLines = 0;
for (const line of lines) {
if (!line.trim()) {
continue;
}
try {
const entry = JSON.parse(line);
entries.push(entry);
} catch {
droppedLines += 1;
}
}
if (entries.length === 0) {
return { repaired: false, droppedLines, reason: "empty session file" };
}
if (droppedLines === 0) {
return { repaired: false, droppedLines: 0 };
}
const cleaned = `${entries.map((entry) => JSON.stringify(entry)).join("\n")}\n`;
const backupPath = `${sessionFile}.bak-${process.pid}-${Date.now()}`;
const tmpPath = `${sessionFile}.repair-${process.pid}-${Date.now()}.tmp`;
try {
const stat = await fs.stat(sessionFile).catch(() => null);
await fs.writeFile(backupPath, content, "utf-8");
if (stat) {
await fs.chmod(backupPath, stat.mode);
}
await fs.writeFile(tmpPath, cleaned, "utf-8");
if (stat) {
await fs.chmod(tmpPath, stat.mode);
}
await fs.rename(tmpPath, sessionFile);
} catch (err) {
try {
await fs.unlink(tmpPath);
} catch (cleanupErr) {
params.warn?.(
`session file repair cleanup failed: ${cleanupErr instanceof Error ? cleanupErr.message : "unknown error"} (${path.basename(
tmpPath,
)})`,
);
}
return {
repaired: false,
droppedLines,
reason: `repair failed: ${err instanceof Error ? err.message : "unknown error"}`,
};
}
params.warn?.(
`session file repaired: dropped ${droppedLines} malformed line(s) (${path.basename(
sessionFile,
)})`,
);
return { repaired: true, droppedLines, backupPath };
} finally {
await lock.release();
}
}

View file

@ -1,9 +1,11 @@
import type { AgentMessage } from "@mariozechner/pi-agent-core";
import { getModel, type Model } from "@mariozechner/pi-ai";
import type { SessionEntry, SessionMeta } from "./types.js";
import { appendEntry, readEntries, writeEntries } from "./storage.js";
import { appendEntry, readEntries, resolveSessionPath, writeEntries } from "./storage.js";
import { compactMessages, compactMessagesAsync } from "./compaction.js";
import { credentialManager } from "../credentials.js";
import { repairSessionFileIfNeeded, type RepairReport } from "./session-file-repair.js";
import { sanitizeToolCallInputs, sanitizeToolUseResultPairing } from "./session-transcript-repair.js";
/** Get Kimi model for summarization (use a cheaper model than k2-thinking) */
function getSummaryModel(): Model<any> {
@ -140,11 +142,19 @@ export class SessionManager {
return readEntries(this.sessionId, { baseDir: this.baseDir });
}
async repairIfNeeded(warn?: (message: string) => void): Promise<RepairReport> {
const filePath = resolveSessionPath(this.sessionId, { baseDir: this.baseDir });
return repairSessionFileIfNeeded({ sessionFile: filePath, warn });
}
loadMessages(): AgentMessage[] {
const entries = this.loadEntries();
return entries
let messages = entries
.filter((entry) => entry.type === "message")
.map((entry) => entry.message);
messages = sanitizeToolCallInputs(messages);
messages = sanitizeToolUseResultPairing(messages);
return messages;
}
loadMeta(): SessionMeta | undefined {

View file

@ -0,0 +1,150 @@
import type { AgentMessage } from "@mariozechner/pi-agent-core";
import { describe, expect, it } from "vitest";
import {
sanitizeToolCallInputs,
sanitizeToolUseResultPairing,
} from "./session-transcript-repair.js";
describe("sanitizeToolUseResultPairing", () => {
it("moves tool results directly after tool calls and inserts missing results", () => {
const input = [
{
role: "assistant",
content: [
{ type: "toolCall", id: "call_1", name: "read", arguments: {} },
{ type: "toolCall", id: "call_2", name: "exec", arguments: {} },
],
},
{ role: "user", content: "user message that should come after tool use" },
{
role: "toolResult",
toolCallId: "call_2",
toolName: "exec",
content: [{ type: "text", text: "ok" }],
isError: false,
},
] satisfies AgentMessage[];
const out = sanitizeToolUseResultPairing(input);
expect(out[0]?.role).toBe("assistant");
expect(out[1]?.role).toBe("toolResult");
expect((out[1] as { toolCallId?: string }).toolCallId).toBe("call_1");
expect(out[2]?.role).toBe("toolResult");
expect((out[2] as { toolCallId?: string }).toolCallId).toBe("call_2");
expect(out[3]?.role).toBe("user");
});
it("drops duplicate tool results for the same id within a span", () => {
const input = [
{
role: "assistant",
content: [{ type: "toolCall", id: "call_1", name: "read", arguments: {} }],
},
{
role: "toolResult",
toolCallId: "call_1",
toolName: "read",
content: [{ type: "text", text: "first" }],
isError: false,
},
{
role: "toolResult",
toolCallId: "call_1",
toolName: "read",
content: [{ type: "text", text: "second" }],
isError: false,
},
{ role: "user", content: "ok" },
] satisfies AgentMessage[];
const out = sanitizeToolUseResultPairing(input);
expect(out.filter((m) => m.role === "toolResult")).toHaveLength(1);
});
it("drops duplicate tool results for the same id across the transcript", () => {
const input = [
{
role: "assistant",
content: [{ type: "toolCall", id: "call_1", name: "read", arguments: {} }],
},
{
role: "toolResult",
toolCallId: "call_1",
toolName: "read",
content: [{ type: "text", text: "first" }],
isError: false,
},
{ role: "assistant", content: [{ type: "text", text: "ok" }] },
{
role: "toolResult",
toolCallId: "call_1",
toolName: "read",
content: [{ type: "text", text: "second (duplicate)" }],
isError: false,
},
] satisfies AgentMessage[];
const out = sanitizeToolUseResultPairing(input);
const results = out.filter((m) => m.role === "toolResult") as Array<{
toolCallId?: string;
}>;
expect(results).toHaveLength(1);
expect(results[0]?.toolCallId).toBe("call_1");
});
it("drops orphan tool results that do not match any tool call", () => {
const input = [
{ role: "user", content: "hello" },
{
role: "toolResult",
toolCallId: "call_orphan",
toolName: "read",
content: [{ type: "text", text: "orphan" }],
isError: false,
},
{
role: "assistant",
content: [{ type: "text", text: "ok" }],
},
] satisfies AgentMessage[];
const out = sanitizeToolUseResultPairing(input);
expect(out.some((m) => m.role === "toolResult")).toBe(false);
expect(out.map((m) => m.role)).toEqual(["user", "assistant"]);
});
});
describe("sanitizeToolCallInputs", () => {
it("drops tool calls missing input or arguments", () => {
const input: AgentMessage[] = [
{
role: "assistant",
content: [{ type: "toolCall", id: "call_1", name: "read" }],
},
{ role: "user", content: "hello" },
];
const out = sanitizeToolCallInputs(input);
expect(out.map((m) => m.role)).toEqual(["user"]);
});
it("keeps valid tool calls and preserves text blocks", () => {
const input: AgentMessage[] = [
{
role: "assistant",
content: [
{ type: "text", text: "before" },
{ type: "toolUse", id: "call_ok", name: "read", input: { path: "a" } },
{ type: "toolCall", id: "call_drop", name: "read" },
],
},
];
const out = sanitizeToolCallInputs(input);
const assistant = out[0] as Extract<AgentMessage, { role: "assistant" }>;
const types = Array.isArray(assistant.content)
? assistant.content.map((block) => (block as { type?: unknown }).type)
: [];
expect(types).toEqual(["text", "toolUse"]);
});
});

View file

@ -0,0 +1,295 @@
import type { AgentMessage } from "@mariozechner/pi-agent-core";
type ToolCallLike = {
id: string;
name?: string;
};
const TOOL_CALL_TYPES = new Set(["toolCall", "toolUse", "functionCall"]);
type ToolCallBlock = {
type?: unknown;
id?: unknown;
name?: unknown;
input?: unknown;
arguments?: unknown;
};
function extractToolCallsFromAssistant(
msg: Extract<AgentMessage, { role: "assistant" }>,
): ToolCallLike[] {
const content = msg.content;
if (!Array.isArray(content)) {
return [];
}
const toolCalls: ToolCallLike[] = [];
for (const block of content) {
if (!block || typeof block !== "object") {
continue;
}
const rec = block as { type?: unknown; id?: unknown; name?: unknown };
if (typeof rec.id !== "string" || !rec.id) {
continue;
}
if (rec.type === "toolCall" || rec.type === "toolUse" || rec.type === "functionCall") {
toolCalls.push({
id: rec.id,
name: typeof rec.name === "string" ? rec.name : undefined,
});
}
}
return toolCalls;
}
function isToolCallBlock(block: unknown): block is ToolCallBlock {
if (!block || typeof block !== "object") {
return false;
}
const type = (block as { type?: unknown }).type;
return typeof type === "string" && TOOL_CALL_TYPES.has(type);
}
function hasToolCallInput(block: ToolCallBlock): boolean {
const hasInput = "input" in block ? block.input !== undefined && block.input !== null : false;
const hasArguments =
"arguments" in block ? block.arguments !== undefined && block.arguments !== null : false;
return hasInput || hasArguments;
}
function extractToolResultId(msg: Extract<AgentMessage, { role: "toolResult" }>): string | null {
const toolCallId = (msg as { toolCallId?: unknown }).toolCallId;
if (typeof toolCallId === "string" && toolCallId) {
return toolCallId;
}
const toolUseId = (msg as { toolUseId?: unknown }).toolUseId;
if (typeof toolUseId === "string" && toolUseId) {
return toolUseId;
}
return null;
}
function makeMissingToolResult(params: {
toolCallId: string;
toolName?: string;
}): Extract<AgentMessage, { role: "toolResult" }> {
return {
role: "toolResult",
toolCallId: params.toolCallId,
toolName: params.toolName ?? "unknown",
content: [
{
type: "text",
text: "[multica] missing tool result in session history; inserted synthetic error result for transcript repair.",
},
],
isError: true,
timestamp: Date.now(),
} as Extract<AgentMessage, { role: "toolResult" }>;
}
export { makeMissingToolResult };
export type ToolCallInputRepairReport = {
messages: AgentMessage[];
droppedToolCalls: number;
droppedAssistantMessages: number;
};
export function repairToolCallInputs(messages: AgentMessage[]): ToolCallInputRepairReport {
let droppedToolCalls = 0;
let droppedAssistantMessages = 0;
let changed = false;
const out: AgentMessage[] = [];
for (const msg of messages) {
if (!msg || typeof msg !== "object") {
out.push(msg);
continue;
}
if (msg.role !== "assistant" || !Array.isArray(msg.content)) {
out.push(msg);
continue;
}
const nextContent = [];
let droppedInMessage = 0;
for (const block of msg.content) {
if (isToolCallBlock(block) && !hasToolCallInput(block)) {
droppedToolCalls += 1;
droppedInMessage += 1;
changed = true;
continue;
}
nextContent.push(block);
}
if (droppedInMessage > 0) {
if (nextContent.length === 0) {
droppedAssistantMessages += 1;
changed = true;
continue;
}
out.push({ ...msg, content: nextContent });
continue;
}
out.push(msg);
}
return {
messages: changed ? out : messages,
droppedToolCalls,
droppedAssistantMessages,
};
}
export function sanitizeToolCallInputs(messages: AgentMessage[]): AgentMessage[] {
return repairToolCallInputs(messages).messages;
}
export function sanitizeToolUseResultPairing(messages: AgentMessage[]): AgentMessage[] {
return repairToolUseResultPairing(messages).messages;
}
export type ToolUseRepairReport = {
messages: AgentMessage[];
added: Array<Extract<AgentMessage, { role: "toolResult" }>>;
droppedDuplicateCount: number;
droppedOrphanCount: number;
moved: boolean;
};
export function repairToolUseResultPairing(messages: AgentMessage[]): ToolUseRepairReport {
const out: AgentMessage[] = [];
const added: Array<Extract<AgentMessage, { role: "toolResult" }>> = [];
const seenToolResultIds = new Set<string>();
let droppedDuplicateCount = 0;
let droppedOrphanCount = 0;
let moved = false;
let changed = false;
const pushToolResult = (msg: Extract<AgentMessage, { role: "toolResult" }>) => {
const id = extractToolResultId(msg);
if (id && seenToolResultIds.has(id)) {
droppedDuplicateCount += 1;
changed = true;
return;
}
if (id) {
seenToolResultIds.add(id);
}
out.push(msg);
};
for (let i = 0; i < messages.length; i += 1) {
const msg = messages[i];
if (!msg || typeof msg !== "object") {
out.push(msg);
continue;
}
const role = (msg as { role?: unknown }).role;
if (role !== "assistant") {
if (role !== "toolResult") {
out.push(msg);
} else {
droppedOrphanCount += 1;
changed = true;
}
continue;
}
const assistant = msg as Extract<AgentMessage, { role: "assistant" }>;
const toolCalls = extractToolCallsFromAssistant(assistant);
if (toolCalls.length === 0) {
out.push(msg);
continue;
}
const toolCallIds = new Set(toolCalls.map((t) => t.id));
const spanResultsById = new Map<string, Extract<AgentMessage, { role: "toolResult" }>>();
const remainder: AgentMessage[] = [];
let j = i + 1;
for (; j < messages.length; j += 1) {
const next = messages[j];
if (!next || typeof next !== "object") {
remainder.push(next);
continue;
}
const nextRole = (next as { role?: unknown }).role;
if (nextRole === "assistant") {
break;
}
if (nextRole === "toolResult") {
const toolResult = next as Extract<AgentMessage, { role: "toolResult" }>;
const id = extractToolResultId(toolResult);
if (id && toolCallIds.has(id)) {
if (seenToolResultIds.has(id)) {
droppedDuplicateCount += 1;
changed = true;
continue;
}
if (!spanResultsById.has(id)) {
spanResultsById.set(id, toolResult);
}
continue;
}
}
if (nextRole !== "toolResult") {
remainder.push(next);
} else {
droppedOrphanCount += 1;
changed = true;
}
}
out.push(msg);
if (spanResultsById.size > 0 && remainder.length > 0) {
moved = true;
changed = true;
}
for (const call of toolCalls) {
const existing = spanResultsById.get(call.id);
if (existing) {
pushToolResult(existing);
} else {
const missing = makeMissingToolResult({
toolCallId: call.id,
toolName: call.name,
});
added.push(missing);
changed = true;
pushToolResult(missing);
}
}
for (const rem of remainder) {
if (!rem || typeof rem !== "object") {
out.push(rem);
continue;
}
out.push(rem);
}
i = j - 1;
}
const changedOrMoved = changed || moved;
return {
messages: changedOrMoved ? out : messages,
added,
droppedDuplicateCount,
droppedOrphanCount,
moved: changedOrMoved,
};
}

View file

@ -0,0 +1,194 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { describe, expect, it } from "vitest";
import { __testing, acquireSessionWriteLock } from "./session-write-lock.js";
describe("acquireSessionWriteLock", () => {
it("reuses locks across symlinked session paths", async () => {
if (process.platform === "win32") {
expect(true).toBe(true);
return;
}
const root = await fs.mkdtemp(path.join(os.tmpdir(), "multica-lock-"));
try {
const realDir = path.join(root, "real");
const linkDir = path.join(root, "link");
await fs.mkdir(realDir, { recursive: true });
await fs.symlink(realDir, linkDir);
const sessionReal = path.join(realDir, "sessions.json");
const sessionLink = path.join(linkDir, "sessions.json");
const lockA = await acquireSessionWriteLock({ sessionFile: sessionReal, timeoutMs: 500 });
const lockB = await acquireSessionWriteLock({ sessionFile: sessionLink, timeoutMs: 500 });
await lockB.release();
await lockA.release();
} finally {
await fs.rm(root, { recursive: true, force: true });
}
});
it("keeps the lock file until the last release", async () => {
const root = await fs.mkdtemp(path.join(os.tmpdir(), "multica-lock-"));
try {
const sessionFile = path.join(root, "sessions.json");
const lockPath = `${sessionFile}.lock`;
const lockA = await acquireSessionWriteLock({ sessionFile, timeoutMs: 500 });
const lockB = await acquireSessionWriteLock({ sessionFile, timeoutMs: 500 });
await expect(fs.access(lockPath)).resolves.toBeUndefined();
await lockA.release();
await expect(fs.access(lockPath)).resolves.toBeUndefined();
await lockB.release();
await expect(fs.access(lockPath)).rejects.toThrow();
} finally {
await fs.rm(root, { recursive: true, force: true });
}
});
it("reclaims stale lock files", async () => {
const root = await fs.mkdtemp(path.join(os.tmpdir(), "multica-lock-"));
try {
const sessionFile = path.join(root, "sessions.json");
const lockPath = `${sessionFile}.lock`;
await fs.writeFile(
lockPath,
JSON.stringify({ pid: 123456, createdAt: new Date(Date.now() - 60_000).toISOString() }),
"utf8",
);
const lock = await acquireSessionWriteLock({ sessionFile, timeoutMs: 500, staleMs: 10 });
const raw = await fs.readFile(lockPath, "utf8");
const payload = JSON.parse(raw) as { pid: number };
expect(payload.pid).toBe(process.pid);
await lock.release();
} finally {
await fs.rm(root, { recursive: true, force: true });
}
});
it("does not delete recent lock files with invalid payloads", async () => {
const root = await fs.mkdtemp(path.join(os.tmpdir(), "multica-lock-"));
try {
const sessionFile = path.join(root, "sessions.json");
const lockPath = `${sessionFile}.lock`;
await fs.writeFile(lockPath, "{", "utf8");
await expect(
acquireSessionWriteLock({ sessionFile, timeoutMs: 200, staleMs: 60_000 }),
).rejects.toThrow(/timeout/);
await expect(fs.access(lockPath)).resolves.toBeUndefined();
} finally {
await fs.rm(root, { recursive: true, force: true });
}
});
it("reclaims invalid lock files when mtime is stale", async () => {
const root = await fs.mkdtemp(path.join(os.tmpdir(), "multica-lock-"));
try {
const sessionFile = path.join(root, "sessions.json");
const lockPath = `${sessionFile}.lock`;
await fs.writeFile(lockPath, "{", "utf8");
const old = new Date(Date.now() - 60_000);
await fs.utimes(lockPath, old, old);
const lock = await acquireSessionWriteLock({ sessionFile, timeoutMs: 500, staleMs: 10 });
await lock.release();
} finally {
await fs.rm(root, { recursive: true, force: true });
}
});
it("removes held locks on termination signals", async () => {
const signals = ["SIGINT", "SIGTERM", "SIGQUIT", "SIGABRT"] as const;
for (const signal of signals) {
const root = await fs.mkdtemp(path.join(os.tmpdir(), "multica-lock-cleanup-"));
// Prevent the signal from actually killing the vitest worker
const keepAlive = () => {};
process.on(signal, keepAlive);
try {
const sessionFile = path.join(root, "sessions.json");
const lockPath = `${sessionFile}.lock`;
await acquireSessionWriteLock({ sessionFile, timeoutMs: 500 });
__testing.handleTerminationSignal(signal);
await expect(fs.stat(lockPath)).rejects.toThrow();
} finally {
process.off(signal, keepAlive);
await fs.rm(root, { recursive: true, force: true });
}
}
});
it("registers cleanup for SIGQUIT and SIGABRT", () => {
expect(__testing.cleanupSignals).toContain("SIGQUIT");
expect(__testing.cleanupSignals).toContain("SIGABRT");
});
it("cleans up locks on SIGINT without removing other handlers", async () => {
const root = await fs.mkdtemp(path.join(os.tmpdir(), "multica-lock-"));
const originalKill = process.kill.bind(process) as typeof process.kill;
const killCalls: Array<NodeJS.Signals | undefined> = [];
let otherHandlerCalled = false;
process.kill = ((pid: number, signal?: NodeJS.Signals) => {
killCalls.push(signal);
return true;
}) as typeof process.kill;
const otherHandler = () => {
otherHandlerCalled = true;
};
process.on("SIGINT", otherHandler);
try {
const sessionFile = path.join(root, "sessions.json");
const lockPath = `${sessionFile}.lock`;
await acquireSessionWriteLock({ sessionFile, timeoutMs: 500 });
process.emit("SIGINT");
await expect(fs.access(lockPath)).rejects.toThrow();
expect(otherHandlerCalled).toBe(true);
expect(killCalls).toEqual([]);
} finally {
process.off("SIGINT", otherHandler);
process.kill = originalKill;
await fs.rm(root, { recursive: true, force: true });
}
});
it("cleans up locks via releaseAllLocksSync", async () => {
const root = await fs.mkdtemp(path.join(os.tmpdir(), "multica-lock-"));
try {
const sessionFile = path.join(root, "sessions.json");
const lockPath = `${sessionFile}.lock`;
await acquireSessionWriteLock({ sessionFile, timeoutMs: 500 });
__testing.releaseAllLocksSync();
await expect(fs.access(lockPath)).rejects.toThrow();
} finally {
await fs.rm(root, { recursive: true, force: true });
}
});
it("keeps other signal listeners registered", () => {
const keepAlive = () => {};
process.on("SIGINT", keepAlive);
__testing.handleTerminationSignal("SIGINT");
expect(process.listeners("SIGINT")).toContain(keepAlive);
process.off("SIGINT", keepAlive);
});
});

View file

@ -0,0 +1,226 @@
import fsSync from "node:fs";
import fs from "node:fs/promises";
import path from "node:path";
type LockFilePayload = {
pid: number;
createdAt: string;
};
type HeldLock = {
count: number;
handle: fs.FileHandle;
lockPath: string;
};
const HELD_LOCKS = new Map<string, HeldLock>();
const CLEANUP_SIGNALS = ["SIGINT", "SIGTERM", "SIGQUIT", "SIGABRT"] as const;
type CleanupSignal = (typeof CLEANUP_SIGNALS)[number];
const cleanupHandlers = new Map<CleanupSignal, () => void>();
function isAlive(pid: number): boolean {
if (!Number.isFinite(pid) || pid <= 0) {
return false;
}
try {
process.kill(pid, 0);
return true;
} catch {
return false;
}
}
/**
* Synchronously release all held locks.
* Used during process exit when async operations aren't reliable.
*/
function releaseAllLocksSync(): void {
for (const [sessionFile, held] of HELD_LOCKS) {
try {
if (typeof held.handle.close === "function") {
void held.handle.close().catch(() => {});
}
} catch {
// Ignore errors during cleanup - best effort
}
try {
fsSync.rmSync(held.lockPath, { force: true });
} catch {
// Ignore errors during cleanup - best effort
}
HELD_LOCKS.delete(sessionFile);
}
}
let cleanupRegistered = false;
function handleTerminationSignal(signal: CleanupSignal): void {
releaseAllLocksSync();
const shouldReraise = process.listenerCount(signal) === 1;
if (shouldReraise) {
const handler = cleanupHandlers.get(signal);
if (handler) {
process.off(signal, handler);
}
try {
process.kill(process.pid, signal);
} catch {
// Ignore errors during shutdown
}
}
}
function registerCleanupHandlers(): void {
if (cleanupRegistered) {
return;
}
cleanupRegistered = true;
// Cleanup on normal exit and process.exit() calls
process.on("exit", () => {
releaseAllLocksSync();
});
// Handle termination signals
for (const signal of CLEANUP_SIGNALS) {
try {
const handler = () => handleTerminationSignal(signal);
cleanupHandlers.set(signal, handler);
process.on(signal, handler);
} catch {
// Ignore unsupported signals on this platform.
}
}
}
async function readLockPayload(lockPath: string): Promise<LockFilePayload | null> {
try {
const raw = await fs.readFile(lockPath, "utf8");
const parsed = JSON.parse(raw) as Partial<LockFilePayload>;
if (typeof parsed.pid !== "number") {
return null;
}
if (typeof parsed.createdAt !== "string") {
return null;
}
return { pid: parsed.pid, createdAt: parsed.createdAt };
} catch {
return null;
}
}
async function getLockAgeMs(lockPath: string): Promise<number | null> {
try {
const stat = await fs.stat(lockPath);
return Date.now() - stat.mtimeMs;
} catch {
return null;
}
}
export async function acquireSessionWriteLock(params: {
sessionFile: string;
timeoutMs?: number;
staleMs?: number;
}): Promise<{
release: () => Promise<void>;
}> {
registerCleanupHandlers();
const timeoutMs = params.timeoutMs ?? 10_000;
const staleMs = params.staleMs ?? 30 * 60 * 1000;
const sessionFile = path.resolve(params.sessionFile);
const sessionDir = path.dirname(sessionFile);
await fs.mkdir(sessionDir, { recursive: true });
let normalizedDir = sessionDir;
try {
normalizedDir = await fs.realpath(sessionDir);
} catch {
// Fall back to the resolved path if realpath fails (permissions, transient FS).
}
const normalizedSessionFile = path.join(normalizedDir, path.basename(sessionFile));
const lockPath = `${normalizedSessionFile}.lock`;
const held = HELD_LOCKS.get(normalizedSessionFile);
if (held) {
held.count += 1;
return {
release: async () => {
const current = HELD_LOCKS.get(normalizedSessionFile);
if (!current) {
return;
}
current.count -= 1;
if (current.count > 0) {
return;
}
HELD_LOCKS.delete(normalizedSessionFile);
await current.handle.close();
await fs.rm(current.lockPath, { force: true });
},
};
}
const startedAt = Date.now();
let attempt = 0;
while (Date.now() - startedAt < timeoutMs) {
attempt += 1;
try {
const handle = await fs.open(lockPath, "wx");
await handle.writeFile(
JSON.stringify({ pid: process.pid, createdAt: new Date().toISOString() }, null, 2),
"utf8",
);
HELD_LOCKS.set(normalizedSessionFile, { count: 1, handle, lockPath });
return {
release: async () => {
const current = HELD_LOCKS.get(normalizedSessionFile);
if (!current) {
return;
}
current.count -= 1;
if (current.count > 0) {
return;
}
HELD_LOCKS.delete(normalizedSessionFile);
await current.handle.close();
await fs.rm(current.lockPath, { force: true });
},
};
} catch (err) {
const code = (err as { code?: unknown }).code;
if (code !== "EEXIST") {
throw err;
}
const payload = await readLockPayload(lockPath);
if (payload) {
const createdAt = payload.createdAt ? Date.parse(payload.createdAt) : NaN;
const stale = !Number.isFinite(createdAt) || Date.now() - createdAt > staleMs;
const alive = payload.pid ? isAlive(payload.pid) : false;
if (stale || !alive) {
await fs.rm(lockPath, { force: true });
continue;
}
} else {
const ageMs = await getLockAgeMs(lockPath);
const stale = ageMs !== null && ageMs > staleMs;
if (stale) {
await fs.rm(lockPath, { force: true });
continue;
}
}
const delay = Math.min(1000, 50 * attempt);
await new Promise((r) => setTimeout(r, delay));
}
}
const payload = await readLockPayload(lockPath);
const owner = payload?.pid ? `pid=${payload.pid}` : "unknown";
throw new Error(`session file locked (timeout ${timeoutMs}ms): ${owner} ${lockPath}`);
}
export const __testing = {
cleanupSignals: [...CLEANUP_SIGNALS],
handleTerminationSignal,
releaseAllLocksSync,
};

View file

@ -3,6 +3,7 @@ import { existsSync, mkdirSync, readFileSync } from "fs";
import { appendFile, writeFile } from "fs/promises";
import type { SessionEntry } from "./types.js";
import { DATA_DIR } from "../../shared/index.js";
import { acquireSessionWriteLock } from "./session-write-lock.js";
export type SessionStorageOptions = {
baseDir?: string | undefined;
@ -50,7 +51,12 @@ export async function appendEntry(
) {
ensureSessionDir(sessionId, options);
const filePath = resolveSessionPath(sessionId, options);
await appendFile(filePath, `${JSON.stringify(entry)}\n`, "utf8");
const lock = await acquireSessionWriteLock({ sessionFile: filePath });
try {
await appendFile(filePath, `${JSON.stringify(entry)}\n`, "utf8");
} finally {
await lock.release();
}
}
export async function writeEntries(
@ -60,6 +66,11 @@ export async function writeEntries(
) {
ensureSessionDir(sessionId, options);
const filePath = resolveSessionPath(sessionId, options);
const content = entries.map((entry) => JSON.stringify(entry)).join("\n");
await writeFile(filePath, content ? `${content}\n` : "", "utf8");
const lock = await acquireSessionWriteLock({ sessionFile: filePath });
try {
const content = entries.map((entry) => JSON.stringify(entry)).join("\n");
await writeFile(filePath, content ? `${content}\n` : "", "utf8");
} finally {
await lock.release();
}
}

View file

@ -0,0 +1,127 @@
import { describe, it, expect } from "vitest";
import { buildSubagentSystemPrompt, formatAnnouncementMessage } from "./announce.js";
import type { FormatAnnouncementParams } from "./announce.js";
describe("buildSubagentSystemPrompt", () => {
it("includes task and session context", () => {
const prompt = buildSubagentSystemPrompt({
requesterSessionId: "parent-123",
childSessionId: "child-456",
task: "Analyze the auth module for security issues",
});
expect(prompt).toContain("You are a subagent spawned to complete a specific task");
expect(prompt).toContain("Analyze the auth module for security issues");
expect(prompt).toContain("parent-123");
expect(prompt).toContain("child-456");
expect(prompt).toContain("Do NOT spawn nested subagents");
});
it("includes label when provided", () => {
const prompt = buildSubagentSystemPrompt({
requesterSessionId: "parent-123",
childSessionId: "child-456",
label: "Security Audit",
task: "Check for vulnerabilities",
});
expect(prompt).toContain('Label: "Security Audit"');
});
it("omits label line when not provided", () => {
const prompt = buildSubagentSystemPrompt({
requesterSessionId: "parent-123",
childSessionId: "child-456",
task: "Do something",
});
expect(prompt).not.toContain("Label:");
});
});
describe("formatAnnouncementMessage", () => {
const baseParams: FormatAnnouncementParams = {
runId: "run-1",
childSessionId: "child-456",
requesterSessionId: "parent-123",
task: "Analyze code",
label: "Code Analysis",
cleanup: "delete",
outcome: { status: "ok" },
startedAt: 1000000,
endedAt: 1030000,
};
it("formats successful completion", () => {
const msg = formatAnnouncementMessage({
...baseParams,
findings: "Found 3 issues in the auth module.",
});
expect(msg).toContain('"Code Analysis" just completed successfully');
expect(msg).toContain("Found 3 issues in the auth module.");
expect(msg).toContain("runtime 30s");
expect(msg).toContain("session child-456");
});
it("formats error outcome", () => {
const msg = formatAnnouncementMessage({
...baseParams,
outcome: { status: "error", error: "API key expired" },
});
expect(msg).toContain("failed: API key expired");
});
it("formats timeout outcome", () => {
const msg = formatAnnouncementMessage({
...baseParams,
outcome: { status: "timeout" },
});
expect(msg).toContain("timed out");
});
it("shows (no output) when findings is not provided", () => {
const msg = formatAnnouncementMessage(baseParams);
expect(msg).toContain("(no output)");
});
it("uses task text when label is not provided", () => {
const paramsNoLabel: FormatAnnouncementParams = {
...baseParams,
label: undefined,
};
const msg = formatAnnouncementMessage(paramsNoLabel);
expect(msg).toContain('"Analyze code"');
});
it("formats runtime for minutes", () => {
const msg = formatAnnouncementMessage({
...baseParams,
startedAt: 1000000,
endedAt: 1150000, // 150 seconds = 2m30s
});
expect(msg).toContain("runtime 2m30s");
});
it("formats runtime for hours", () => {
const msg = formatAnnouncementMessage({
...baseParams,
startedAt: 1000000,
endedAt: 4600000, // 3600 seconds = 1h
});
expect(msg).toContain("runtime 1h");
});
it("includes summarization instruction", () => {
const msg = formatAnnouncementMessage(baseParams);
expect(msg).toContain("Summarize this naturally for the user");
expect(msg).toContain("NO_REPLY");
});
});

View file

@ -0,0 +1,226 @@
/**
* Subagent announcement flow.
*
* Handles result propagation from child parent agent:
* - Builds system prompts for child agents
* - Reads child session output
* - Formats and delivers announcement messages
*/
import { readEntries } from "../session/storage.js";
import { getHub } from "../../hub/hub-singleton.js";
import type {
SubagentAnnounceParams,
SubagentRunOutcome,
SubagentSystemPromptParams,
} from "./types.js";
/**
* Build the system prompt injected into a subagent session.
*/
export function buildSubagentSystemPrompt(params: SubagentSystemPromptParams): string {
const { requesterSessionId, childSessionId, label, task } = params;
const lines: string[] = [
"You are a subagent spawned to complete a specific task.",
"",
"## Rules",
"- Stay focused on the assigned task below.",
"- Complete the task thoroughly and report your findings.",
"- Do NOT initiate side actions unrelated to the task.",
"- Do NOT attempt to communicate with the user directly.",
"- Do NOT spawn nested subagents.",
"- Your session is ephemeral and will be cleaned up after completion.",
"",
"## Context",
`Requester session: ${requesterSessionId}`,
`Child session: ${childSessionId}`,
];
if (label) {
lines.push(`Label: "${label}"`);
}
lines.push("", "## Task", task);
return lines.join("\n");
}
/**
* Read the latest assistant reply from a session's JSONL file.
*/
export function readLatestAssistantReply(sessionId: string): string | undefined {
const entries = readEntries(sessionId);
// Walk backwards to find last assistant message
for (let i = entries.length - 1; i >= 0; i--) {
const entry = entries[i]!;
if (entry.type !== "message") continue;
const message = entry.message;
if (message.role !== "assistant") continue;
return extractAssistantText(message);
}
return undefined;
}
/**
* Extract text content from an assistant message.
* AgentMessage.content for assistant is (TextContent | ThinkingContent | ToolCall)[].
*/
function extractAssistantText(message: { role: string; content: unknown }): string {
const content = message.content;
if (typeof content === "string") {
return sanitizeText(content);
}
if (!Array.isArray(content)) return "";
const textParts: string[] = [];
for (const block of content) {
if (block && typeof block === "object" && "type" in block && block.type === "text" && "text" in block) {
textParts.push(String(block.text));
}
}
return sanitizeText(textParts.join("\n"));
}
/**
* Strip thinking tags and tool markers from text.
*/
function sanitizeText(text: string): string {
return text
.replace(/<thinking>[\s\S]*?<\/thinking>/g, "")
.replace(/<tool_call>[\s\S]*?<\/tool_call>/g, "")
.trim();
}
/**
* Format the duration between two timestamps as a human-readable string.
*/
function formatDuration(startMs: number, endMs: number): string {
const totalSeconds = Math.round((endMs - startMs) / 1000);
if (totalSeconds < 60) return `${totalSeconds}s`;
const minutes = Math.floor(totalSeconds / 60);
const seconds = totalSeconds % 60;
if (minutes < 60) return seconds > 0 ? `${minutes}m${seconds}s` : `${minutes}m`;
const hours = Math.floor(minutes / 60);
const remainingMinutes = minutes % 60;
return remainingMinutes > 0 ? `${hours}h${remainingMinutes}m` : `${hours}h`;
}
/**
* Format a status label from an outcome.
*/
function formatStatusLabel(outcome: SubagentRunOutcome | undefined): string {
if (!outcome) return "completed with unknown status";
switch (outcome.status) {
case "ok":
return "completed successfully";
case "error":
return outcome.error ? `failed: ${outcome.error}` : "failed";
case "timeout":
return "timed out";
default:
return "completed with unknown status";
}
}
/** Parameters for formatAnnouncementMessage */
export interface FormatAnnouncementParams {
runId: string;
childSessionId: string;
requesterSessionId: string;
task: string;
label?: string | undefined;
cleanup: "delete" | "keep";
outcome?: SubagentRunOutcome | undefined;
startedAt?: number | undefined;
endedAt?: number | undefined;
findings?: string | undefined;
}
/**
* Format the announcement message sent to the parent agent.
*/
export function formatAnnouncementMessage(params: FormatAnnouncementParams): string {
const { task, label, outcome, findings, startedAt, endedAt, childSessionId } = params;
const displayName = label || task.slice(0, 60);
const statusLabel = formatStatusLabel(outcome);
const parts: string[] = [
`A background task "${displayName}" just ${statusLabel}.`,
"",
"Findings:",
findings || "(no output)",
];
// Stats line
const stats: string[] = [];
if (startedAt && endedAt) {
stats.push(`runtime ${formatDuration(startedAt, endedAt)}`);
}
stats.push(`session ${childSessionId}`);
parts.push("", `Stats: ${stats.join(" • ")}`);
parts.push(
"",
"Summarize this naturally for the user. Keep it brief (1-2 sentences).",
"Flow it into the conversation naturally.",
"Do not mention technical details like session IDs or that this was a background task.",
"You can respond with NO_REPLY if no announcement is needed (e.g., internal task with no user-facing result).",
);
return parts.join("\n");
}
/**
* Run the full subagent announcement flow:
* 1. Read child's last assistant reply
* 2. Format announcement message
* 3. Send to parent agent via Hub
*/
export function runSubagentAnnounceFlow(params: SubagentAnnounceParams): boolean {
const { requesterSessionId, childSessionId } = params;
// Read child's final output
const findings = readLatestAssistantReply(childSessionId);
// Format the announcement
const message = formatAnnouncementMessage({
runId: params.runId,
childSessionId: params.childSessionId,
requesterSessionId: params.requesterSessionId,
task: params.task,
label: params.label,
cleanup: params.cleanup,
outcome: params.outcome,
startedAt: params.startedAt,
endedAt: params.endedAt,
findings,
});
// Deliver to parent agent via Hub
try {
const hub = getHub();
const parentAgent = hub.getAgent(requesterSessionId);
if (!parentAgent || parentAgent.closed) {
console.warn(
`[SubagentAnnounce] Parent agent not found or closed: ${requesterSessionId}`,
);
return false;
}
parentAgent.write(message);
return true;
} catch (err) {
console.error(`[SubagentAnnounce] Failed to announce to parent:`, err);
return false;
}
}

View file

@ -0,0 +1,38 @@
/**
* Subagent orchestration system.
*
* Provides child agent spawning, lifecycle management,
* persistent registry, and result announcement flow.
*/
export type {
SubagentRunOutcome,
SubagentRunRecord,
RegisterSubagentRunParams,
SubagentAnnounceParams,
SubagentSystemPromptParams,
} from "./types.js";
export {
initSubagentRegistry,
registerSubagentRun,
listSubagentRuns,
releaseSubagentRun,
getSubagentRun,
resetSubagentRegistryForTests,
shutdownSubagentRegistry,
} from "./registry.js";
export {
buildSubagentSystemPrompt,
readLatestAssistantReply,
formatAnnouncementMessage,
runSubagentAnnounceFlow,
} from "./announce.js";
export type { FormatAnnouncementParams } from "./announce.js";
export {
loadSubagentRuns,
saveSubagentRuns,
getSubagentStorePath,
} from "./registry-store.js";

View file

@ -0,0 +1,81 @@
import { describe, it, expect, beforeEach, afterEach } from "vitest";
import { mkdtempSync, rmSync, existsSync } from "node:fs";
import { join } from "node:path";
import { tmpdir } from "node:os";
import type { SubagentRunRecord } from "./types.js";
// We need to test the store functions with a custom directory.
// Since the store uses DATA_DIR from shared, we test the serialization logic directly.
describe("registry-store serialization", () => {
let tempDir: string;
beforeEach(() => {
tempDir = mkdtempSync(join(tmpdir(), "subagent-store-test-"));
});
afterEach(() => {
rmSync(tempDir, { recursive: true, force: true });
});
it("round-trips SubagentRunRecord through JSON", () => {
const record: SubagentRunRecord = {
runId: "run-123",
childSessionId: "child-456",
requesterSessionId: "parent-789",
task: "Analyze code quality",
label: "Code Review",
cleanup: "delete",
createdAt: Date.now(),
startedAt: Date.now(),
endedAt: Date.now() + 30000,
outcome: { status: "ok" },
archiveAtMs: Date.now() + 3600000,
cleanupHandled: true,
cleanupCompletedAt: Date.now() + 30100,
};
// Serialize and deserialize
const json = JSON.stringify({ version: 1, runs: { "run-123": record } });
const parsed = JSON.parse(json);
expect(parsed.version).toBe(1);
expect(parsed.runs["run-123"]).toEqual(record);
});
it("handles record with minimal fields", () => {
const record: SubagentRunRecord = {
runId: "run-minimal",
childSessionId: "child-1",
requesterSessionId: "parent-1",
task: "Do something",
cleanup: "keep",
createdAt: Date.now(),
};
const json = JSON.stringify({ version: 1, runs: { "run-minimal": record } });
const parsed = JSON.parse(json);
expect(parsed.runs["run-minimal"].runId).toBe("run-minimal");
expect(parsed.runs["run-minimal"].outcome).toBeUndefined();
expect(parsed.runs["run-minimal"].label).toBeUndefined();
});
it("handles error outcome serialization", () => {
const record: SubagentRunRecord = {
runId: "run-err",
childSessionId: "child-err",
requesterSessionId: "parent-1",
task: "Fail",
cleanup: "delete",
createdAt: Date.now(),
outcome: { status: "error", error: "Something went wrong" },
};
const json = JSON.stringify(record);
const parsed = JSON.parse(json) as SubagentRunRecord;
expect(parsed.outcome?.status).toBe("error");
expect(parsed.outcome?.error).toBe("Something went wrong");
});
});

View file

@ -0,0 +1,61 @@
/**
* Persistent storage for subagent run records.
*
* File: ~/.super-multica/subagents/runs.json
*/
import { existsSync, mkdirSync, readFileSync, writeFileSync } from "node:fs";
import { join } from "node:path";
import { DATA_DIR } from "../../shared/index.js";
import type { SubagentRunRecord } from "./types.js";
const SUBAGENTS_DIR = join(DATA_DIR, "subagents");
const RUNS_FILE = join(SUBAGENTS_DIR, "runs.json");
interface SubagentRunsStore {
version: 1;
runs: Record<string, SubagentRunRecord>;
}
function ensureDir(): void {
if (!existsSync(SUBAGENTS_DIR)) {
mkdirSync(SUBAGENTS_DIR, { recursive: true });
}
}
/** Get the path to the subagent store file (for testing) */
export function getSubagentStorePath(): string {
return RUNS_FILE;
}
/** Load all persisted subagent runs */
export function loadSubagentRuns(): Map<string, SubagentRunRecord> {
if (!existsSync(RUNS_FILE)) return new Map();
try {
const content = readFileSync(RUNS_FILE, "utf-8");
const store = JSON.parse(content) as SubagentRunsStore;
if (store.version !== 1) {
console.warn(`[SubagentStore] Unknown store version: ${store.version}, ignoring`);
return new Map();
}
return new Map(Object.entries(store.runs));
} catch (err) {
console.warn(`[SubagentStore] Failed to load runs:`, err);
return new Map();
}
}
/** Save all subagent runs to disk */
export function saveSubagentRuns(runs: Map<string, SubagentRunRecord>): void {
ensureDir();
const store: SubagentRunsStore = {
version: 1,
runs: Object.fromEntries(runs),
};
writeFileSync(RUNS_FILE, JSON.stringify(store, null, 2), "utf-8");
}

View file

@ -0,0 +1,161 @@
import { describe, it, expect, beforeEach } from "vitest";
import {
registerSubagentRun,
listSubagentRuns,
getSubagentRun,
releaseSubagentRun,
resetSubagentRegistryForTests,
shutdownSubagentRegistry,
} from "./registry.js";
// Note: These tests exercise the registry's in-memory state management.
// They do NOT test the full lifecycle (which requires a live Hub + AsyncAgent).
beforeEach(() => {
resetSubagentRegistryForTests();
});
describe("subagent registry", () => {
it("registers a run and retrieves it by ID", () => {
const record = registerSubagentRun({
runId: "run-1",
childSessionId: "child-1",
requesterSessionId: "parent-1",
task: "Analyze code",
label: "Code Analysis",
});
expect(record.runId).toBe("run-1");
expect(record.childSessionId).toBe("child-1");
expect(record.requesterSessionId).toBe("parent-1");
expect(record.task).toBe("Analyze code");
expect(record.label).toBe("Code Analysis");
expect(record.cleanup).toBe("delete"); // default
expect(record.createdAt).toBeGreaterThan(0);
expect(record.startedAt).toBeGreaterThan(0); // set by watchChildAgent
const retrieved = getSubagentRun("run-1");
expect(retrieved).toBe(record);
});
it("lists runs filtered by requester session", () => {
registerSubagentRun({
runId: "run-1",
childSessionId: "child-1",
requesterSessionId: "parent-A",
task: "Task 1",
});
registerSubagentRun({
runId: "run-2",
childSessionId: "child-2",
requesterSessionId: "parent-B",
task: "Task 2",
});
registerSubagentRun({
runId: "run-3",
childSessionId: "child-3",
requesterSessionId: "parent-A",
task: "Task 3",
});
const parentARuns = listSubagentRuns("parent-A");
expect(parentARuns).toHaveLength(2);
expect(parentARuns.map((r) => r.runId).sort()).toEqual(["run-1", "run-3"]);
const parentBRuns = listSubagentRuns("parent-B");
expect(parentBRuns).toHaveLength(1);
expect(parentBRuns[0]!.runId).toBe("run-2");
const emptyRuns = listSubagentRuns("parent-C");
expect(emptyRuns).toHaveLength(0);
});
it("releases a run from the registry", () => {
registerSubagentRun({
runId: "run-1",
childSessionId: "child-1",
requesterSessionId: "parent-1",
task: "Task",
});
expect(getSubagentRun("run-1")).toBeDefined();
const released = releaseSubagentRun("run-1");
expect(released).toBe(true);
expect(getSubagentRun("run-1")).toBeUndefined();
// Double release returns false
const releasedAgain = releaseSubagentRun("run-1");
expect(releasedAgain).toBe(false);
});
it("applies custom cleanup value", () => {
const record = registerSubagentRun({
runId: "run-keep",
childSessionId: "child-1",
requesterSessionId: "parent-1",
task: "Keep session",
cleanup: "keep",
});
expect(record.cleanup).toBe("keep");
});
it("registers a run and ends it with error when Hub is not available", () => {
// Without Hub initialized, watchChildAgent detects missing Hub
// and immediately ends the run with an error
registerSubagentRun({
runId: "run-no-hub",
childSessionId: "child-1",
requesterSessionId: "parent-1",
task: "Running task",
});
const record = getSubagentRun("run-no-hub");
expect(record?.startedAt).toBeGreaterThan(0);
expect(record?.endedAt).toBeGreaterThan(0);
expect(record?.outcome?.status).toBe("error");
expect(record?.outcome?.error).toContain("Hub not initialized");
});
it("shutdownSubagentRegistry marks unfinished runs as ended", () => {
// Directly set up a record without going through watchChildAgent
// to simulate a run that is still active
registerSubagentRun({
runId: "run-active",
childSessionId: "child-1",
requesterSessionId: "parent-1",
task: "Running task",
});
// The above run already ended due to no Hub; reset its endedAt
// to simulate a truly active run
const record = getSubagentRun("run-active");
if (record) {
record.endedAt = undefined;
record.outcome = undefined;
}
shutdownSubagentRegistry();
const after = getSubagentRun("run-active");
expect(after?.endedAt).toBeGreaterThan(0);
expect(after?.outcome?.status).toBe("unknown");
});
it("resetSubagentRegistryForTests clears all state", () => {
registerSubagentRun({
runId: "run-1",
childSessionId: "child-1",
requesterSessionId: "parent-1",
task: "Task",
});
expect(listSubagentRuns("parent-1")).toHaveLength(1);
resetSubagentRegistryForTests();
expect(listSubagentRuns("parent-1")).toHaveLength(0);
expect(getSubagentRun("run-1")).toBeUndefined();
});
});

View file

@ -0,0 +1,333 @@
/**
* Subagent registry in-memory tracking + lifecycle management.
*
* Tracks all active subagent runs, persists state to disk,
* watches for child completion, and triggers announce flow.
*/
import { getHub, isHubInitialized } from "../../hub/hub-singleton.js";
import { loadSubagentRuns, saveSubagentRuns } from "./registry-store.js";
import { runSubagentAnnounceFlow } from "./announce.js";
import type {
RegisterSubagentRunParams,
SubagentRunRecord,
} from "./types.js";
import { resolveSessionDir } from "../session/storage.js";
import { rmSync } from "node:fs";
/** Default archive retention: 60 minutes after completion */
const DEFAULT_ARCHIVE_AFTER_MS = 60 * 60 * 1000;
/** Archive sweep interval: 60 seconds */
const SWEEP_INTERVAL_MS = 60 * 1000;
// ============================================================================
// Module-level state
// ============================================================================
const subagentRuns = new Map<string, SubagentRunRecord>();
let sweepTimer: ReturnType<typeof setInterval> | undefined;
const resumedRuns = new Set<string>();
// ============================================================================
// Public API
// ============================================================================
/** Initialize registry from persisted state. Call once at startup. */
export function initSubagentRegistry(): void {
const persisted = loadSubagentRuns();
for (const [runId, record] of persisted) {
subagentRuns.set(runId, record);
// Resume incomplete runs
if (!record.cleanupHandled) {
if (record.endedAt) {
// Completed but cleanup not done — run announce flow
if (!resumedRuns.has(runId)) {
resumedRuns.add(runId);
handleRunCompletion(record);
}
} else {
// If not ended, the child agent session is lost on restart —
// mark as ended with unknown outcome
record.endedAt = Date.now();
record.outcome = { status: "unknown" };
persist();
if (!resumedRuns.has(runId)) {
resumedRuns.add(runId);
handleRunCompletion(record);
}
}
}
}
if (subagentRuns.size > 0) {
startSweeper();
console.log(`[SubagentRegistry] Loaded ${subagentRuns.size} persisted run(s)`);
}
}
/** Register a new subagent run and start tracking its lifecycle. */
export function registerSubagentRun(params: RegisterSubagentRunParams): SubagentRunRecord {
const {
runId,
childSessionId,
requesterSessionId,
task,
label,
cleanup = "delete",
timeoutSeconds,
} = params;
const record: SubagentRunRecord = {
runId,
childSessionId,
requesterSessionId,
task,
label,
cleanup,
createdAt: Date.now(),
};
subagentRuns.set(runId, record);
persist();
startSweeper();
// Start watching the child agent for completion
watchChildAgent(record, timeoutSeconds);
return record;
}
/** List all active runs for a given requester session. */
export function listSubagentRuns(requesterSessionId: string): SubagentRunRecord[] {
const result: SubagentRunRecord[] = [];
for (const record of subagentRuns.values()) {
if (record.requesterSessionId === requesterSessionId) {
result.push(record);
}
}
return result;
}
/** Remove a run from the registry. */
export function releaseSubagentRun(runId: string): boolean {
const deleted = subagentRuns.delete(runId);
if (deleted) {
persist();
if (subagentRuns.size === 0) {
stopSweeper();
}
}
return deleted;
}
/** Get a run by ID. */
export function getSubagentRun(runId: string): SubagentRunRecord | undefined {
return subagentRuns.get(runId);
}
/** Mark all active (non-ended) runs as ended with "unknown" status. Called during Hub shutdown. */
export function shutdownSubagentRegistry(): void {
const now = Date.now();
let updated = 0;
for (const record of subagentRuns.values()) {
if (!record.endedAt) {
record.endedAt = now;
record.outcome = { status: "unknown" };
updated++;
}
}
if (updated > 0) {
persist();
console.log(`[SubagentRegistry] Marked ${updated} active run(s) as ended during shutdown`);
}
stopSweeper();
}
/** Reset all state (for testing). */
export function resetSubagentRegistryForTests(): void {
subagentRuns.clear();
resumedRuns.clear();
stopSweeper();
}
// ============================================================================
// Lifecycle watching
// ============================================================================
function watchChildAgent(record: SubagentRunRecord, timeoutSeconds?: number): void {
const { childSessionId } = record;
// Mark as started
record.startedAt = Date.now();
persist();
const cleanup = (outcome: { status: "ok" | "error" | "timeout" | "unknown"; error?: string | undefined }) => {
if (record.endedAt) return; // Already finalized
if (timeoutTimer) clearTimeout(timeoutTimer);
record.endedAt = Date.now();
record.outcome = outcome;
persist();
handleRunCompletion(record);
};
// Set up timeout if specified
let timeoutTimer: ReturnType<typeof setTimeout> | undefined;
if (timeoutSeconds && timeoutSeconds > 0) {
timeoutTimer = setTimeout(() => {
cleanup({ status: "timeout" });
// Try to close the child agent
try {
const hub = getHub();
hub.closeAgent(childSessionId);
} catch {
// Hub may not be available
}
}, timeoutSeconds * 1000);
}
// Get child agent reference (Hub may not be available in tests)
if (!isHubInitialized()) {
cleanup({ status: "error", error: "Hub not initialized" });
return;
}
const hub = getHub();
const childAgent = hub.getAgent(childSessionId);
if (!childAgent) {
cleanup({ status: "error", error: "Child agent not found" });
return;
}
// Wait for the child agent's task queue to drain (task completion),
// then trigger announce flow. Uses waitForIdle() instead of consuming
// the stream (which would conflict with Hub.consumeAgent).
childAgent.waitForIdle().then(
() => cleanup({ status: "ok" }),
(err) => cleanup({
status: "error",
error: err instanceof Error ? err.message : String(err),
}),
);
// Also handle explicit close (e.g., timeout kill, Hub shutdown)
childAgent.onClose(() => {
cleanup({ status: record.outcome?.status ?? "unknown" });
});
}
// ============================================================================
// Cleanup + Announce
// ============================================================================
function handleRunCompletion(record: SubagentRunRecord): void {
if (record.cleanupHandled) return;
record.cleanupHandled = true;
persist();
// Run announce flow
const announced = runSubagentAnnounceFlow({
runId: record.runId,
childSessionId: record.childSessionId,
requesterSessionId: record.requesterSessionId,
task: record.task,
label: record.label,
cleanup: record.cleanup,
outcome: record.outcome,
startedAt: record.startedAt,
endedAt: record.endedAt,
});
if (!announced) {
console.warn(`[SubagentRegistry] Announce flow failed for run ${record.runId}`);
// Allow retry on next restart if announce failed.
record.cleanupHandled = false;
persist();
return;
}
// Handle session cleanup
if (record.cleanup === "delete") {
deleteChildSession(record.childSessionId);
}
// Schedule archive
record.archiveAtMs = Date.now() + DEFAULT_ARCHIVE_AFTER_MS;
record.cleanupCompletedAt = Date.now();
persist();
}
function deleteChildSession(sessionId: string): void {
try {
const sessionDir = resolveSessionDir(sessionId);
rmSync(sessionDir, { recursive: true, force: true });
console.log(`[SubagentRegistry] Deleted child session: ${sessionId}`);
} catch (err) {
console.warn(`[SubagentRegistry] Failed to delete child session ${sessionId}:`, err);
}
// Also close the agent in Hub
try {
const hub = getHub();
hub.closeAgent(sessionId);
} catch {
// Hub may not be available
}
}
// ============================================================================
// Archive sweeper
// ============================================================================
function startSweeper(): void {
if (sweepTimer) return;
sweepTimer = setInterval(sweep, SWEEP_INTERVAL_MS);
// Don't prevent process exit
if (sweepTimer.unref) sweepTimer.unref();
}
function stopSweeper(): void {
if (sweepTimer) {
clearInterval(sweepTimer);
sweepTimer = undefined;
}
}
function sweep(): void {
const now = Date.now();
let removed = 0;
for (const [runId, record] of subagentRuns) {
if (record.archiveAtMs !== undefined && record.archiveAtMs <= now) {
subagentRuns.delete(runId);
resumedRuns.delete(runId);
removed++;
}
}
if (removed > 0) {
persist();
console.log(`[SubagentRegistry] Archived ${removed} completed run(s)`);
}
if (subagentRuns.size === 0) {
stopSweeper();
}
}
// ============================================================================
// Persistence helper
// ============================================================================
function persist(): void {
try {
saveSubagentRuns(subagentRuns);
} catch (err) {
console.error(`[SubagentRegistry] Failed to persist runs:`, err);
}
}

View file

@ -0,0 +1,74 @@
/**
* Subagent orchestration types.
*
* Models the lifecycle of spawned child agents:
* created started ended cleanup
*/
/** Final outcome of a subagent run */
export type SubagentRunOutcome = {
status: "ok" | "error" | "timeout" | "unknown";
error?: string | undefined;
};
/** Persistent record tracking a single subagent run */
export type SubagentRunRecord = {
/** Unique run identifier (UUIDv7) */
runId: string;
/** Session ID of the child agent */
childSessionId: string;
/** Session ID of the parent (requester) agent */
requesterSessionId: string;
/** The task description / prompt given to the child */
task: string;
/** Optional human-readable label */
label?: string | undefined;
/** Session cleanup strategy after completion */
cleanup: "delete" | "keep";
/** Timestamp when the run was created */
createdAt: number;
/** Timestamp when the child agent started execution */
startedAt?: number | undefined;
/** Timestamp when the child agent finished */
endedAt?: number | undefined;
/** Final status of the run */
outcome?: SubagentRunOutcome | undefined;
/** Scheduled auto-archive time (ms since epoch) */
archiveAtMs?: number | undefined;
/** Whether the cleanup/announce flow has been initiated */
cleanupHandled?: boolean | undefined;
/** Timestamp when cleanup completed */
cleanupCompletedAt?: number | undefined;
};
/** Parameters for registering a new subagent run */
export type RegisterSubagentRunParams = {
runId: string;
childSessionId: string;
requesterSessionId: string;
task: string;
label?: string | undefined;
cleanup?: "delete" | "keep" | undefined;
timeoutSeconds?: number | undefined;
};
/** Parameters for the announce flow */
export type SubagentAnnounceParams = {
runId: string;
childSessionId: string;
requesterSessionId: string;
task: string;
label?: string | undefined;
cleanup: "delete" | "keep";
outcome?: SubagentRunOutcome | undefined;
startedAt?: number | undefined;
endedAt?: number | undefined;
};
/** Parameters for building the subagent system prompt */
export type SubagentSystemPromptParams = {
requesterSessionId: string;
childSessionId: string;
label?: string | undefined;
task: string;
};

View file

@ -6,6 +6,7 @@ import { createProcessTool } from "./tools/process.js";
import { createGlobTool } from "./tools/glob.js";
import { createWebFetchTool, createWebSearchTool } from "./tools/web/index.js";
import { createMemoryTools } from "./tools/memory/index.js";
import { createSessionsSpawnTool } from "./tools/sessions-spawn.js";
import { filterTools } from "./tools/policy.js";
import { isMulticaError, isRetryableError } from "../shared/errors.js";
@ -19,6 +20,10 @@ export interface CreateToolsOptions {
profileId?: string | undefined;
/** Base directory for profiles (optional) */
profileBaseDir?: string | undefined;
/** Whether this agent is a subagent (passed to sessions_spawn tool) */
isSubagent?: boolean | undefined;
/** Session ID of the agent (passed to sessions_spawn tool) */
sessionId?: string | undefined;
}
type ToolErrorPayload = {
@ -88,7 +93,7 @@ function wrapTool<TParams, TResult>(
export function createAllTools(options: CreateToolsOptions | string): AgentTool<any>[] {
// Support legacy string argument for backwards compatibility
const opts: CreateToolsOptions = typeof options === "string" ? { cwd: options } : options;
const { cwd, profileId, profileBaseDir } = opts;
const { cwd, profileId, profileBaseDir, isSubagent, sessionId } = opts;
const baseTools = createCodingTools(cwd).filter(
(tool) => tool.name !== "bash",
@ -118,6 +123,13 @@ export function createAllTools(options: CreateToolsOptions | string): AgentTool<
tools.push(...memoryTools);
}
// Add sessions_spawn tool (will be filtered by policy for subagents)
const sessionsSpawnTool = createSessionsSpawnTool({
isSubagent: isSubagent ?? false,
sessionId,
});
tools.push(sessionsSpawnTool as AgentTool<any>);
return tools;
}
@ -138,6 +150,8 @@ export function resolveTools(options: AgentOptions): AgentTool<any>[] {
cwd,
profileId: options.profileId,
profileBaseDir: options.profileBaseDir,
isSubagent: options.isSubagent,
sessionId: options.sessionId,
});
// Apply policy filtering

View file

@ -35,6 +35,9 @@ export const TOOL_GROUPS: Record<string, string[]> = {
// Memory tools (requires profileId)
"group:memory": ["memory_get", "memory_set", "memory_delete", "memory_list"],
// Subagent tools
"group:subagent": ["sessions_spawn"],
// All core tools
"group:core": [
"read",
@ -76,16 +79,8 @@ export const TOOL_PROFILES: Record<ToolProfileId, { allow?: string[]; deny?: str
* Subagents should not have access to session management or system tools.
*/
export const DEFAULT_SUBAGENT_TOOL_DENY: string[] = [
// Future: session management tools
// "sessions_list",
// "sessions_history",
// "sessions_send",
// "sessions_spawn",
// "session_status",
// Future: system tools
// "gateway",
// "agents_list",
// Subagents cannot spawn subagents (no nested spawning)
"sessions_spawn",
];
/**

View file

@ -0,0 +1,40 @@
import { describe, it, expect } from "vitest";
import { createSessionsSpawnTool } from "./sessions-spawn.js";
describe("sessions_spawn tool", () => {
it("has correct name and description", () => {
const tool = createSessionsSpawnTool({ isSubagent: false, sessionId: "test-session" });
expect(tool.name).toBe("sessions_spawn");
expect(tool.label).toBe("Spawn Subagent");
expect(tool.description).toContain("Spawn a background subagent");
});
it("rejects spawn from subagent sessions", async () => {
const tool = createSessionsSpawnTool({ isSubagent: true, sessionId: "child-session" });
const result = await tool.execute(
"call-1",
{ task: "do something" } as any,
new AbortController().signal,
);
expect(result.details.status).toBe("error");
expect(result.details.error).toContain("not allowed from sub-agent sessions");
const firstContent = result.content[0] as { type: string; text: string };
expect(firstContent.text).toContain("not allowed");
});
it("fails gracefully when Hub is not initialized", async () => {
const tool = createSessionsSpawnTool({ isSubagent: false, sessionId: "parent-session" });
const result = await tool.execute(
"call-2",
{ task: "analyze code", label: "Code Analysis" } as any,
new AbortController().signal,
);
// Should get an error because Hub singleton is not set up in test
expect(result.details.status).toBe("error");
expect(result.details.error).toContain("Hub");
});
});

View file

@ -0,0 +1,143 @@
/**
* sessions_spawn tool allows a parent agent to spawn subagent runs.
*
* Subagents run in isolated sessions with restricted tools.
* Results are announced back to the parent when the child completes.
*/
import { v7 as uuidv7 } from "uuid";
import { Type } from "@sinclair/typebox";
import type { AgentTool } from "@mariozechner/pi-agent-core";
import { getHub } from "../../hub/hub-singleton.js";
import { buildSubagentSystemPrompt } from "../subagent/announce.js";
import { registerSubagentRun } from "../subagent/registry.js";
const SessionsSpawnSchema = Type.Object({
task: Type.String({ description: "The task for the subagent to perform.", minLength: 1 }),
label: Type.Optional(
Type.String({ description: "Human-readable label for this background task." }),
),
model: Type.Optional(
Type.String({ description: "Override the LLM model for the subagent (e.g. 'gpt-4o', 'claude-sonnet')." }),
),
cleanup: Type.Optional(
Type.Union([Type.Literal("delete"), Type.Literal("keep")], {
description: "Session cleanup after completion. 'delete' removes session files, 'keep' preserves for audit. Default: 'delete'.",
}),
),
timeoutSeconds: Type.Optional(
Type.Number({
description: "Execution timeout in seconds. The subagent will be terminated if it exceeds this.",
minimum: 1,
}),
),
});
type SessionsSpawnArgs = {
task: string;
label?: string;
model?: string;
cleanup?: "delete" | "keep";
timeoutSeconds?: number;
};
export type SessionsSpawnResult = {
status: "accepted" | "error";
childSessionId?: string;
runId?: string;
error?: string;
};
export interface CreateSessionsSpawnToolOptions {
/** Whether the current agent is itself a subagent */
isSubagent?: boolean;
/** Session ID of the current (requester) agent */
sessionId?: string;
}
export function createSessionsSpawnTool(
options: CreateSessionsSpawnToolOptions,
): AgentTool<typeof SessionsSpawnSchema, SessionsSpawnResult> {
return {
name: "sessions_spawn",
label: "Spawn Subagent",
description:
"Spawn a background subagent to handle a specific task. The subagent runs in an isolated session with its own tool set. " +
"When it completes, its findings are announced back to you automatically. " +
"Use this for parallelizable work, long-running analysis, or tasks that benefit from isolation.",
parameters: SessionsSpawnSchema,
execute: async (_toolCallId, args) => {
const { task, label, model, cleanup = "delete", timeoutSeconds } = args as SessionsSpawnArgs;
// Guard: subagents cannot spawn subagents
if (options.isSubagent) {
return {
content: [{ type: "text", text: "Error: sessions_spawn is not allowed from sub-agent sessions." }],
details: {
status: "error",
error: "sessions_spawn is not allowed from sub-agent sessions",
},
};
}
const requesterSessionId = options.sessionId ?? "unknown";
const runId = uuidv7();
const childSessionId = uuidv7();
// Build system prompt for the child
const systemPrompt = buildSubagentSystemPrompt({
requesterSessionId,
childSessionId,
label,
task,
});
// Spawn child agent via Hub
try {
const hub = getHub();
const childAgent = hub.createSubagent(childSessionId, {
systemPrompt,
model,
});
// Write the task to the child (non-blocking) before registering,
// so waitForIdle() observes the queued work.
childAgent.write(task);
// Register the run for lifecycle tracking
registerSubagentRun({
runId,
childSessionId,
requesterSessionId,
task,
label,
cleanup,
timeoutSeconds,
});
return {
content: [
{
type: "text",
text: `Subagent spawned successfully.\n\nRun ID: ${runId}\nSession: ${childSessionId}\nTask: ${label || task.slice(0, 80)}\n\nThe subagent is now working in the background. You will receive its findings when it completes.`,
},
],
details: {
status: "accepted",
childSessionId,
runId,
},
};
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
return {
content: [{ type: "text", text: `Error spawning subagent: ${message}` }],
details: {
status: "error",
error: message,
},
};
}
},
};
}

View file

@ -21,6 +21,8 @@ export type AgentOptions = {
model?: string | undefined;
/** Custom API key (overrides environment variable) */
apiKey?: string | undefined;
/** Pin a specific auth profile ID (e.g. "anthropic:backup"). Disables rotation. */
authProfileId?: string | undefined;
/** Custom base URL for the provider endpoint */
baseUrl?: string | undefined;
/** System prompt, if profileId is set will auto-construct from profile */

28
src/hub/hub-singleton.ts Normal file
View file

@ -0,0 +1,28 @@
/**
* Global Hub singleton for cross-module access.
*
* Used by subagent tools and announce flow to interact with the Hub
* without threading references through the entire call chain.
*/
import type { Hub } from "./hub.js";
let _hub: Hub | undefined;
/** Set the global Hub instance. Called once during Hub construction. */
export function setHub(hub: Hub): void {
_hub = hub;
}
/** Get the global Hub instance. Throws if not yet initialized. */
export function getHub(): Hub {
if (!_hub) {
throw new Error("[Hub] Hub singleton not initialized. Ensure Hub is constructed before accessing.");
}
return _hub;
}
/** Check if the Hub singleton has been initialized. */
export function isHubInitialized(): boolean {
return _hub !== undefined;
}

View file

@ -9,7 +9,10 @@ import {
type ResponseErrorPayload,
} from "@multica/sdk";
import { AsyncAgent } from "../agent/async-agent.js";
import type { AgentOptions } from "../agent/types.js";
import { getHubId } from "./hub-identity.js";
import { setHub } from "./hub-singleton.js";
import { initSubagentRegistry, shutdownSubagentRegistry } from "../agent/subagent/index.js";
import { loadAgentRecords, addAgentRecord, removeAgentRecord } from "./agent-store.js";
import { RpcDispatcher, RpcError } from "./rpc/dispatcher.js";
import { createGetAgentMessagesHandler } from "./rpc/handlers/get-agent-messages.js";
@ -22,6 +25,8 @@ import { createUpdateGatewayHandler } from "./rpc/handlers/update-gateway.js";
export class Hub {
private readonly agents = new Map<string, AsyncAgent>();
private readonly agentSenders = new Map<string, string>();
private readonly agentStreamIds = new Map<string, string>();
private readonly agentStreamCounters = new Map<string, number>();
private readonly rpc: RpcDispatcher;
private client: GatewayClient;
url: string;
@ -46,6 +51,12 @@ export class Hub {
this.rpc.register("deleteAgent", createDeleteAgentHandler(this));
this.rpc.register("updateGateway", createUpdateGatewayHandler(this));
// Register as global singleton for cross-module access (subagent tools, announce flow)
setHub(this);
// Restore subagent registry from persistent state
initSubagentRegistry();
this.client = this.createClient(this.url);
this.client.connect();
this.restoreAgents();
@ -144,31 +155,77 @@ export class Hub {
addAgentRecord({ id: agent.sessionId, createdAt: Date.now() });
}
// Forward streaming events to the requesting client
agent.onStream((payload) => {
const targetDeviceId = this.agentSenders.get(agent.sessionId);
if (targetDeviceId) {
this.client.send(targetDeviceId, StreamAction, payload);
}
});
// Internally consume messages produced by agent (fallback for non-stream scenarios)
// Internally consume agent output (AgentEvent stream + error Messages)
void this.consumeAgent(agent);
console.log(`Agent created: ${agent.sessionId}`);
return agent;
}
private getMessageIdFromEvent(event: unknown): string | undefined {
if (!event || typeof event !== "object") return undefined;
const maybeMsg = (event as { message?: unknown }).message;
if (!maybeMsg || typeof maybeMsg !== "object") return undefined;
const id = (maybeMsg as { id?: unknown }).id;
return typeof id === "string" && id.length > 0 ? id : undefined;
}
private beginStream(agentId: string, event: unknown): string {
const explicitId = this.getMessageIdFromEvent(event);
if (explicitId) {
this.agentStreamIds.set(agentId, explicitId);
return explicitId;
}
const next = (this.agentStreamCounters.get(agentId) ?? 0) + 1;
this.agentStreamCounters.set(agentId, next);
const fallback = `${agentId}:${next}`;
this.agentStreamIds.set(agentId, fallback);
return fallback;
}
private getActiveStreamId(agentId: string, event: unknown): string {
return this.agentStreamIds.get(agentId) ?? this.getMessageIdFromEvent(event) ?? agentId;
}
private endStream(agentId: string): void {
this.agentStreamIds.delete(agentId);
}
/** Internally read agent output and send via Gateway */
private async consumeAgent(agent: AsyncAgent): Promise<void> {
for await (const msg of agent.read()) {
console.log(`[${agent.sessionId}] ${msg.content}`);
for await (const item of agent.read()) {
const targetDeviceId = this.agentSenders.get(agent.sessionId);
if (targetDeviceId) {
if (!targetDeviceId) continue;
if ("content" in item) {
// Legacy Message (error fallback)
console.log(`[${agent.sessionId}] ${item.content}`);
this.client.send(targetDeviceId, "message", {
agentId: agent.sessionId,
content: msg.content,
content: item.content,
});
} else {
// Filter: only forward events useful for frontend rendering
const maybeMessage = (item as { message?: { role?: string } }).message;
const isAssistantMessage = maybeMessage?.role === "assistant";
const shouldForward =
((item.type === "message_start" || item.type === "message_update" || item.type === "message_end") && isAssistantMessage)
|| item.type === "tool_execution_start"
|| item.type === "tool_execution_end";
if (!shouldForward) continue;
if (item.type === "message_start") {
this.beginStream(agent.sessionId, item);
}
const streamId = this.getActiveStreamId(agent.sessionId, item);
this.client.send(targetDeviceId, StreamAction, {
streamId,
agentId: agent.sessionId,
event: item,
});
if (item.type === "message_end") {
this.endStream(agent.sessionId);
}
}
}
}
@ -195,6 +252,27 @@ export class Hub {
}
}
/** Create a subagent with specific options (isSubagent, systemPrompt, model) */
createSubagent(sessionId: string, options: Omit<AgentOptions, "sessionId"> = {}): AsyncAgent {
const existing = this.agents.get(sessionId);
if (existing && !existing.closed) {
return existing;
}
const agent = new AsyncAgent({
...options,
sessionId,
isSubagent: true,
});
this.agents.set(agent.sessionId, agent);
// Subagents are ephemeral — don't persist to agent store
void this.consumeAgent(agent);
console.log(`[Hub] Subagent created: ${agent.sessionId}`);
return agent;
}
getAgent(id: string): AsyncAgent | undefined {
return this.agents.get(id);
}
@ -211,14 +289,22 @@ export class Hub {
agent.close();
this.agents.delete(id);
this.agentSenders.delete(id);
this.agentStreamIds.delete(id);
this.agentStreamCounters.delete(id);
removeAgentRecord(id);
return true;
}
shutdown(): void {
// Finalize subagent registry before closing agents
shutdownSubagentRegistry();
for (const [id, agent] of this.agents) {
agent.close();
this.agents.delete(id);
this.agentSenders.delete(id);
this.agentStreamIds.delete(id);
this.agentStreamCounters.delete(id);
}
this.client.disconnect();
console.log("Hub shut down");