Merge pull request #99 from multica-ai/message-aggregation

feat(hub): add message aggregation module for third-party messaging integrations
This commit is contained in:
LinYushen 2026-02-06 12:51:14 +08:00 committed by GitHub
commit 3b09d8d44d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 1035 additions and 0 deletions

View file

@ -0,0 +1,254 @@
import { describe, it, expect } from "vitest";
import { BlockChunker, type BlockChunkerConfig } from "./block-chunker.js";
function cfg(overrides?: Partial<BlockChunkerConfig>): BlockChunkerConfig {
return {
minChars: 50,
maxChars: 200,
breakPreference: "paragraph",
...overrides,
};
}
describe("BlockChunker", () => {
describe("tryChunk", () => {
it("returns null when buffer is below minChars", () => {
const chunker = new BlockChunker(cfg({ minChars: 100 }));
expect(chunker.tryChunk("short text")).toBeNull();
});
it("returns null when buffer is between minChars and maxChars with no break", () => {
const chunker = new BlockChunker(cfg({ minChars: 10, maxChars: 200 }));
// No whitespace, newline, or punctuation — no break possible
const text = "a".repeat(50);
expect(chunker.tryChunk(text)).toBeNull();
});
it("splits at paragraph break (\\n\\n) for paragraph preference", () => {
const chunker = new BlockChunker(cfg({ minChars: 10, maxChars: 200 }));
const text = "Hello world, this is a test.\n\nThis is a new paragraph.";
const result = chunker.tryChunk(text);
expect(result).not.toBeNull();
expect(result!.chunk).toBe("Hello world, this is a test.\n\n");
expect(result!.remainder).toBe("This is a new paragraph.");
});
it("splits at newline when no paragraph break available", () => {
const chunker = new BlockChunker(cfg({ minChars: 10, maxChars: 200 }));
const text = "Hello world, this is a test.\nThis is the next line.";
const result = chunker.tryChunk(text);
expect(result).not.toBeNull();
expect(result!.chunk).toBe("Hello world, this is a test.\n");
expect(result!.remainder).toBe("This is the next line.");
});
it("splits at sentence end when no newline available", () => {
const chunker = new BlockChunker(cfg({ minChars: 10, maxChars: 200 }));
const text = "Hello world, this is a test. This is the next sentence.";
const result = chunker.tryChunk(text);
expect(result).not.toBeNull();
expect(result!.chunk).toBe("Hello world, this is a test. ");
expect(result!.remainder).toBe("This is the next sentence.");
});
it("splits at word boundary as last resort", () => {
const chunker = new BlockChunker(cfg({ minChars: 10, maxChars: 200 }));
// No paragraph, newline, or sentence break — only spaces
const text = "word1 word2 word3 word4 word5 word6 word7 word8 word9 word10";
const result = chunker.tryChunk(text);
expect(result).not.toBeNull();
// Should split at the last word boundary
expect(result!.chunk.endsWith(" ")).toBe(true);
expect(result!.chunk.length + result!.remainder.length).toBe(text.length);
});
it("hard-cuts at maxChars when no natural break found", () => {
const chunker = new BlockChunker(cfg({ minChars: 10, maxChars: 50 }));
const text = "a".repeat(100); // No breaks possible
const result = chunker.tryChunk(text);
expect(result).not.toBeNull();
expect(result!.chunk).toBe("a".repeat(50));
expect(result!.remainder).toBe("a".repeat(50));
});
it("prefers break closest to end of search window (larger chunks)", () => {
const chunker = new BlockChunker(cfg({ minChars: 5, maxChars: 100 }));
const text = "aaa\nbbb\nccc\nddd";
const result = chunker.tryChunk(text);
expect(result).not.toBeNull();
// Should pick the last newline break (after "ccc\n")
expect(result!.chunk).toBe("aaa\nbbb\nccc\n");
expect(result!.remainder).toBe("ddd");
});
it("respects breakPreference: newline skips paragraph-only search", () => {
const chunker = new BlockChunker(cfg({ minChars: 10, maxChars: 200, breakPreference: "newline" }));
const text = "Hello world, this is a test.\nSecond line here.";
const result = chunker.tryChunk(text);
expect(result).not.toBeNull();
expect(result!.chunk).toBe("Hello world, this is a test.\n");
});
it("respects breakPreference: sentence skips paragraph and newline", () => {
const chunker = new BlockChunker(cfg({ minChars: 10, maxChars: 200, breakPreference: "sentence" }));
const text = "First sentence.\nSecond sentence here! And a third.";
const result = chunker.tryChunk(text);
expect(result).not.toBeNull();
// Sentence preference should find sentence breaks, scanning backwards
// The last sentence break is after "third." — but that includes newlines too
// Actually, sentence breakers scan backwards, so they'll find "third." last
expect(result!.remainder.length).toBeGreaterThan(0);
});
it("handles sentence break at end of string by splitting earlier", () => {
const chunker = new BlockChunker(cfg({ minChars: 10, maxChars: 200, breakPreference: "sentence" }));
const text = "First sentence. Second sentence.";
const result = chunker.tryChunk(text);
expect(result).not.toBeNull();
// The last period is at end of text (not a useful break), so it splits at the first sentence
expect(result!.chunk).toBe("First sentence. ");
expect(result!.remainder).toBe("Second sentence.");
});
it("handles exclamation mark as sentence break", () => {
const chunker = new BlockChunker(cfg({ minChars: 10, maxChars: 200, breakPreference: "sentence" }));
const text = "Hello world! This is great! More text here";
const result = chunker.tryChunk(text);
expect(result).not.toBeNull();
expect(result!.chunk).toContain("!");
});
it("handles question mark as sentence break", () => {
const chunker = new BlockChunker(cfg({ minChars: 10, maxChars: 200, breakPreference: "sentence" }));
const text = "Is this working? I hope so? Let us see";
const result = chunker.tryChunk(text);
expect(result).not.toBeNull();
expect(result!.chunk).toContain("?");
});
it("emits multiple chunks for very long text", () => {
const chunker = new BlockChunker(cfg({ minChars: 10, maxChars: 50 }));
const text = "First paragraph here.\n\nSecond paragraph here.\n\nThird paragraph.";
const chunks: string[] = [];
let buffer = text;
let result = chunker.tryChunk(buffer);
while (result) {
chunks.push(result.chunk);
buffer = result.remainder;
result = chunker.tryChunk(buffer);
}
// Flush remainder
const flushed = chunker.flush(buffer);
if (flushed) chunks.push(flushed.chunk);
expect(chunks.length).toBeGreaterThan(1);
expect(chunks.join("")).toBe(text);
});
});
describe("flush", () => {
it("returns entire buffer as chunk", () => {
const chunker = new BlockChunker(cfg());
const result = chunker.flush("some remaining text");
expect(result).not.toBeNull();
expect(result!.chunk).toBe("some remaining text");
expect(result!.remainder).toBe("");
});
it("returns null for empty buffer", () => {
const chunker = new BlockChunker(cfg());
expect(chunker.flush("")).toBeNull();
});
});
describe("fence safety", () => {
it("does not split inside a fenced code block at natural breaks", () => {
const chunker = new BlockChunker(cfg({ minChars: 10, maxChars: 200 }));
const text = "Before code:\n\n```python\ndef foo():\n return 42\n```\n\nAfter code.";
const result = chunker.tryChunk(text);
expect(result).not.toBeNull();
// Should NOT split at the \n\n inside the code block
// The first valid paragraph break is after "Before code:\n\n"
// or after the closing fence "```\n\n"
const chunk = result!.chunk;
// The chunk should either be "Before code:\n\n" or include the entire fence
// Since we scan backwards, the last paragraph break after ```\n\n should be found first
const fenceOpens = (chunk.match(/```/g) || []).length;
// If chunk contains an opening fence, it must also contain the closing fence
if (fenceOpens > 0) {
expect(fenceOpens % 2).toBe(0);
}
});
it("closes and reopens fence on hard cut", () => {
const chunker = new BlockChunker(cfg({ minChars: 10, maxChars: 50 }));
const text = "```python\n" + "x = 1\n".repeat(20) + "```";
const result = chunker.tryChunk(text);
expect(result).not.toBeNull();
// Chunk should end with closing fence
expect(result!.chunk.trimEnd()).toMatch(/```$/);
// Remainder should start with reopened fence
expect(result!.remainder).toMatch(/^```python\n/);
});
it("preserves language tag when reopening fence", () => {
const chunker = new BlockChunker(cfg({ minChars: 10, maxChars: 60 }));
const text = "```typescript\nconst a = 1;\nconst b = 2;\nconst c = 3;\nconst d = 4;\nconst e = 5;\n```";
const result = chunker.tryChunk(text);
if (result && result.remainder.startsWith("```")) {
expect(result.remainder).toMatch(/^```typescript\n/);
}
});
it("handles ~~~ fence markers", () => {
const chunker = new BlockChunker(cfg({ minChars: 10, maxChars: 50 }));
const text = "~~~python\n" + "x = 1\n".repeat(20) + "~~~";
const result = chunker.tryChunk(text);
expect(result).not.toBeNull();
// Chunk should end with closing tilde fence
expect(result!.chunk.trimEnd()).toMatch(/~~~$/);
// Remainder should start with reopened tilde fence
expect(result!.remainder).toMatch(/^~~~python\n/);
});
it("handles text before and after a code block", () => {
const chunker = new BlockChunker(cfg({ minChars: 10, maxChars: 500 }));
const text = "Some text before.\n\n```\ncode here\n```\n\nSome text after the code block.";
const result = chunker.tryChunk(text);
expect(result).not.toBeNull();
// Should split at a paragraph break that is NOT inside a fence
const chunk = result!.chunk;
const remainder = result!.remainder;
expect(chunk + remainder).toBe(text);
});
it("does not treat fence with info string as closing fence", () => {
const chunker = new BlockChunker(cfg({ minChars: 10, maxChars: 500 }));
// The second ```python should NOT close the first fence (CommonMark: closing fence has no info string)
const text = "Before.\n\n```python\ncode line 1\n```python\ncode line 2\n```\n\nAfter text here.";
const result = chunker.tryChunk(text);
expect(result).not.toBeNull();
const chunk = result!.chunk;
// The split should not land between ```python and ```python (inside fence)
// It should either be before the fence or after the closing ```
const fenceOpens = (chunk.match(/```python/g) || []).length;
const fenceCloses = (chunk.match(/^```$/gm) || []).length;
if (fenceOpens > 0) {
// If chunk includes opening fences, it must include the real close
expect(fenceCloses).toBeGreaterThanOrEqual(1);
}
});
it("handles multiple sequential code blocks", () => {
const chunker = new BlockChunker(cfg({ minChars: 10, maxChars: 500 }));
const text = "```js\nfoo()\n```\n\n```py\nbar()\n```\n\nEnd.";
const result = chunker.tryChunk(text);
expect(result).not.toBeNull();
// Should be able to split between the two code blocks
const chunk = result!.chunk;
const remainder = result!.remainder;
expect(chunk + remainder).toBe(text);
});
});
});

240
src/hub/block-chunker.ts Normal file
View file

@ -0,0 +1,240 @@
/**
* Block chunker for splitting streaming text at natural boundaries.
*
* Used by MessageAggregator to produce appropriately-sized text blocks
* for messaging platforms that cannot consume raw streaming deltas.
*/
export interface BlockChunkerConfig {
/** Minimum text chars before attempting a break (default: 200) */
minChars: number;
/** Hard maximum chunk size — will force-break here (default: 2000) */
maxChars: number;
/** Preferred break type priority */
breakPreference: "paragraph" | "newline" | "sentence";
}
export const DEFAULT_CHUNKER_CONFIG: BlockChunkerConfig = {
minChars: 200,
maxChars: 2000,
breakPreference: "paragraph",
};
export interface ChunkResult {
/** Text to emit as a block */
chunk: string;
/** Remaining text to keep in buffer */
remainder: string;
}
interface FenceInfo {
marker: string; // "```" or "~~~"
lang: string; // language identifier, e.g. "typescript"
}
/**
* Detect if `text` has an unclosed fenced code block at the given position.
* Scans line-by-line tracking fence open/close pairs.
*/
function detectFenceAt(text: string, upTo: number): FenceInfo | null {
const region = text.slice(0, upTo);
const lines = region.split("\n");
let openFence: FenceInfo | null = null;
for (const line of lines) {
const match = line.match(/^(`{3,}|~{3,})(\S*)\s*$/);
if (!match) continue;
const marker = match[1];
const lang = match[2] ?? "";
const markerChar = marker[0];
if (openFence === null) {
// Opening a new fence
openFence = { marker, lang };
} else if (markerChar === openFence.marker[0] && marker.length >= openFence.marker.length && lang === "") {
// Closing the current fence (same char, at least as many chars, no info string per CommonMark)
openFence = null;
}
// Otherwise: different char or shorter marker, not a close — ignore
}
return openFence;
}
/**
* Check if position is inside an unclosed fenced code block.
*/
function isInsideFence(text: string, position: number): boolean {
return detectFenceAt(text, position) !== null;
}
/**
* If a chunk ends inside an open fence, close it in the chunk
* and reopen it in the remainder.
*/
function applyFenceSafety(chunk: string, remainder: string): { chunk: string; remainder: string } {
const fence = detectFenceAt(chunk, chunk.length);
if (!fence) return { chunk, remainder };
// Close the fence in the chunk
const closedChunk = chunk.endsWith("\n")
? chunk + fence.marker + "\n"
: chunk + "\n" + fence.marker + "\n";
// Reopen the fence in the remainder
const reopenTag = fence.lang ? `${fence.marker}${fence.lang}` : fence.marker;
const reopenedRemainder = reopenTag + "\n" + remainder;
return { chunk: closedChunk, remainder: reopenedRemainder };
}
export class BlockChunker {
private readonly minChars: number;
private readonly maxChars: number;
private readonly breakPreference: "paragraph" | "newline" | "sentence";
constructor(config: BlockChunkerConfig) {
this.minChars = config.minChars;
this.maxChars = config.maxChars;
this.breakPreference = config.breakPreference;
}
/**
* Attempt to extract a chunk from the buffer.
* Returns null if buffer is below minChars or no suitable break found
* (unless buffer exceeds maxChars, in which case a hard break is forced).
*/
tryChunk(buffer: string): ChunkResult | null {
if (buffer.length < this.minChars) return null;
// Search window: minChars..min(buffer.length, maxChars)
const searchEnd = Math.min(buffer.length, this.maxChars);
const breakIndex = this.findBreakIndex(buffer, this.minChars, searchEnd);
if (breakIndex !== -1) {
return {
chunk: buffer.slice(0, breakIndex),
remainder: buffer.slice(breakIndex),
};
}
// No break found within search window
if (buffer.length >= this.maxChars) {
// Hard cut at maxChars with fence safety
const raw = {
chunk: buffer.slice(0, this.maxChars),
remainder: buffer.slice(this.maxChars),
};
return applyFenceSafety(raw.chunk, raw.remainder);
}
// Buffer is between minChars and maxChars with no break — wait for more text
return null;
}
/**
* Force-flush: return entire buffer as a chunk.
*/
flush(buffer: string): ChunkResult | null {
if (buffer.length === 0) return null;
return { chunk: buffer, remainder: "" };
}
/**
* Find the best break index in the buffer within [searchStart, searchEnd).
* Scans backwards from searchEnd to prefer later breaks (larger chunks).
* Returns the index AFTER the break character(s) i.e., the start of the remainder.
* Returns -1 if no suitable break found.
*/
private findBreakIndex(buffer: string, searchStart: number, searchEnd: number): number {
const breakers = this.getBreakers();
const bufLen = buffer.length;
for (const breaker of breakers) {
const index = breaker(buffer, searchStart, searchEnd, bufLen);
if (index !== -1 && !isInsideFence(buffer, index)) {
return index;
}
}
return -1;
}
/**
* Get break functions in priority order based on breakPreference.
*/
private getBreakers(): Array<(buffer: string, start: number, end: number, bufLen: number) => number> {
switch (this.breakPreference) {
case "paragraph":
return [findParagraphBreak, findNewlineBreak, findSentenceBreak, findWordBreak];
case "newline":
return [findNewlineBreak, findSentenceBreak, findWordBreak];
case "sentence":
return [findSentenceBreak, findWordBreak];
}
}
}
/**
* Find a paragraph break (\n\n) scanning backwards from end.
* Returns the index after the break (start of next paragraph).
*/
function findParagraphBreak(buffer: string, start: number, end: number, bufLen: number): number {
for (let i = end - 1; i >= start + 1; i--) {
if (buffer[i] === "\n" && buffer[i - 1] === "\n") {
const idx = i + 1;
if (idx < bufLen) return idx;
}
}
return -1;
}
/**
* Find a newline break (\n) scanning backwards from end.
* Returns the index after the newline.
*/
function findNewlineBreak(buffer: string, start: number, end: number, bufLen: number): number {
for (let i = end - 1; i >= start; i--) {
if (buffer[i] === "\n") {
const idx = i + 1;
if (idx < bufLen) return idx;
}
}
return -1;
}
/**
* Find a sentence break (.!? followed by whitespace or end of string).
* Returns the index after the whitespace following the punctuation.
*/
function findSentenceBreak(buffer: string, start: number, end: number, bufLen: number): number {
for (let i = end - 1; i >= start; i--) {
const ch = buffer[i];
if (ch === "." || ch === "!" || ch === "?") {
const next = i + 1;
if (next < bufLen && /\s/.test(buffer[next])) {
// Break after the whitespace
const idx = next + 1;
if (idx < bufLen) return idx;
} else if (next >= bufLen) {
// Punctuation at end of text — not a useful split point, skip
continue;
}
}
}
return -1;
}
/**
* Find a word boundary (whitespace) scanning backwards from end.
* Returns the index after the whitespace character.
*/
function findWordBreak(buffer: string, start: number, end: number, bufLen: number): number {
for (let i = end - 1; i >= start; i--) {
if (/\s/.test(buffer[i])) {
const idx = i + 1;
if (idx < bufLen) return idx;
}
}
return -1;
}

View file

@ -0,0 +1,403 @@
import { describe, it, expect, vi, beforeEach } from "vitest";
import { MessageAggregator, type BlockReply, type BlockChunkerConfig } from "./message-aggregator.js";
import type { AgentEvent } from "@mariozechner/pi-agent-core";
import type { MulticaEvent } from "../agent/events.js";
// --- Test helpers ---
function makeMessageStart(id = "msg-1"): AgentEvent {
return {
type: "message_start",
message: { role: "assistant", content: [], id },
} as unknown as AgentEvent;
}
function makeMessageUpdate(fullText: string): AgentEvent {
return {
type: "message_update",
message: {
role: "assistant",
content: [{ type: "text", text: fullText }],
},
} as unknown as AgentEvent;
}
function makeMessageUpdateWithThinking(text: string, thinking: string): AgentEvent {
return {
type: "message_update",
message: {
role: "assistant",
content: [
{ type: "thinking", thinking },
{ type: "text", text },
],
},
} as unknown as AgentEvent;
}
function makeMessageEnd(fullText: string): AgentEvent {
return {
type: "message_end",
message: {
role: "assistant",
content: [{ type: "text", text: fullText }],
stopReason: "end_turn",
},
} as unknown as AgentEvent;
}
function makeToolStart(name = "exec"): AgentEvent {
return {
type: "tool_execution_start",
toolName: name,
args: {},
toolCallId: "tool-1",
} as unknown as AgentEvent;
}
function makeToolEnd(name = "exec"): AgentEvent {
return {
type: "tool_execution_end",
toolName: name,
toolCallId: "tool-1",
result: {},
isError: false,
} as unknown as AgentEvent;
}
function makeCompactionStart(): MulticaEvent {
return { type: "compaction_start" };
}
function makeCompactionEnd(): MulticaEvent {
return { type: "compaction_end", removed: 5, kept: 10, reason: "tokens" };
}
function smallConfig(overrides?: Partial<BlockChunkerConfig>): BlockChunkerConfig {
return {
minChars: 20,
maxChars: 100,
breakPreference: "paragraph",
...overrides,
};
}
describe("MessageAggregator", () => {
let blocks: BlockReply[];
let passedThrough: Array<AgentEvent | MulticaEvent>;
let onBlock: ReturnType<typeof vi.fn>;
let onPassthrough: ReturnType<typeof vi.fn>;
beforeEach(() => {
blocks = [];
passedThrough = [];
onBlock = vi.fn((block: BlockReply) => blocks.push(block));
onPassthrough = vi.fn((event: AgentEvent | MulticaEvent) => passedThrough.push(event));
});
describe("event routing", () => {
it("passes through tool_execution_start immediately", () => {
const agg = new MessageAggregator(smallConfig(), onBlock, onPassthrough);
const event = makeToolStart();
agg.handleEvent(event);
expect(onPassthrough).toHaveBeenCalledWith(event);
expect(onBlock).not.toHaveBeenCalled();
});
it("passes through tool_execution_end immediately", () => {
const agg = new MessageAggregator(smallConfig(), onBlock, onPassthrough);
const event = makeToolEnd();
agg.handleEvent(event);
expect(onPassthrough).toHaveBeenCalledWith(event);
expect(onBlock).not.toHaveBeenCalled();
});
it("passes through tool_execution_update immediately", () => {
const agg = new MessageAggregator(smallConfig(), onBlock, onPassthrough);
const event = { type: "tool_execution_update", toolCallId: "tool-1", content: "output" } as unknown as AgentEvent;
agg.handleEvent(event);
expect(onPassthrough).toHaveBeenCalledWith(event);
expect(onBlock).not.toHaveBeenCalled();
});
it("passes through compaction_start immediately", () => {
const agg = new MessageAggregator(smallConfig(), onBlock, onPassthrough);
const event = makeCompactionStart();
agg.handleEvent(event);
expect(onPassthrough).toHaveBeenCalledWith(event);
});
it("passes through compaction_end immediately", () => {
const agg = new MessageAggregator(smallConfig(), onBlock, onPassthrough);
const event = makeCompactionEnd();
agg.handleEvent(event);
expect(onPassthrough).toHaveBeenCalledWith(event);
});
it("passes through message_start immediately and resets state", () => {
const agg = new MessageAggregator(smallConfig(), onBlock, onPassthrough);
const event = makeMessageStart();
agg.handleEvent(event);
expect(onPassthrough).toHaveBeenCalledWith(event);
expect(onBlock).not.toHaveBeenCalled();
});
it("passes through message_end after flushing buffer", () => {
const agg = new MessageAggregator(smallConfig(), onBlock, onPassthrough);
agg.handleEvent(makeMessageStart());
// Add some text that is below minChars
agg.handleEvent(makeMessageUpdate("Hello world"));
// End the message — should flush the buffer as a block, then pass through message_end
const endEvent = makeMessageEnd("Hello world");
agg.handleEvent(endEvent);
expect(blocks).toHaveLength(1);
expect(blocks[0].text).toBe("Hello world");
expect(blocks[0].isFinal).toBe(true);
// message_start + message_end both passed through
const passthroughTypes = passedThrough.map((e) => e.type);
expect(passthroughTypes).toContain("message_start");
expect(passthroughTypes).toContain("message_end");
});
});
describe("text delta computation", () => {
it("computes delta from accumulated text in message_update", () => {
const agg = new MessageAggregator(smallConfig({ minChars: 1000 }), onBlock, onPassthrough);
agg.handleEvent(makeMessageStart());
// First update: "Hello"
agg.handleEvent(makeMessageUpdate("Hello"));
// Second update: "Hello world" (full text, not delta)
agg.handleEvent(makeMessageUpdate("Hello world"));
// Flush to see accumulated text
agg.handleEvent(makeMessageEnd("Hello world"));
expect(blocks).toHaveLength(1);
expect(blocks[0].text).toBe("Hello world");
});
it("ignores ThinkingContent blocks, only extracts text", () => {
const agg = new MessageAggregator(smallConfig({ minChars: 1000 }), onBlock, onPassthrough);
agg.handleEvent(makeMessageStart());
agg.handleEvent(makeMessageUpdateWithThinking("visible text", "internal thinking"));
agg.handleEvent(makeMessageEnd("visible text"));
expect(blocks).toHaveLength(1);
expect(blocks[0].text).toBe("visible text");
expect(blocks[0].text).not.toContain("internal thinking");
});
it("handles empty delta (duplicate event) gracefully", () => {
const agg = new MessageAggregator(smallConfig({ minChars: 1000 }), onBlock, onPassthrough);
agg.handleEvent(makeMessageStart());
agg.handleEvent(makeMessageUpdate("Hello"));
agg.handleEvent(makeMessageUpdate("Hello")); // duplicate
agg.handleEvent(makeMessageUpdate("Hello")); // duplicate again
agg.handleEvent(makeMessageEnd("Hello"));
expect(blocks).toHaveLength(1);
expect(blocks[0].text).toBe("Hello");
});
it("handles monotonically growing text correctly", () => {
const agg = new MessageAggregator(smallConfig({ minChars: 1000 }), onBlock, onPassthrough);
agg.handleEvent(makeMessageStart());
agg.handleEvent(makeMessageUpdate("H"));
agg.handleEvent(makeMessageUpdate("He"));
agg.handleEvent(makeMessageUpdate("Hel"));
agg.handleEvent(makeMessageUpdate("Hell"));
agg.handleEvent(makeMessageUpdate("Hello"));
agg.handleEvent(makeMessageEnd("Hello"));
expect(blocks).toHaveLength(1);
expect(blocks[0].text).toBe("Hello");
});
});
describe("block emission", () => {
it("does not emit block when buffer is below minChars", () => {
const agg = new MessageAggregator(smallConfig({ minChars: 100 }), onBlock, onPassthrough);
agg.handleEvent(makeMessageStart());
agg.handleEvent(makeMessageUpdate("Short text."));
expect(onBlock).not.toHaveBeenCalled();
});
it("emits block when buffer reaches paragraph break after minChars", () => {
const agg = new MessageAggregator(smallConfig({ minChars: 20 }), onBlock, onPassthrough);
agg.handleEvent(makeMessageStart());
const text = "This is the first paragraph with enough chars.\n\nSecond paragraph starts here.";
agg.handleEvent(makeMessageUpdate(text));
expect(blocks.length).toBeGreaterThanOrEqual(1);
expect(blocks[0].text).toContain("first paragraph");
expect(blocks[0].isFinal).toBe(false);
});
it("emits multiple blocks for very long text", () => {
const agg = new MessageAggregator(smallConfig({ minChars: 20, maxChars: 50 }), onBlock, onPassthrough);
agg.handleEvent(makeMessageStart());
const text = "First paragraph content.\n\nSecond paragraph content.\n\nThird paragraph content.";
agg.handleEvent(makeMessageUpdate(text));
expect(blocks.length).toBeGreaterThanOrEqual(2);
// All blocks except the last should have isFinal=false
for (let i = 0; i < blocks.length; i++) {
expect(blocks[i].isFinal).toBe(false);
}
});
it("flushes remaining buffer on message_end with isFinal=true", () => {
const agg = new MessageAggregator(smallConfig({ minChars: 20, maxChars: 100 }), onBlock, onPassthrough);
agg.handleEvent(makeMessageStart());
const text = "First paragraph here.\n\nSmall tail.";
agg.handleEvent(makeMessageUpdate(text));
agg.handleEvent(makeMessageEnd(text));
const finalBlock = blocks[blocks.length - 1];
expect(finalBlock.isFinal).toBe(true);
});
it("increments block index for each emitted block", () => {
const agg = new MessageAggregator(smallConfig({ minChars: 20, maxChars: 50 }), onBlock, onPassthrough);
agg.handleEvent(makeMessageStart());
const text = "First paragraph content.\n\nSecond paragraph content.\n\nThird paragraph.";
agg.handleEvent(makeMessageUpdate(text));
agg.handleEvent(makeMessageEnd(text));
for (let i = 0; i < blocks.length; i++) {
expect(blocks[i].index).toBe(i);
}
});
it("resets state on message_start", () => {
const agg = new MessageAggregator(smallConfig({ minChars: 1000 }), onBlock, onPassthrough);
// First message cycle
agg.handleEvent(makeMessageStart("msg-1"));
agg.handleEvent(makeMessageUpdate("First message text."));
agg.handleEvent(makeMessageEnd("First message text."));
expect(blocks).toHaveLength(1);
expect(blocks[0].index).toBe(0);
// Second message cycle — index should reset
agg.handleEvent(makeMessageStart("msg-2"));
agg.handleEvent(makeMessageUpdate("Second message text."));
agg.handleEvent(makeMessageEnd("Second message text."));
expect(blocks).toHaveLength(2);
expect(blocks[1].index).toBe(0); // Reset after new message_start
});
it("does not emit empty block on message_end with no content", () => {
const agg = new MessageAggregator(smallConfig(), onBlock, onPassthrough);
agg.handleEvent(makeMessageStart());
agg.handleEvent(makeMessageEnd(""));
expect(onBlock).not.toHaveBeenCalled();
});
});
describe("interleaved events", () => {
it("handles tool events interleaved with text updates", () => {
const agg = new MessageAggregator(smallConfig({ minChars: 1000 }), onBlock, onPassthrough);
agg.handleEvent(makeMessageStart());
agg.handleEvent(makeMessageUpdate("Before tool call."));
agg.handleEvent(makeToolStart());
agg.handleEvent(makeToolEnd());
// After tool execution, the model continues generating text
// Note: message_update accumulates full text, so it includes everything
agg.handleEvent(makeMessageUpdate("Before tool call. After tool result."));
agg.handleEvent(makeMessageEnd("Before tool call. After tool result."));
// Tool events should have passed through
const toolEvents = passedThrough.filter(
(e) => e.type === "tool_execution_start" || e.type === "tool_execution_end",
);
expect(toolEvents).toHaveLength(2);
// Final block should contain all text
expect(blocks).toHaveLength(1);
expect(blocks[0].text).toBe("Before tool call. After tool result.");
});
it("handles multiple message cycles (reset between)", () => {
const agg = new MessageAggregator(smallConfig({ minChars: 1000 }), onBlock, onPassthrough);
// Cycle 1
agg.handleEvent(makeMessageStart("msg-1"));
agg.handleEvent(makeMessageUpdate("First response."));
agg.handleEvent(makeMessageEnd("First response."));
// Cycle 2
agg.handleEvent(makeMessageStart("msg-2"));
agg.handleEvent(makeMessageUpdate("Second response."));
agg.handleEvent(makeMessageEnd("Second response."));
expect(blocks).toHaveLength(2);
expect(blocks[0].text).toBe("First response.");
expect(blocks[1].text).toBe("Second response.");
// Both should be final (flushed on message_end)
expect(blocks[0].isFinal).toBe(true);
expect(blocks[1].isFinal).toBe(true);
});
it("handles compaction events between messages", () => {
const agg = new MessageAggregator(smallConfig({ minChars: 1000 }), onBlock, onPassthrough);
agg.handleEvent(makeMessageStart("msg-1"));
agg.handleEvent(makeMessageUpdate("Text before compaction."));
agg.handleEvent(makeMessageEnd("Text before compaction."));
// Compaction happens
agg.handleEvent(makeCompactionStart());
agg.handleEvent(makeCompactionEnd());
// New message after compaction
agg.handleEvent(makeMessageStart("msg-2"));
agg.handleEvent(makeMessageUpdate("Text after compaction."));
agg.handleEvent(makeMessageEnd("Text after compaction."));
expect(blocks).toHaveLength(2);
const compactionEvents = passedThrough.filter(
(e) => e.type === "compaction_start" || e.type === "compaction_end",
);
expect(compactionEvents).toHaveLength(2);
});
});
describe("reset()", () => {
it("clears all internal state", () => {
const agg = new MessageAggregator(smallConfig({ minChars: 1000 }), onBlock, onPassthrough);
agg.handleEvent(makeMessageStart());
agg.handleEvent(makeMessageUpdate("Some accumulated text."));
// Reset externally
agg.reset();
// Now end the message — buffer should be empty, no block emitted
agg.handleEvent(makeMessageEnd("Some accumulated text."));
// Only the message_end passthrough, no block (buffer was cleared)
expect(onBlock).not.toHaveBeenCalled();
});
});
});

View file

@ -0,0 +1,138 @@
/**
* Message aggregator for buffering streaming agent events
* and emitting complete "block replies" at natural text boundaries.
*
* Used for third-party messaging integrations (Discord, Telegram)
* that cannot consume raw streaming deltas.
*/
import type { AgentEvent } from "@mariozechner/pi-agent-core";
import type { MulticaEvent } from "../agent/events.js";
import { extractText } from "../agent/extract-text.js";
import { BlockChunker, DEFAULT_CHUNKER_CONFIG, type BlockChunkerConfig } from "./block-chunker.js";
/** A completed text block emitted by the aggregator */
export interface BlockReply {
/** 0-based sequence number within the current message */
index: number;
/** The text content of this block */
text: string;
/** Whether this is the final block in the current message */
isFinal: boolean;
}
/** Callback for receiving aggregated block replies */
export type BlockReplyCallback = (block: BlockReply) => void;
/** Callback for pass-through events (tool, compaction, lifecycle) */
export type PassthroughCallback = (event: AgentEvent | MulticaEvent) => void;
export { type BlockChunkerConfig, DEFAULT_CHUNKER_CONFIG };
export class MessageAggregator {
private buffer = "";
private previousText = "";
private blockIndex = 0;
private readonly chunker: BlockChunker;
constructor(
config: BlockChunkerConfig,
private readonly onBlock: BlockReplyCallback,
private readonly onPassthrough: PassthroughCallback,
) {
this.chunker = new BlockChunker(config);
}
/**
* Feed an agent event into the aggregator.
* Text content from message_update events is buffered and emitted as block replies.
* All other events are passed through immediately.
*/
handleEvent(event: AgentEvent | MulticaEvent): void {
switch (event.type) {
case "compaction_start":
case "compaction_end":
case "tool_execution_start":
case "tool_execution_end":
case "tool_execution_update":
this.onPassthrough(event);
return;
case "message_start":
this.resetState();
this.onPassthrough(event);
return;
case "message_update":
this.handleMessageUpdate(event as AgentEvent & { type: "message_update" });
return;
case "message_end":
this.handleMessageEnd(event as AgentEvent & { type: "message_end" });
this.onPassthrough(event);
return;
default:
// agent_start, agent_end, turn_start, turn_end, etc.
this.onPassthrough(event);
return;
}
}
/** Reset all buffering state (e.g. between messages or for external use) */
reset(): void {
this.resetState();
}
private handleMessageUpdate(event: AgentEvent): void {
const message = (event as { message?: unknown }).message;
const currentText = extractText(message as Parameters<typeof extractText>[0]);
this.appendDelta(currentText);
// Try to emit chunks from buffer
let result = this.chunker.tryChunk(this.buffer);
while (result !== null) {
this.emitBlock(result.chunk, false);
this.buffer = result.remainder;
result = this.chunker.tryChunk(this.buffer);
}
}
private handleMessageEnd(event: AgentEvent): void {
const message = (event as { message?: unknown }).message;
const currentText = extractText(message as Parameters<typeof extractText>[0]);
this.appendDelta(currentText);
this.flushBuffer();
}
private appendDelta(currentText: string): void {
// Compute incremental delta (monotonic accumulation)
if (currentText.length <= this.previousText.length) return;
const delta = currentText.slice(this.previousText.length);
this.previousText = currentText;
this.buffer += delta;
}
private flushBuffer(): void {
const result = this.chunker.flush(this.buffer);
if (result) {
this.emitBlock(result.chunk, true);
this.buffer = result.remainder;
}
}
private emitBlock(text: string, isFinal: boolean): void {
this.onBlock({
index: this.blockIndex++,
text,
isFinal,
});
}
private resetState(): void {
this.buffer = "";
this.previousText = "";
this.blockIndex = 0;
}
}