feat(shared): add error handling infrastructure (#27)

- Add MulticaError base class with error codes and retry semantics
- Add typed error classes for network, session, compaction, process, channel, and gateway errors
- Add withRetry utility with exponential backoff, jitter, and abort support
- Add CancellationToken with hierarchical parent-child cancellation support

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Jiayuan 2026-01-30 13:50:49 +08:00 committed by GitHub
parent 6f8c9ef383
commit f74ac430a9
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 880 additions and 0 deletions

293
src/shared/cancellation.ts Normal file
View file

@ -0,0 +1,293 @@
/**
* Hierarchical cancellation token for coordinated operation cancellation.
* Supports parent-child relationships where cancelling a parent cancels all children.
*/
/**
 * Error raised to signal that an operation stopped because it was cancelled.
 * Thrown by `CancellationToken.throwIfCancelled()`.
 */
export class CancellationError extends Error {
  /**
   * @param message Human-readable description; a generic notice is used when omitted
   */
  constructor(message = "Operation was cancelled") {
    super(message);
    // Replace the inherited "Error" so the type is identifiable in logs and name checks
    this.name = "CancellationError";
  }
}
/**
 * A cancellation token that can be used to cancel async operations.
 * Supports hierarchical cancellation where cancelling a parent cancels all children.
 *
 * @example
 * ```typescript
 * // Create a root token
 * const rootToken = new CancellationToken();
 *
 * // Create child tokens for sub-operations
 * const childToken = rootToken.createChild();
 *
 * // Use the signal with fetch or other abortable APIs
 * fetch(url, { signal: childToken.signal });
 *
 * // Cancel all operations
 * rootToken.cancel();
 * ```
 */
export class CancellationToken {
  private readonly controller = new AbortController();
  private readonly children: CancellationToken[] = [];
  private readonly _parent: CancellationToken | undefined;
  private readonly onCancelCallbacks: Array<() => void> = [];
  /**
   * Listener installed on the parent's signal. Kept so that it can be removed
   * again when this token is detached or cancelled on its own.
   */
  private parentAbortListener: (() => void) | undefined;

  /**
   * Create a new cancellation token
   * @param parent Optional parent token - this token will be cancelled when parent is cancelled
   */
  constructor(parent?: CancellationToken) {
    this._parent = parent;
    if (!parent) {
      return;
    }
    if (parent.isCancelled) {
      // Parent is already cancelled: start out cancelled and do not register
      // with it, since there is nothing left to propagate.
      this.controller.abort();
      return;
    }
    parent.children.push(this);
    const listener = (): void => {
      this.cancel();
    };
    this.parentAbortListener = listener;
    parent.signal.addEventListener("abort", listener, { once: true });
  }

  /**
   * Get the parent token this token was created with, if any.
   */
  get parent(): CancellationToken | undefined {
    return this._parent;
  }

  /**
   * Get the AbortSignal for use with fetch, timers, etc.
   */
  get signal(): AbortSignal {
    return this.controller.signal;
  }

  /**
   * Check if this token has been cancelled
   */
  get isCancelled(): boolean {
    return this.controller.signal.aborted;
  }

  /**
   * Cancel this token and all child tokens. Calling it again is a no-op.
   *
   * Order of effects: registered `onCancel` callbacks run first (errors they
   * throw are ignored), then the signal is aborted, then remaining children
   * are cancelled.
   */
  cancel(): void {
    if (this.isCancelled) {
      return;
    }
    // Drain the queue instead of iterating it in place: a callback that
    // unsubscribes itself (or registers a new callback) would otherwise shift
    // the array under the iterator and cause a neighbour to be skipped.
    for (
      let callback = this.onCancelCallbacks.shift();
      callback !== undefined;
      callback = this.onCancelCallbacks.shift()
    ) {
      try {
        callback();
      } catch {
        // Ignore callback errors
      }
    }
    // Abort this token; children listening on the signal are cancelled here
    this.controller.abort();
    // A cancelled token no longer needs its parent link; dropping it keeps a
    // long-lived parent from accumulating finished children.
    this.unlinkFromParent();
    // Cancel any children that were not already reached through the signal
    for (
      let child = this.children.shift();
      child !== undefined;
      child = this.children.shift()
    ) {
      child.cancel();
    }
  }

  /**
   * Create a child token that will be cancelled when this token is cancelled.
   * Child tokens can also be cancelled independently without affecting the parent.
   */
  createChild(): CancellationToken {
    return new CancellationToken(this);
  }

  /**
   * Throw CancellationError if this token has been cancelled.
   * Useful for checking cancellation at checkpoints in long-running operations.
   *
   * @throws CancellationError when the token is cancelled
   *
   * @example
   * ```typescript
   * for (const item of items) {
   *   token.throwIfCancelled();
   *   await processItem(item);
   * }
   * ```
   */
  throwIfCancelled(): void {
    if (this.isCancelled) {
      throw new CancellationError();
    }
  }

  /**
   * Register a callback to be called when this token is cancelled.
   * The callback is called synchronously during cancellation; if the token is
   * already cancelled it is called immediately. Errors thrown by the callback
   * are ignored.
   *
   * @param callback Function to call on cancellation
   * @returns Function to unregister the callback
   */
  onCancel(callback: () => void): () => void {
    if (this.isCancelled) {
      // Already cancelled, call immediately
      try {
        callback();
      } catch {
        // Ignore callback errors
      }
      return () => {};
    }
    this.onCancelCallbacks.push(callback);
    // Return unsubscribe function
    return () => {
      const index = this.onCancelCallbacks.indexOf(callback);
      if (index !== -1) {
        this.onCancelCallbacks.splice(index, 1);
      }
    };
  }

  /**
   * Wait for this token to be cancelled.
   * Resolves immediately when the token is already cancelled.
   *
   * @example
   * ```typescript
   * // In a cleanup routine
   * await token.waitForCancellation();
   * cleanup();
   * ```
   */
  waitForCancellation(): Promise<void> {
    if (this.isCancelled) {
      return Promise.resolve();
    }
    return new Promise((resolve) => {
      this.signal.addEventListener("abort", () => resolve(), { once: true });
    });
  }

  /**
   * Run an async function with this token's signal.
   *
   * Cancellation is only checked once, before `fn` starts: a CancellationError
   * is thrown if the token is already cancelled. Cancellation that happens
   * while `fn` is running is visible to `fn` only through the signal it receives.
   *
   * @param fn Operation to run; receives this token's AbortSignal
   * @returns Whatever `fn` resolves to
   * @throws CancellationError when the token is already cancelled on entry
   *
   * @example
   * ```typescript
   * const result = await token.run(async (signal) => {
   *   const response = await fetch(url, { signal });
   *   return response.json();
   * });
   * ```
   */
  async run<T>(fn: (signal: AbortSignal) => Promise<T>): Promise<T> {
    this.throwIfCancelled();
    return fn(this.signal);
  }

  /**
   * Detach this token from its parent.
   * After detachment, cancelling the parent will not cancel this token.
   * Has no effect on a token without a parent or one that is already detached.
   */
  detach(): void {
    this.unlinkFromParent();
  }

  /**
   * Remove every link the parent holds to this token: the abort listener on
   * the parent's signal and the entry in the parent's child list.
   */
  private unlinkFromParent(): void {
    const parent = this._parent;
    if (!parent) {
      return;
    }
    if (this.parentAbortListener) {
      parent.signal.removeEventListener("abort", this.parentAbortListener);
      this.parentAbortListener = undefined;
    }
    const index = parent.children.indexOf(this);
    if (index !== -1) {
      parent.children.splice(index, 1);
    }
  }
}
/**
 * Build a cancellation token that cancels itself once `ms` milliseconds have elapsed.
 *
 * When the token is cancelled some other way first (directly, or through
 * `parent`), the pending timer is cleared. If `parent` is already cancelled,
 * the token starts out cancelled and the timer is cleared right away.
 *
 * @param ms Delay in milliseconds before the token cancels itself
 * @param parent Optional parent token whose cancellation also cancels the result
 * @returns The new token
 *
 * @example
 * ```typescript
 * const token = withTimeout(5000); // Cancel after 5 seconds
 * await fetch(url, { signal: token.signal });
 * ```
 */
export function withTimeout(ms: number, parent?: CancellationToken): CancellationToken {
  const token = new CancellationToken(parent);
  const timer = setTimeout(() => token.cancel(), ms);
  // However the token ends up cancelled, the timer has nothing left to do
  token.onCancel(() => clearTimeout(timer));
  return token;
}
/**
 * Wrap an existing AbortSignal in a new, parentless CancellationToken.
 *
 * The token is cancelled immediately when the signal is already aborted;
 * otherwise it is cancelled the first time the signal fires "abort".
 * Cancelling the returned token does not affect the source signal.
 *
 * @param signal Signal to mirror
 * @returns A token that follows `signal`
 */
export function fromAbortSignal(signal: AbortSignal): CancellationToken {
  const token = new CancellationToken();
  const propagate = (): void => {
    token.cancel();
  };
  if (signal.aborted) {
    propagate();
    return token;
  }
  signal.addEventListener("abort", propagate, { once: true });
  return token;
}
/**
 * Combine multiple cancellation tokens into one.
 * The combined token is cancelled when ANY of the source tokens is cancelled.
 * Cancelling the combined token does not cancel the sources. With no sources,
 * the result is a plain token that is only cancelled explicitly.
 *
 * @param tokens Source tokens to observe
 * @returns A new token that follows all of `tokens`
 *
 * @example
 * ```typescript
 * const userToken = new CancellationToken();
 * const timeoutToken = withTimeout(5000);
 * const combined = combineTokens(userToken, timeoutToken);
 * ```
 */
export function combineTokens(...tokens: CancellationToken[]): CancellationToken {
  const combined = new CancellationToken();
  // Look at every source before subscribing to any of them, so that an
  // already-cancelled token late in the list does not leave useless callbacks
  // registered on the tokens that precede it.
  if (tokens.some((token) => token.isCancelled)) {
    combined.cancel();
    return combined;
  }
  for (const token of tokens) {
    token.onCancel(() => {
      combined.cancel();
    });
  }
  return combined;
}

353
src/shared/errors.ts Normal file
View file

@ -0,0 +1,353 @@
/**
* Error type hierarchy for Multica agent system.
* Provides typed errors with retry semantics and serialization support.
*/
/**
 * Abstract root of the Multica error hierarchy.
 * Every concrete subclass supplies a `code` and a `retryable` flag; this class
 * adds a creation timestamp, optional structured details and JSON serialization.
 */
export abstract class MulticaError extends Error {
  /** Unique error code for programmatic handling */
  abstract readonly code: string;
  /** Whether this error type is generally retryable */
  abstract readonly retryable: boolean;
  /** Creation time of the error object, as returned by Date.now() */
  readonly timestamp = Date.now();
  /** Additional context about the error */
  readonly details: Record<string, unknown> | undefined;

  /**
   * @param message Human-readable description of the failure
   * @param details Optional structured context stored as-is on the instance
   */
  constructor(message: string, details?: Record<string, unknown>) {
    super(message);
    const concreteClass = this.constructor;
    // Name the error after the concrete subclass rather than "Error"
    this.name = concreteClass.name;
    this.details = details;
    // captureStackTrace is V8-specific; when present, start the trace at the throw site
    if (Error.captureStackTrace) {
      Error.captureStackTrace(this, concreteClass);
    }
  }

  /**
   * Serialize error for logging or transmission.
   *
   * @returns Plain object with name, code, message, retryable, timestamp, details and stack
   */
  toJSON(): Record<string, unknown> {
    const { name, code, message, retryable, timestamp, details, stack } = this;
    return { name, code, message, retryable, timestamp, details, stack };
  }
}
// =============================================================================
// Network / API Errors
// =============================================================================
/**
* General network connectivity error (DNS, TCP, TLS failures).
*
* Declares no constructor of its own, so instances are built with the base
* MulticaError arguments `(message, details?)`. Always marked retryable.
*/
export class NetworkError extends MulticaError {
/** Error code; a member of the MulticaErrorCode union */
readonly code = "NETWORK_ERROR" as const;
/** Always true for this error type */
readonly retryable = true;
}
/**
* Streaming connection was unexpectedly disconnected.
*
* Declares no constructor of its own, so instances are built with the base
* MulticaError arguments `(message, details?)`. Always marked retryable.
*/
export class StreamDisconnectedError extends MulticaError {
/** Error code; a member of the MulticaErrorCode union */
readonly code = "STREAM_DISCONNECTED" as const;
/** Always true for this error type */
readonly retryable = true;
}
/**
* API rate limit exceeded.
*
* Always marked retryable. `retryAfter` is stored on the instance and is also
* merged into `details`, so it appears in the `toJSON()` output.
*/
export class RateLimitError extends MulticaError {
/** Error code; a member of the MulticaErrorCode union */
readonly code = "RATE_LIMIT" as const;
/** Always true for this error type */
readonly retryable = true;
/** Seconds to wait before retrying (from Retry-After header) */
readonly retryAfter: number | undefined;
/**
* @param message Human-readable description of the failure
* @param retryAfter Seconds to wait before retrying, when known
* @param details Extra context; a `retryAfter` key in it is overwritten by the `retryAfter` argument
*/
constructor(message: string, retryAfter?: number, details?: Record<string, unknown>) {
// Spread first so the explicit argument wins over a same-named key in details
super(message, { ...details, retryAfter });
this.retryAfter = retryAfter;
}
}
/**
* API returned an error response.
*
* Unlike the other error types, retryability is decided per instance by the
* caller (defaulting to not retryable). `statusCode` is stored on the instance
* and also merged into `details`, so it appears in the `toJSON()` output.
*/
export class APIError extends MulticaError {
/** Error code; a member of the MulticaErrorCode union */
readonly code = "API_ERROR" as const;
/** HTTP status code if available */
readonly statusCode: number | undefined;
/** Whether this specific API error is retryable */
readonly retryable: boolean;
/**
* @param message Human-readable description of the failure
* @param statusCode HTTP status code of the response, when available
* @param retryable Whether this particular failure may be retried (default: false)
* @param details Extra context; a `statusCode` key in it is overwritten by the `statusCode` argument
*/
constructor(
message: string,
statusCode?: number,
retryable = false,
details?: Record<string, unknown>,
) {
// Spread first so the explicit argument wins over a same-named key in details
super(message, { ...details, statusCode });
this.statusCode = statusCode;
this.retryable = retryable;
}
}
// =============================================================================
// Session / Storage Errors
// =============================================================================
/**
* Failed to read or write session data.
*
* Never marked retryable. `sessionId` is stored on the instance and also
* merged into `details`, so it appears in the `toJSON()` output.
*/
export class SessionStorageError extends MulticaError {
/** Error code; a member of the MulticaErrorCode union */
readonly code = "SESSION_STORAGE_ERROR" as const;
/** Always false for this error type */
readonly retryable = false;
/** The session ID that failed */
readonly sessionId: string | undefined;
/**
* @param message Human-readable description of the failure
* @param sessionId ID of the affected session, when known
* @param details Extra context; a `sessionId` key in it is overwritten by the `sessionId` argument
*/
constructor(message: string, sessionId?: string, details?: Record<string, unknown>) {
// Spread first so the explicit argument wins over a same-named key in details
super(message, { ...details, sessionId });
this.sessionId = sessionId;
}
}
/**
* Session data is corrupted or partially unreadable.
*
* Never marked retryable. The entry counts are stored on the instance and also
* merged into `details`, so they appear in the `toJSON()` output.
*/
export class SessionCorruptedError extends MulticaError {
/** Error code; a member of the MulticaErrorCode union */
readonly code = "SESSION_CORRUPTED" as const;
/** Always false for this error type */
readonly retryable = false;
/** Number of entries that were successfully recovered */
readonly recoveredEntries: number | undefined;
/** Number of entries that were corrupted */
readonly corruptedEntries: number | undefined;
/**
* @param message Human-readable description of the failure
* @param recoveredEntries Count of entries that could still be read, when known
* @param corruptedEntries Count of entries that could not be read, when known
* @param details Extra context; `recoveredEntries` / `corruptedEntries` keys in it are overwritten by the arguments
*/
constructor(
message: string,
recoveredEntries?: number,
corruptedEntries?: number,
details?: Record<string, unknown>,
) {
// Spread first so the explicit arguments win over same-named keys in details
super(message, { ...details, recoveredEntries, corruptedEntries });
this.recoveredEntries = recoveredEntries;
this.corruptedEntries = corruptedEntries;
}
}
// =============================================================================
// Compaction Errors
// =============================================================================
/**
* General compaction operation failure.
*
* Always marked retryable. `mode` is stored on the instance and also merged
* into `details`, so it appears in the `toJSON()` output.
*/
export class CompactionError extends MulticaError {
/** Error code; a member of the MulticaErrorCode union */
readonly code = "COMPACTION_ERROR" as const;
/** Always true for this error type */
readonly retryable = true;
/** Compaction mode that failed */
readonly mode: string | undefined;
/**
* @param message Human-readable description of the failure
* @param mode Name of the compaction mode that was running, when known
* @param details Extra context; a `mode` key in it is overwritten by the `mode` argument
*/
constructor(message: string, mode?: string, details?: Record<string, unknown>) {
// Spread first so the explicit argument wins over a same-named key in details
super(message, { ...details, mode });
this.mode = mode;
}
}
/**
* Failed to generate conversation summary for compaction.
*
* Always marked retryable. `messagesCount` is stored on the instance and also
* merged into `details`, so it appears in the `toJSON()` output.
*/
export class SummaryGenerationError extends MulticaError {
/** Error code; a member of the MulticaErrorCode union */
readonly code = "SUMMARY_GENERATION_ERROR" as const;
/** Always true for this error type */
readonly retryable = true;
/** Number of messages that were being summarized */
readonly messagesCount: number | undefined;
/**
* @param message Human-readable description of the failure
* @param messagesCount Number of messages in the summarization request, when known
* @param details Extra context; a `messagesCount` key in it is overwritten by the `messagesCount` argument
*/
constructor(message: string, messagesCount?: number, details?: Record<string, unknown>) {
// Spread first so the explicit argument wins over a same-named key in details
super(message, { ...details, messagesCount });
this.messagesCount = messagesCount;
}
}
// =============================================================================
// Process Execution Errors
// =============================================================================
/**
* Process execution exceeded timeout.
*
* Never marked retryable. `timeoutMs` and `pid` are stored on the instance and
* also merged into `details`, so they appear in the `toJSON()` output.
*/
export class ProcessTimeoutError extends MulticaError {
/** Error code; a member of the MulticaErrorCode union */
readonly code = "PROCESS_TIMEOUT" as const;
/** Always false for this error type */
readonly retryable = false;
/** Timeout in milliseconds */
readonly timeoutMs: number | undefined;
/** Process ID if available */
readonly pid: number | undefined;
/**
* @param message Human-readable description of the failure
* @param timeoutMs Timeout that was exceeded, in milliseconds, when known
* @param pid ID of the process, when available
* @param details Extra context; `timeoutMs` / `pid` keys in it are overwritten by the arguments
*/
constructor(
message: string,
timeoutMs?: number,
pid?: number,
details?: Record<string, unknown>,
) {
// Spread first so the explicit arguments win over same-named keys in details
super(message, { ...details, timeoutMs, pid });
this.timeoutMs = timeoutMs;
this.pid = pid;
}
}
/**
* Process was killed by signal.
*
* Never marked retryable. `signal` and `pid` are stored on the instance and
* also merged into `details`, so they appear in the `toJSON()` output.
*/
export class ProcessKilledError extends MulticaError {
/** Error code; a member of the MulticaErrorCode union */
readonly code = "PROCESS_KILLED" as const;
/** Always false for this error type */
readonly retryable = false;
/** Signal that killed the process */
readonly signal: string | undefined;
/** Process ID if available */
readonly pid: number | undefined;
/**
* @param message Human-readable description of the failure
* @param signal Signal that terminated the process, as a string, when known
* @param pid ID of the process, when available
* @param details Extra context; `signal` / `pid` keys in it are overwritten by the arguments
*/
constructor(
message: string,
signal?: string,
pid?: number,
details?: Record<string, unknown>,
) {
// Spread first so the explicit arguments win over same-named keys in details
super(message, { ...details, signal, pid });
this.signal = signal;
this.pid = pid;
}
}
// =============================================================================
// Channel Errors
// =============================================================================
/**
* Attempted to use a closed channel.
*
* Declares no constructor of its own, so instances are built with the base
* MulticaError arguments `(message, details?)`. Never marked retryable.
*/
export class ChannelClosedError extends MulticaError {
/** Error code; a member of the MulticaErrorCode union */
readonly code = "CHANNEL_CLOSED" as const;
/** Always false for this error type */
readonly retryable = false;
}
// =============================================================================
// Hub / Gateway Errors
// =============================================================================
/**
* Failed to connect to gateway.
*
* Always marked retryable. `url` is stored on the instance and also merged
* into `details`, so it appears in the `toJSON()` output.
*/
export class GatewayConnectionError extends MulticaError {
/** Error code; a member of the MulticaErrorCode union */
readonly code = "GATEWAY_CONNECTION_ERROR" as const;
/** Always true for this error type */
readonly retryable = true;
/** Gateway URL that failed */
readonly url: string | undefined;
/**
* @param message Human-readable description of the failure
* @param url Gateway URL of the failed connection, when known
* @param details Extra context; a `url` key in it is overwritten by the `url` argument
*/
constructor(message: string, url?: string, details?: Record<string, unknown>) {
// Spread first so the explicit argument wins over a same-named key in details
super(message, { ...details, url });
this.url = url;
}
}
/**
* Failed to deliver message through gateway.
*
* Always marked retryable. `messageId` and `targetDeviceId` are stored on the
* instance and also merged into `details`, so they appear in the `toJSON()` output.
*/
export class MessageDeliveryError extends MulticaError {
/** Error code; a member of the MulticaErrorCode union */
readonly code = "MESSAGE_DELIVERY_ERROR" as const;
/** Always true for this error type */
readonly retryable = true;
/** ID of the message that failed */
readonly messageId: string | undefined;
/** Target device ID */
readonly targetDeviceId: string | undefined;
/**
* @param message Human-readable description of the failure
* @param messageId ID of the message that could not be delivered, when known
* @param targetDeviceId ID of the device the message was addressed to, when known
* @param details Extra context; `messageId` / `targetDeviceId` keys in it are overwritten by the arguments
*/
constructor(
message: string,
messageId?: string,
targetDeviceId?: string,
details?: Record<string, unknown>,
) {
// Spread first so the explicit arguments win over same-named keys in details
super(message, { ...details, messageId, targetDeviceId });
this.messageId = messageId;
this.targetDeviceId = targetDeviceId;
}
}
// =============================================================================
// Type Guards
// =============================================================================
/**
* Type guard: check if a value is a MulticaError (or an instance of any subclass).
*
* @param error Any caught or received value
* @returns true when `error instanceof MulticaError`, narrowing the type for the caller
*/
export function isMulticaError(error: unknown): error is MulticaError {
return error instanceof MulticaError;
}
/**
 * Lower-case fragments that mark a non-Multica error as transient when they
 * occur in its message or in a string `code` property.
 */
const TRANSIENT_ERROR_PATTERNS = [
  "network",
  "timeout",
  "timed out",
  "etimedout",
  "econnreset",
  "econnrefused",
  "socket hang up",
  "fetch failed",
] as const;

/**
 * Check if an error is retryable.
 *
 * - A MulticaError answers with its own `retryable` flag.
 * - Any other Error is matched heuristically: its message and, when present,
 *   its string `code` property (as set on Node.js system errors) are searched
 *   case-insensitively for known transient-failure fragments.
 * - Values that are not Error instances are never retryable.
 *
 * @param error Any caught value
 * @returns true when a retry has a reasonable chance of succeeding
 */
export function isRetryableError(error: unknown): boolean {
  if (error instanceof MulticaError) {
    return error.retryable;
  }
  if (!(error instanceof Error)) {
    return false;
  }
  // Node.js system errors carry the symbolic name (e.g. "ECONNRESET") in `code`,
  // which is not guaranteed to be repeated in the message text.
  const code: unknown = (error as { code?: unknown }).code;
  const searchable = `${error.message} ${typeof code === "string" ? code : ""}`.toLowerCase();
  return TRANSIENT_ERROR_PATTERNS.some((pattern) => searchable.includes(pattern));
}
/**
* All Multica error codes for type-safe handling.
* Each member matches the `code` literal declared by one of the MulticaError
* subclasses in this file.
*/
export type MulticaErrorCode =
| "NETWORK_ERROR"
| "STREAM_DISCONNECTED"
| "RATE_LIMIT"
| "API_ERROR"
| "SESSION_STORAGE_ERROR"
| "SESSION_CORRUPTED"
| "COMPACTION_ERROR"
| "SUMMARY_GENERATION_ERROR"
| "PROCESS_TIMEOUT"
| "PROCESS_KILLED"
| "CHANNEL_CLOSED"
| "GATEWAY_CONNECTION_ERROR"
| "MESSAGE_DELIVERY_ERROR";

View file

@ -1,2 +1,5 @@
export * from "./types.js";
export * from "./errors.js";
export * from "./retry.js";
export * from "./cancellation.js";
export * from "./gateway-sdk/index.js";

231
src/shared/retry.ts Normal file
View file

@ -0,0 +1,231 @@
/**
* Retry utility with exponential backoff, jitter, and abort support.
*/
import { type MulticaError, isRetryableError, RateLimitError } from "./errors.js";
/**
* Options for retry behavior.
* Every field is optional; the defaults noted below are applied by withRetryResult.
*/
export interface RetryOptions {
/** Maximum number of attempts, counting the first call (default: 3) */
maxAttempts?: number;
/** Base delay in milliseconds, used for the wait after the first failure (default: 1000) */
baseDelay?: number;
/** Cap applied to the exponential backoff delay, in milliseconds (default: 30000) */
maxDelay?: number;
/** Backoff multiplier applied per additional attempt (default: 2 for exponential) */
backoffFactor?: number;
/** Add randomness to delay to prevent thundering herd (default: true) */
jitter?: boolean;
/**
* Only retry errors with these codes (if specified).
* When the list is non-empty, an error is retried only if it has a `code`
* property contained in the list; otherwise isRetryableError decides.
*/
retryableErrors?: string[];
/** Abort signal to cancel retry loop; checked before each attempt and while waiting between attempts */
signal?: AbortSignal;
/**
* Callback invoked before each retry, ahead of the wait.
* Receives the error, the 1-based number of the attempt that just failed,
* and the delay in milliseconds that is about to be waited.
*/
onRetry?: (error: Error, attempt: number, delay: number) => void;
}
/**
* Result of a successful retry operation, as returned by withRetryResult.
*/
export interface RetryResult<T> {
/** The successful result value */
value: T;
/** Number of attempts made (1 = success on first try) */
attempts: number;
/** Total elapsed time in milliseconds from the start of the first attempt, including delays */
totalTimeMs: number;
}
/**
 * Error raised when a retry loop or a sleep is interrupted through its AbortSignal.
 */
export class AbortError extends Error {
  /**
   * @param message Human-readable description; a generic notice is used when omitted
   */
  constructor(message = "Operation aborted") {
    super(message);
    // Replace the inherited "Error" so the type is identifiable in logs and name checks
    this.name = "AbortError";
  }
}
/**
 * Run `fn`, retrying on failure, and resolve with just its value.
 * Thin convenience wrapper around withRetryResult that discards the attempt
 * count and timing information.
 *
 * @param fn Operation to execute; called again for each retry
 * @param options Retry configuration (see RetryOptions)
 * @returns The value `fn` eventually resolves to
 *
 * @example
 * ```typescript
 * const result = await withRetry(
 *   () => fetchData(),
 *   {
 *     maxAttempts: 3,
 *     baseDelay: 1000,
 *     onRetry: (err, attempt) => console.log(`Retry ${attempt}: ${err.message}`)
 *   }
 * );
 * ```
 */
export async function withRetry<T>(
  fn: () => Promise<T>,
  options: RetryOptions = {},
): Promise<T> {
  const { value } = await withRetryResult(fn, options);
  return value;
}
/**
 * Execute a function with automatic retry, returning detailed result info.
 *
 * After a failed attempt the wait is computed as: exponential backoff (capped
 * at `maxDelay`), then optional jitter, then raised to at least the
 * `retryAfter` of a RateLimitError. The rate-limit floor is applied last so
 * that jitter can never shorten the wait below what the server asked for.
 *
 * @param fn Operation to execute; called again for each retry
 * @param options Retry configuration (see RetryOptions)
 * @returns The value together with the number of attempts and elapsed time
 * @throws AbortError when `options.signal` is aborted before an attempt or during a wait
 * @throws The last error from `fn` (wrapped in an Error when a non-Error was
 *   thrown) once it is not retryable or attempts are exhausted. If
 *   `maxAttempts` is less than 1, `fn` is never called and a generic
 *   Error("Retry failed") is thrown.
 */
export async function withRetryResult<T>(
  fn: () => Promise<T>,
  options: RetryOptions = {},
): Promise<RetryResult<T>> {
  const {
    maxAttempts = 3,
    baseDelay = 1000,
    maxDelay = 30000,
    backoffFactor = 2,
    jitter = true,
    retryableErrors,
    signal,
    onRetry,
  } = options;
  const startTime = Date.now();
  let lastError: Error | undefined;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    // Check for abort before each attempt
    if (signal?.aborted) {
      throw new AbortError();
    }
    try {
      const value = await fn();
      return {
        value,
        attempts: attempt,
        totalTimeMs: Date.now() - startTime,
      };
    } catch (error: unknown) {
      // Normalize so callers and onRetry always receive an Error instance
      lastError = error instanceof Error ? error : new Error(String(error));
      // Give up on non-retryable errors and after the final attempt
      const shouldRetry = isErrorRetryable(lastError, retryableErrors);
      if (!shouldRetry || attempt === maxAttempts) {
        throw lastError;
      }
      // Exponential backoff, capped at maxDelay
      let delay = calculateDelay(attempt, baseDelay, maxDelay, backoffFactor);
      // Spread retries out before enforcing any server-mandated minimum
      if (jitter) {
        delay = addJitter(delay);
      }
      // Retry-After (seconds) is a lower bound: never retry sooner than requested
      if (lastError instanceof RateLimitError && lastError.retryAfter) {
        delay = Math.max(delay, lastError.retryAfter * 1000);
      }
      // Notify before retry
      onRetry?.(lastError, attempt, delay);
      // Wait before retrying; rejects with AbortError if the signal fires
      await sleep(delay, signal);
    }
  }
  // Only reachable when the loop body never ran (maxAttempts < 1)
  throw lastError ?? new Error("Retry failed");
}
/**
 * Decide whether a failed attempt should be retried.
 *
 * Without an allow-list (missing or empty) the decision is delegated to
 * isRetryableError. With an allow-list, only errors that carry a truthy
 * `code` property contained in the list are retried.
 *
 * @param error The error thrown by the attempt
 * @param allowedCodes Optional list of error codes that may be retried
 * @returns true when the caller should retry
 */
function isErrorRetryable(error: Error, allowedCodes?: string[]): boolean {
  // No allow-list: fall back to the default retryable check
  if (!allowedCodes || allowedCodes.length === 0) {
    return isRetryableError(error);
  }
  // Errors without a code can never match an allow-list
  const { code } = error as Error & { code?: string };
  return code ? allowedCodes.includes(code) : false;
}
/**
 * Compute the exponential backoff delay for a given attempt.
 *
 * @param attempt 1-based number of the attempt that just failed
 * @param baseDelay Delay for the first attempt, in milliseconds
 * @param maxDelay Upper bound for the returned delay, in milliseconds
 * @param backoffFactor Multiplier applied once per additional attempt
 * @returns `baseDelay * backoffFactor^(attempt - 1)`, limited to `maxDelay`
 */
function calculateDelay(
  attempt: number,
  baseDelay: number,
  maxDelay: number,
  backoffFactor: number,
): number {
  const growth = backoffFactor ** (attempt - 1);
  return Math.min(baseDelay * growth, maxDelay);
}
/**
 * Randomize a delay by up to ±50% and round down to a whole number.
 *
 * @param delay Delay in milliseconds
 * @returns An integer in the range [0.5 * delay, 1.5 * delay)
 */
function addJitter(delay: number): number {
  // Math.random() is in [0, 1), so the scale lands in [0.5, 1.5)
  const scale = Math.random() + 0.5;
  return Math.floor(scale * delay);
}
/**
 * Sleep for specified duration with abort support.
 *
 * Exactly one timer is scheduled. When it fires, the abort listener is removed
 * before resolving; when the signal fires first, the timer is cleared before
 * rejecting. Nothing is left pending once the promise has settled.
 *
 * @param ms Duration to wait, in milliseconds
 * @param signal Optional signal that interrupts the wait
 * @returns A promise that resolves after `ms` milliseconds
 * @throws AbortError (as a rejection) when `signal` is already aborted or aborts during the wait
 */
export function sleep(ms: number, signal?: AbortSignal): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    if (signal?.aborted) {
      reject(new AbortError());
      return;
    }
    const onAbort = (): void => {
      clearTimeout(timer);
      reject(new AbortError());
    };
    const timer = setTimeout(() => {
      // Detach from the signal right away so a long-lived signal does not
      // keep a reference to this finished sleep
      signal?.removeEventListener("abort", onAbort);
      resolve();
    }, ms);
    signal?.addEventListener("abort", onAbort, { once: true });
  });
}
/**
 * Build a retry function with a fixed set of default options.
 *
 * Options passed to the returned function are spread over `defaultOptions`,
 * so any key present in the per-call options replaces the preset one.
 *
 * @param defaultOptions Options applied to every call of the returned function
 * @returns A function with the same contract as withRetry
 *
 * @example
 * ```typescript
 * const retryWithDefaults = createRetry({ maxAttempts: 5, baseDelay: 2000 });
 * const result = await retryWithDefaults(() => fetchData());
 * ```
 */
export function createRetry(
  defaultOptions: RetryOptions,
): <T>(fn: () => Promise<T>, options?: RetryOptions) => Promise<T> {
  return <T>(fn: () => Promise<T>, options?: RetryOptions): Promise<T> => {
    const merged: RetryOptions = { ...defaultOptions, ...options };
    return withRetry(fn, merged);
  };
}