From 70f375555234c80eb3bfa8f2718f55e57f596177 Mon Sep 17 00:00:00 2001 From: yushen Date: Thu, 5 Feb 2026 14:54:52 +0800 Subject: [PATCH] feat(agent): emit compaction lifecycle events Introduce MulticaEvent type system parallel to pi-agent-core's AgentEvent, with compaction_start and compaction_end events. Agent.subscribeAll() merges both event streams. maybeCompact() now emits events bracketing the compaction work, gated by a new SessionManager.needsCompaction() pre-check. Co-Authored-By: Claude Opus 4.5 --- src/agent/async-agent.ts | 14 ++++++----- src/agent/events.ts | 24 +++++++++++++++++++ src/agent/index.ts | 1 + src/agent/runner.ts | 36 ++++++++++++++++++++++++++++ src/agent/session/session-manager.ts | 16 +++++++++++++ 5 files changed, 85 insertions(+), 6 deletions(-) create mode 100644 src/agent/events.ts diff --git a/src/agent/async-agent.ts b/src/agent/async-agent.ts index ebc06317..c181c5e2 100644 --- a/src/agent/async-agent.ts +++ b/src/agent/async-agent.ts @@ -3,11 +3,12 @@ import type { AgentEvent, AgentMessage } from "@mariozechner/pi-agent-core"; import { Agent } from "./runner.js"; import { Channel } from "./channel.js"; import type { AgentOptions, Message } from "./types.js"; +import type { MulticaEvent } from "./events.js"; const devNull = { write: () => true } as unknown as NodeJS.WritableStream; -/** Discriminated union of legacy Message (error fallback) and raw AgentEvent */ -export type ChannelItem = Message | AgentEvent; +/** Discriminated union of legacy Message, raw AgentEvent, and MulticaEvent */ +export type ChannelItem = Message | AgentEvent | MulticaEvent; export class AsyncAgent { private readonly agent: Agent; @@ -24,8 +25,8 @@ export class AsyncAgent { }); this.sessionId = this.agent.sessionId; - // Forward raw AgentEvent into the channel - this.agent.subscribe((event: AgentEvent) => { + // Forward raw AgentEvent and MulticaEvent into the channel + this.agent.subscribeAll((event: AgentEvent | MulticaEvent) => { this.channel.send(event); }); } @@ -64,10 +65,11 @@ export class AsyncAgent { /** * Subscribe to agent events directly (supports multiple subscribers). * Unlike read(), this allows multiple consumers to receive the same events. + * Receives both pi-agent-core AgentEvent and MulticaEvent (e.g. compaction). */ - subscribe(callback: (event: AgentEvent) => void): () => void { + subscribe(callback: (event: AgentEvent | MulticaEvent) => void): () => void { console.log(`[AsyncAgent] Adding subscriber for agent: ${this.sessionId}`); - const unsubscribe = this.agent.subscribe((event) => { + const unsubscribe = this.agent.subscribeAll((event) => { console.log(`[AsyncAgent] Event received: ${event.type}`); callback(event); }); diff --git a/src/agent/events.ts b/src/agent/events.ts new file mode 100644 index 00000000..e0429d5a --- /dev/null +++ b/src/agent/events.ts @@ -0,0 +1,24 @@ +/** + * Super Multica custom events (parallel to pi-agent-core's AgentEvent) + * + * These events extend the agent's event system with Multica-specific + * lifecycle events that pi-agent-core does not provide. + */ + +/** Emitted when context compaction begins */ +export type CompactionStartEvent = { + type: "compaction_start"; +}; + +/** Emitted when context compaction completes */ +export type CompactionEndEvent = { + type: "compaction_end"; + removed: number; + kept: number; + tokensRemoved?: number; + tokensKept?: number; + reason: "count" | "tokens" | "summary"; +}; + +/** Union of all Multica-specific events */ +export type MulticaEvent = CompactionStartEvent | CompactionEndEvent; diff --git a/src/agent/index.ts b/src/agent/index.ts index 16dac2f9..720eb5b6 100644 --- a/src/agent/index.ts +++ b/src/agent/index.ts @@ -1,5 +1,6 @@ export * from "./runner.js"; export * from "./types.js"; +export * from "./events.js"; export * from "./profile/index.js"; export * from "./context-window/index.js"; export * from "./skills/index.js"; diff --git a/src/agent/runner.ts b/src/agent/runner.ts index 00c0e3b2..261f315f 100644 --- a/src/agent/runner.ts +++ b/src/agent/runner.ts @@ -1,6 +1,7 @@ import { Agent as PiAgentCore, type AgentEvent, type AgentMessage } from "@mariozechner/pi-agent-core"; import { v7 as uuidv7 } from "uuid"; import type { AgentOptions, AgentRunResult, ReasoningMode } from "./types.js"; +import type { MulticaEvent } from "./events.js"; import { createAgentOutput } from "./cli/output.js"; import { resolveModel, resolveTools } from "./tools.js"; import { @@ -83,6 +84,9 @@ export class Agent { private readonly stderr: NodeJS.WritableStream; private initialized = false; + // MulticaEvent subscribers (parallel to PiAgentCore's subscriber list) + private multicaListeners: Array<(event: MulticaEvent) => void> = []; + // Auth profile rotation state private resolvedProvider: string; private currentApiKey: string | undefined; @@ -323,6 +327,27 @@ export class Agent { return this.agent.subscribe(fn); } + /** Subscribe to both AgentEvent and MulticaEvent streams */ + subscribeAll(fn: (event: AgentEvent | MulticaEvent) => void): () => void { + const unsubCore = this.agent.subscribe(fn); + this.multicaListeners.push(fn); + return () => { + unsubCore(); + const idx = this.multicaListeners.indexOf(fn); + if (idx >= 0) this.multicaListeners.splice(idx, 1); + }; + } + + private emitMulticaEvent(event: MulticaEvent): void { + for (const fn of this.multicaListeners) { + try { + fn(event); + } catch { + // Don't let listener errors break the agent loop + } + } + } + async run(prompt: string): Promise { if (!this.initialized) { await this.session.repairIfNeeded((msg) => console.error(msg)); @@ -449,10 +474,21 @@ export class Agent { private async maybeCompact() { const messages = this.agent.state.messages.slice(); + if (!this.session.needsCompaction(messages)) return; + + this.emitMulticaEvent({ type: "compaction_start" }); const result = await this.session.maybeCompact(messages); if (result?.kept) { this.agent.replaceMessages(result.kept); } + this.emitMulticaEvent({ + type: "compaction_end", + removed: result?.removedCount ?? 0, + kept: result?.kept.length ?? messages.length, + tokensRemoved: result?.tokensRemoved, + tokensKept: result?.tokensKept, + reason: result?.reason ?? "tokens", + }); } /** diff --git a/src/agent/session/session-manager.ts b/src/agent/session/session-manager.ts index fd27505a..bb55721a 100644 --- a/src/agent/session/session-manager.ts +++ b/src/agent/session/session-manager.ts @@ -3,6 +3,7 @@ import { getModel, type Model } from "@mariozechner/pi-ai"; import type { SessionEntry, SessionMeta } from "./types.js"; import { appendEntry, readEntries, resolveSessionPath, writeEntries } from "./storage.js"; import { compactMessages, compactMessagesAsync } from "./compaction.js"; +import { estimateTokenUsage, shouldCompact as shouldCompactTokens } from "../context-window/index.js"; import { credentialManager } from "../credentials.js"; import { repairSessionFileIfNeeded, type RepairReport } from "./session-file-repair.js"; import { sanitizeToolCallInputs, sanitizeToolUseResultPairing } from "./session-transcript-repair.js"; @@ -193,6 +194,21 @@ export class SessionManager { ); } + /** Check whether compaction would trigger for the given messages (without executing it) */ + needsCompaction(messages: AgentMessage[]): boolean { + if (this.compactionMode === "count") { + return messages.length > this.maxMessages; + } + // Token and summary modes use the same token-based threshold + const estimation = estimateTokenUsage({ + messages, + systemPrompt: this.systemPrompt, + contextWindowTokens: this.contextWindowTokens, + reserveTokens: this.reserveTokens, + }); + return shouldCompactTokens(estimation); + } + async maybeCompact(messages: AgentMessage[]) { let result;