feat(agent): emit compaction lifecycle events
Introduce MulticaEvent type system parallel to pi-agent-core's AgentEvent, with compaction_start and compaction_end events. Agent.subscribeAll() merges both event streams. maybeCompact() now emits events bracketing the compaction work, gated by a new SessionManager.needsCompaction() pre-check. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
6e8f0a3c41
commit
70f3755552
5 changed files with 85 additions and 6 deletions
|
|
@ -3,11 +3,12 @@ import type { AgentEvent, AgentMessage } from "@mariozechner/pi-agent-core";
|
|||
import { Agent } from "./runner.js";
|
||||
import { Channel } from "./channel.js";
|
||||
import type { AgentOptions, Message } from "./types.js";
|
||||
import type { MulticaEvent } from "./events.js";
|
||||
|
||||
const devNull = { write: () => true } as unknown as NodeJS.WritableStream;
|
||||
|
||||
/** Discriminated union of legacy Message (error fallback) and raw AgentEvent */
|
||||
export type ChannelItem = Message | AgentEvent;
|
||||
/** Discriminated union of legacy Message, raw AgentEvent, and MulticaEvent */
|
||||
export type ChannelItem = Message | AgentEvent | MulticaEvent;
|
||||
|
||||
export class AsyncAgent {
|
||||
private readonly agent: Agent;
|
||||
|
|
@ -24,8 +25,8 @@ export class AsyncAgent {
|
|||
});
|
||||
this.sessionId = this.agent.sessionId;
|
||||
|
||||
// Forward raw AgentEvent into the channel
|
||||
this.agent.subscribe((event: AgentEvent) => {
|
||||
// Forward raw AgentEvent and MulticaEvent into the channel
|
||||
this.agent.subscribeAll((event: AgentEvent | MulticaEvent) => {
|
||||
this.channel.send(event);
|
||||
});
|
||||
}
|
||||
|
|
@ -64,10 +65,11 @@ export class AsyncAgent {
|
|||
/**
|
||||
* Subscribe to agent events directly (supports multiple subscribers).
|
||||
* Unlike read(), this allows multiple consumers to receive the same events.
|
||||
* Receives both pi-agent-core AgentEvent and MulticaEvent (e.g. compaction).
|
||||
*/
|
||||
subscribe(callback: (event: AgentEvent) => void): () => void {
|
||||
subscribe(callback: (event: AgentEvent | MulticaEvent) => void): () => void {
|
||||
console.log(`[AsyncAgent] Adding subscriber for agent: ${this.sessionId}`);
|
||||
const unsubscribe = this.agent.subscribe((event) => {
|
||||
const unsubscribe = this.agent.subscribeAll((event) => {
|
||||
console.log(`[AsyncAgent] Event received: ${event.type}`);
|
||||
callback(event);
|
||||
});
|
||||
|
|
|
|||
24
src/agent/events.ts
Normal file
24
src/agent/events.ts
Normal file
|
|
@ -0,0 +1,24 @@
|
|||
/**
|
||||
* Super Multica custom events (parallel to pi-agent-core's AgentEvent)
|
||||
*
|
||||
* These events extend the agent's event system with Multica-specific
|
||||
* lifecycle events that pi-agent-core does not provide.
|
||||
*/
|
||||
|
||||
/** Emitted when context compaction begins */
|
||||
export type CompactionStartEvent = {
|
||||
type: "compaction_start";
|
||||
};
|
||||
|
||||
/** Emitted when context compaction completes */
|
||||
export type CompactionEndEvent = {
|
||||
type: "compaction_end";
|
||||
removed: number;
|
||||
kept: number;
|
||||
tokensRemoved?: number;
|
||||
tokensKept?: number;
|
||||
reason: "count" | "tokens" | "summary";
|
||||
};
|
||||
|
||||
/** Union of all Multica-specific events */
|
||||
export type MulticaEvent = CompactionStartEvent | CompactionEndEvent;
|
||||
|
|
@ -1,5 +1,6 @@
|
|||
export * from "./runner.js";
|
||||
export * from "./types.js";
|
||||
export * from "./events.js";
|
||||
export * from "./profile/index.js";
|
||||
export * from "./context-window/index.js";
|
||||
export * from "./skills/index.js";
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
import { Agent as PiAgentCore, type AgentEvent, type AgentMessage } from "@mariozechner/pi-agent-core";
|
||||
import { v7 as uuidv7 } from "uuid";
|
||||
import type { AgentOptions, AgentRunResult, ReasoningMode } from "./types.js";
|
||||
import type { MulticaEvent } from "./events.js";
|
||||
import { createAgentOutput } from "./cli/output.js";
|
||||
import { resolveModel, resolveTools } from "./tools.js";
|
||||
import {
|
||||
|
|
@ -83,6 +84,9 @@ export class Agent {
|
|||
private readonly stderr: NodeJS.WritableStream;
|
||||
private initialized = false;
|
||||
|
||||
// MulticaEvent subscribers (parallel to PiAgentCore's subscriber list)
|
||||
private multicaListeners: Array<(event: MulticaEvent) => void> = [];
|
||||
|
||||
// Auth profile rotation state
|
||||
private resolvedProvider: string;
|
||||
private currentApiKey: string | undefined;
|
||||
|
|
@ -323,6 +327,27 @@ export class Agent {
|
|||
return this.agent.subscribe(fn);
|
||||
}
|
||||
|
||||
/** Subscribe to both AgentEvent and MulticaEvent streams */
|
||||
subscribeAll(fn: (event: AgentEvent | MulticaEvent) => void): () => void {
|
||||
const unsubCore = this.agent.subscribe(fn);
|
||||
this.multicaListeners.push(fn);
|
||||
return () => {
|
||||
unsubCore();
|
||||
const idx = this.multicaListeners.indexOf(fn);
|
||||
if (idx >= 0) this.multicaListeners.splice(idx, 1);
|
||||
};
|
||||
}
|
||||
|
||||
private emitMulticaEvent(event: MulticaEvent): void {
|
||||
for (const fn of this.multicaListeners) {
|
||||
try {
|
||||
fn(event);
|
||||
} catch {
|
||||
// Don't let listener errors break the agent loop
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async run(prompt: string): Promise<AgentRunResult> {
|
||||
if (!this.initialized) {
|
||||
await this.session.repairIfNeeded((msg) => console.error(msg));
|
||||
|
|
@ -449,10 +474,21 @@ export class Agent {
|
|||
|
||||
private async maybeCompact() {
|
||||
const messages = this.agent.state.messages.slice();
|
||||
if (!this.session.needsCompaction(messages)) return;
|
||||
|
||||
this.emitMulticaEvent({ type: "compaction_start" });
|
||||
const result = await this.session.maybeCompact(messages);
|
||||
if (result?.kept) {
|
||||
this.agent.replaceMessages(result.kept);
|
||||
}
|
||||
this.emitMulticaEvent({
|
||||
type: "compaction_end",
|
||||
removed: result?.removedCount ?? 0,
|
||||
kept: result?.kept.length ?? messages.length,
|
||||
tokensRemoved: result?.tokensRemoved,
|
||||
tokensKept: result?.tokensKept,
|
||||
reason: result?.reason ?? "tokens",
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@ import { getModel, type Model } from "@mariozechner/pi-ai";
|
|||
import type { SessionEntry, SessionMeta } from "./types.js";
|
||||
import { appendEntry, readEntries, resolveSessionPath, writeEntries } from "./storage.js";
|
||||
import { compactMessages, compactMessagesAsync } from "./compaction.js";
|
||||
import { estimateTokenUsage, shouldCompact as shouldCompactTokens } from "../context-window/index.js";
|
||||
import { credentialManager } from "../credentials.js";
|
||||
import { repairSessionFileIfNeeded, type RepairReport } from "./session-file-repair.js";
|
||||
import { sanitizeToolCallInputs, sanitizeToolUseResultPairing } from "./session-transcript-repair.js";
|
||||
|
|
@ -193,6 +194,21 @@ export class SessionManager {
|
|||
);
|
||||
}
|
||||
|
||||
/** Check whether compaction would trigger for the given messages (without executing it) */
|
||||
needsCompaction(messages: AgentMessage[]): boolean {
|
||||
if (this.compactionMode === "count") {
|
||||
return messages.length > this.maxMessages;
|
||||
}
|
||||
// Token and summary modes use the same token-based threshold
|
||||
const estimation = estimateTokenUsage({
|
||||
messages,
|
||||
systemPrompt: this.systemPrompt,
|
||||
contextWindowTokens: this.contextWindowTokens,
|
||||
reserveTokens: this.reserveTokens,
|
||||
});
|
||||
return shouldCompactTokens(estimation);
|
||||
}
|
||||
|
||||
async maybeCompact(messages: AgentMessage[]) {
|
||||
let result;
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue