diff --git a/src/hub/block-chunker.test.ts b/src/hub/block-chunker.test.ts new file mode 100644 index 00000000..dfda6cc6 --- /dev/null +++ b/src/hub/block-chunker.test.ts @@ -0,0 +1,254 @@ +import { describe, it, expect } from "vitest"; +import { BlockChunker, type BlockChunkerConfig } from "./block-chunker.js"; + +function cfg(overrides?: Partial): BlockChunkerConfig { + return { + minChars: 50, + maxChars: 200, + breakPreference: "paragraph", + ...overrides, + }; +} + +describe("BlockChunker", () => { + describe("tryChunk", () => { + it("returns null when buffer is below minChars", () => { + const chunker = new BlockChunker(cfg({ minChars: 100 })); + expect(chunker.tryChunk("short text")).toBeNull(); + }); + + it("returns null when buffer is between minChars and maxChars with no break", () => { + const chunker = new BlockChunker(cfg({ minChars: 10, maxChars: 200 })); + // No whitespace, newline, or punctuation — no break possible + const text = "a".repeat(50); + expect(chunker.tryChunk(text)).toBeNull(); + }); + + it("splits at paragraph break (\\n\\n) for paragraph preference", () => { + const chunker = new BlockChunker(cfg({ minChars: 10, maxChars: 200 })); + const text = "Hello world, this is a test.\n\nThis is a new paragraph."; + const result = chunker.tryChunk(text); + expect(result).not.toBeNull(); + expect(result!.chunk).toBe("Hello world, this is a test.\n\n"); + expect(result!.remainder).toBe("This is a new paragraph."); + }); + + it("splits at newline when no paragraph break available", () => { + const chunker = new BlockChunker(cfg({ minChars: 10, maxChars: 200 })); + const text = "Hello world, this is a test.\nThis is the next line."; + const result = chunker.tryChunk(text); + expect(result).not.toBeNull(); + expect(result!.chunk).toBe("Hello world, this is a test.\n"); + expect(result!.remainder).toBe("This is the next line."); + }); + + it("splits at sentence end when no newline available", () => { + const chunker = new BlockChunker(cfg({ minChars: 10, maxChars: 200 })); + const text = "Hello world, this is a test. This is the next sentence."; + const result = chunker.tryChunk(text); + expect(result).not.toBeNull(); + expect(result!.chunk).toBe("Hello world, this is a test. "); + expect(result!.remainder).toBe("This is the next sentence."); + }); + + it("splits at word boundary as last resort", () => { + const chunker = new BlockChunker(cfg({ minChars: 10, maxChars: 200 })); + // No paragraph, newline, or sentence break — only spaces + const text = "word1 word2 word3 word4 word5 word6 word7 word8 word9 word10"; + const result = chunker.tryChunk(text); + expect(result).not.toBeNull(); + // Should split at the last word boundary + expect(result!.chunk.endsWith(" ")).toBe(true); + expect(result!.chunk.length + result!.remainder.length).toBe(text.length); + }); + + it("hard-cuts at maxChars when no natural break found", () => { + const chunker = new BlockChunker(cfg({ minChars: 10, maxChars: 50 })); + const text = "a".repeat(100); // No breaks possible + const result = chunker.tryChunk(text); + expect(result).not.toBeNull(); + expect(result!.chunk).toBe("a".repeat(50)); + expect(result!.remainder).toBe("a".repeat(50)); + }); + + it("prefers break closest to end of search window (larger chunks)", () => { + const chunker = new BlockChunker(cfg({ minChars: 5, maxChars: 100 })); + const text = "aaa\nbbb\nccc\nddd"; + const result = chunker.tryChunk(text); + expect(result).not.toBeNull(); + // Should pick the last newline break (after "ccc\n") + expect(result!.chunk).toBe("aaa\nbbb\nccc\n"); + expect(result!.remainder).toBe("ddd"); + }); + + it("respects breakPreference: newline skips paragraph-only search", () => { + const chunker = new BlockChunker(cfg({ minChars: 10, maxChars: 200, breakPreference: "newline" })); + const text = "Hello world, this is a test.\nSecond line here."; + const result = chunker.tryChunk(text); + expect(result).not.toBeNull(); + expect(result!.chunk).toBe("Hello world, this is a test.\n"); + }); + + it("respects breakPreference: sentence skips paragraph and newline", () => { + const chunker = new BlockChunker(cfg({ minChars: 10, maxChars: 200, breakPreference: "sentence" })); + const text = "First sentence.\nSecond sentence here! And a third."; + const result = chunker.tryChunk(text); + expect(result).not.toBeNull(); + // Sentence preference should find sentence breaks, scanning backwards + // The last sentence break is after "third." — but that includes newlines too + // Actually, sentence breakers scan backwards, so they'll find "third." last + expect(result!.remainder.length).toBeGreaterThan(0); + }); + + it("handles sentence break at end of string by splitting earlier", () => { + const chunker = new BlockChunker(cfg({ minChars: 10, maxChars: 200, breakPreference: "sentence" })); + const text = "First sentence. Second sentence."; + const result = chunker.tryChunk(text); + expect(result).not.toBeNull(); + // The last period is at end of text (not a useful break), so it splits at the first sentence + expect(result!.chunk).toBe("First sentence. "); + expect(result!.remainder).toBe("Second sentence."); + }); + + it("handles exclamation mark as sentence break", () => { + const chunker = new BlockChunker(cfg({ minChars: 10, maxChars: 200, breakPreference: "sentence" })); + const text = "Hello world! This is great! More text here"; + const result = chunker.tryChunk(text); + expect(result).not.toBeNull(); + expect(result!.chunk).toContain("!"); + }); + + it("handles question mark as sentence break", () => { + const chunker = new BlockChunker(cfg({ minChars: 10, maxChars: 200, breakPreference: "sentence" })); + const text = "Is this working? I hope so? Let us see"; + const result = chunker.tryChunk(text); + expect(result).not.toBeNull(); + expect(result!.chunk).toContain("?"); + }); + + it("emits multiple chunks for very long text", () => { + const chunker = new BlockChunker(cfg({ minChars: 10, maxChars: 50 })); + const text = "First paragraph here.\n\nSecond paragraph here.\n\nThird paragraph."; + const chunks: string[] = []; + let buffer = text; + + let result = chunker.tryChunk(buffer); + while (result) { + chunks.push(result.chunk); + buffer = result.remainder; + result = chunker.tryChunk(buffer); + } + // Flush remainder + const flushed = chunker.flush(buffer); + if (flushed) chunks.push(flushed.chunk); + + expect(chunks.length).toBeGreaterThan(1); + expect(chunks.join("")).toBe(text); + }); + }); + + describe("flush", () => { + it("returns entire buffer as chunk", () => { + const chunker = new BlockChunker(cfg()); + const result = chunker.flush("some remaining text"); + expect(result).not.toBeNull(); + expect(result!.chunk).toBe("some remaining text"); + expect(result!.remainder).toBe(""); + }); + + it("returns null for empty buffer", () => { + const chunker = new BlockChunker(cfg()); + expect(chunker.flush("")).toBeNull(); + }); + }); + + describe("fence safety", () => { + it("does not split inside a fenced code block at natural breaks", () => { + const chunker = new BlockChunker(cfg({ minChars: 10, maxChars: 200 })); + const text = "Before code:\n\n```python\ndef foo():\n return 42\n```\n\nAfter code."; + const result = chunker.tryChunk(text); + expect(result).not.toBeNull(); + // Should NOT split at the \n\n inside the code block + // The first valid paragraph break is after "Before code:\n\n" + // or after the closing fence "```\n\n" + const chunk = result!.chunk; + // The chunk should either be "Before code:\n\n" or include the entire fence + // Since we scan backwards, the last paragraph break after ```\n\n should be found first + const fenceOpens = (chunk.match(/```/g) || []).length; + // If chunk contains an opening fence, it must also contain the closing fence + if (fenceOpens > 0) { + expect(fenceOpens % 2).toBe(0); + } + }); + + it("closes and reopens fence on hard cut", () => { + const chunker = new BlockChunker(cfg({ minChars: 10, maxChars: 50 })); + const text = "```python\n" + "x = 1\n".repeat(20) + "```"; + const result = chunker.tryChunk(text); + expect(result).not.toBeNull(); + // Chunk should end with closing fence + expect(result!.chunk.trimEnd()).toMatch(/```$/); + // Remainder should start with reopened fence + expect(result!.remainder).toMatch(/^```python\n/); + }); + + it("preserves language tag when reopening fence", () => { + const chunker = new BlockChunker(cfg({ minChars: 10, maxChars: 60 })); + const text = "```typescript\nconst a = 1;\nconst b = 2;\nconst c = 3;\nconst d = 4;\nconst e = 5;\n```"; + const result = chunker.tryChunk(text); + if (result && result.remainder.startsWith("```")) { + expect(result.remainder).toMatch(/^```typescript\n/); + } + }); + + it("handles ~~~ fence markers", () => { + const chunker = new BlockChunker(cfg({ minChars: 10, maxChars: 50 })); + const text = "~~~python\n" + "x = 1\n".repeat(20) + "~~~"; + const result = chunker.tryChunk(text); + expect(result).not.toBeNull(); + // Chunk should end with closing tilde fence + expect(result!.chunk.trimEnd()).toMatch(/~~~$/); + // Remainder should start with reopened tilde fence + expect(result!.remainder).toMatch(/^~~~python\n/); + }); + + it("handles text before and after a code block", () => { + const chunker = new BlockChunker(cfg({ minChars: 10, maxChars: 500 })); + const text = "Some text before.\n\n```\ncode here\n```\n\nSome text after the code block."; + const result = chunker.tryChunk(text); + expect(result).not.toBeNull(); + // Should split at a paragraph break that is NOT inside a fence + const chunk = result!.chunk; + const remainder = result!.remainder; + expect(chunk + remainder).toBe(text); + }); + + it("does not treat fence with info string as closing fence", () => { + const chunker = new BlockChunker(cfg({ minChars: 10, maxChars: 500 })); + // The second ```python should NOT close the first fence (CommonMark: closing fence has no info string) + const text = "Before.\n\n```python\ncode line 1\n```python\ncode line 2\n```\n\nAfter text here."; + const result = chunker.tryChunk(text); + expect(result).not.toBeNull(); + const chunk = result!.chunk; + // The split should not land between ```python and ```python (inside fence) + // It should either be before the fence or after the closing ``` + const fenceOpens = (chunk.match(/```python/g) || []).length; + const fenceCloses = (chunk.match(/^```$/gm) || []).length; + if (fenceOpens > 0) { + // If chunk includes opening fences, it must include the real close + expect(fenceCloses).toBeGreaterThanOrEqual(1); + } + }); + + it("handles multiple sequential code blocks", () => { + const chunker = new BlockChunker(cfg({ minChars: 10, maxChars: 500 })); + const text = "```js\nfoo()\n```\n\n```py\nbar()\n```\n\nEnd."; + const result = chunker.tryChunk(text); + expect(result).not.toBeNull(); + // Should be able to split between the two code blocks + const chunk = result!.chunk; + const remainder = result!.remainder; + expect(chunk + remainder).toBe(text); + }); + }); +}); diff --git a/src/hub/block-chunker.ts b/src/hub/block-chunker.ts new file mode 100644 index 00000000..6daadad0 --- /dev/null +++ b/src/hub/block-chunker.ts @@ -0,0 +1,240 @@ +/** + * Block chunker for splitting streaming text at natural boundaries. + * + * Used by MessageAggregator to produce appropriately-sized text blocks + * for messaging platforms that cannot consume raw streaming deltas. + */ + +export interface BlockChunkerConfig { + /** Minimum text chars before attempting a break (default: 200) */ + minChars: number; + /** Hard maximum chunk size — will force-break here (default: 2000) */ + maxChars: number; + /** Preferred break type priority */ + breakPreference: "paragraph" | "newline" | "sentence"; +} + +export const DEFAULT_CHUNKER_CONFIG: BlockChunkerConfig = { + minChars: 200, + maxChars: 2000, + breakPreference: "paragraph", +}; + +export interface ChunkResult { + /** Text to emit as a block */ + chunk: string; + /** Remaining text to keep in buffer */ + remainder: string; +} + +interface FenceInfo { + marker: string; // "```" or "~~~" + lang: string; // language identifier, e.g. "typescript" +} + +/** + * Detect if `text` has an unclosed fenced code block at the given position. + * Scans line-by-line tracking fence open/close pairs. + */ +function detectFenceAt(text: string, upTo: number): FenceInfo | null { + const region = text.slice(0, upTo); + const lines = region.split("\n"); + let openFence: FenceInfo | null = null; + + for (const line of lines) { + const match = line.match(/^(`{3,}|~{3,})(\S*)\s*$/); + if (!match) continue; + const marker = match[1]; + const lang = match[2] ?? ""; + const markerChar = marker[0]; + + if (openFence === null) { + // Opening a new fence + openFence = { marker, lang }; + } else if (markerChar === openFence.marker[0] && marker.length >= openFence.marker.length && lang === "") { + // Closing the current fence (same char, at least as many chars, no info string per CommonMark) + openFence = null; + } + // Otherwise: different char or shorter marker, not a close — ignore + } + + return openFence; +} + +/** + * Check if position is inside an unclosed fenced code block. + */ +function isInsideFence(text: string, position: number): boolean { + return detectFenceAt(text, position) !== null; +} + +/** + * If a chunk ends inside an open fence, close it in the chunk + * and reopen it in the remainder. + */ +function applyFenceSafety(chunk: string, remainder: string): { chunk: string; remainder: string } { + const fence = detectFenceAt(chunk, chunk.length); + if (!fence) return { chunk, remainder }; + + // Close the fence in the chunk + const closedChunk = chunk.endsWith("\n") + ? chunk + fence.marker + "\n" + : chunk + "\n" + fence.marker + "\n"; + + // Reopen the fence in the remainder + const reopenTag = fence.lang ? `${fence.marker}${fence.lang}` : fence.marker; + const reopenedRemainder = reopenTag + "\n" + remainder; + + return { chunk: closedChunk, remainder: reopenedRemainder }; +} + +export class BlockChunker { + private readonly minChars: number; + private readonly maxChars: number; + private readonly breakPreference: "paragraph" | "newline" | "sentence"; + + constructor(config: BlockChunkerConfig) { + this.minChars = config.minChars; + this.maxChars = config.maxChars; + this.breakPreference = config.breakPreference; + } + + /** + * Attempt to extract a chunk from the buffer. + * Returns null if buffer is below minChars or no suitable break found + * (unless buffer exceeds maxChars, in which case a hard break is forced). + */ + tryChunk(buffer: string): ChunkResult | null { + if (buffer.length < this.minChars) return null; + + // Search window: minChars..min(buffer.length, maxChars) + const searchEnd = Math.min(buffer.length, this.maxChars); + const breakIndex = this.findBreakIndex(buffer, this.minChars, searchEnd); + + if (breakIndex !== -1) { + return { + chunk: buffer.slice(0, breakIndex), + remainder: buffer.slice(breakIndex), + }; + } + + // No break found within search window + if (buffer.length >= this.maxChars) { + // Hard cut at maxChars with fence safety + const raw = { + chunk: buffer.slice(0, this.maxChars), + remainder: buffer.slice(this.maxChars), + }; + return applyFenceSafety(raw.chunk, raw.remainder); + } + + // Buffer is between minChars and maxChars with no break — wait for more text + return null; + } + + /** + * Force-flush: return entire buffer as a chunk. + */ + flush(buffer: string): ChunkResult | null { + if (buffer.length === 0) return null; + return { chunk: buffer, remainder: "" }; + } + + /** + * Find the best break index in the buffer within [searchStart, searchEnd). + * Scans backwards from searchEnd to prefer later breaks (larger chunks). + * Returns the index AFTER the break character(s) — i.e., the start of the remainder. + * Returns -1 if no suitable break found. + */ + private findBreakIndex(buffer: string, searchStart: number, searchEnd: number): number { + const breakers = this.getBreakers(); + const bufLen = buffer.length; + + for (const breaker of breakers) { + const index = breaker(buffer, searchStart, searchEnd, bufLen); + if (index !== -1 && !isInsideFence(buffer, index)) { + return index; + } + } + + return -1; + } + + /** + * Get break functions in priority order based on breakPreference. + */ + private getBreakers(): Array<(buffer: string, start: number, end: number, bufLen: number) => number> { + switch (this.breakPreference) { + case "paragraph": + return [findParagraphBreak, findNewlineBreak, findSentenceBreak, findWordBreak]; + case "newline": + return [findNewlineBreak, findSentenceBreak, findWordBreak]; + case "sentence": + return [findSentenceBreak, findWordBreak]; + } + } +} + +/** + * Find a paragraph break (\n\n) scanning backwards from end. + * Returns the index after the break (start of next paragraph). + */ +function findParagraphBreak(buffer: string, start: number, end: number, bufLen: number): number { + for (let i = end - 1; i >= start + 1; i--) { + if (buffer[i] === "\n" && buffer[i - 1] === "\n") { + const idx = i + 1; + if (idx < bufLen) return idx; + } + } + return -1; +} + +/** + * Find a newline break (\n) scanning backwards from end. + * Returns the index after the newline. + */ +function findNewlineBreak(buffer: string, start: number, end: number, bufLen: number): number { + for (let i = end - 1; i >= start; i--) { + if (buffer[i] === "\n") { + const idx = i + 1; + if (idx < bufLen) return idx; + } + } + return -1; +} + +/** + * Find a sentence break (.!? followed by whitespace or end of string). + * Returns the index after the whitespace following the punctuation. + */ +function findSentenceBreak(buffer: string, start: number, end: number, bufLen: number): number { + for (let i = end - 1; i >= start; i--) { + const ch = buffer[i]; + if (ch === "." || ch === "!" || ch === "?") { + const next = i + 1; + if (next < bufLen && /\s/.test(buffer[next])) { + // Break after the whitespace + const idx = next + 1; + if (idx < bufLen) return idx; + } else if (next >= bufLen) { + // Punctuation at end of text — not a useful split point, skip + continue; + } + } + } + return -1; +} + +/** + * Find a word boundary (whitespace) scanning backwards from end. + * Returns the index after the whitespace character. + */ +function findWordBreak(buffer: string, start: number, end: number, bufLen: number): number { + for (let i = end - 1; i >= start; i--) { + if (/\s/.test(buffer[i])) { + const idx = i + 1; + if (idx < bufLen) return idx; + } + } + return -1; +} diff --git a/src/hub/message-aggregator.test.ts b/src/hub/message-aggregator.test.ts new file mode 100644 index 00000000..52516648 --- /dev/null +++ b/src/hub/message-aggregator.test.ts @@ -0,0 +1,403 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { MessageAggregator, type BlockReply, type BlockChunkerConfig } from "./message-aggregator.js"; +import type { AgentEvent } from "@mariozechner/pi-agent-core"; +import type { MulticaEvent } from "../agent/events.js"; + +// --- Test helpers --- + +function makeMessageStart(id = "msg-1"): AgentEvent { + return { + type: "message_start", + message: { role: "assistant", content: [], id }, + } as unknown as AgentEvent; +} + +function makeMessageUpdate(fullText: string): AgentEvent { + return { + type: "message_update", + message: { + role: "assistant", + content: [{ type: "text", text: fullText }], + }, + } as unknown as AgentEvent; +} + +function makeMessageUpdateWithThinking(text: string, thinking: string): AgentEvent { + return { + type: "message_update", + message: { + role: "assistant", + content: [ + { type: "thinking", thinking }, + { type: "text", text }, + ], + }, + } as unknown as AgentEvent; +} + +function makeMessageEnd(fullText: string): AgentEvent { + return { + type: "message_end", + message: { + role: "assistant", + content: [{ type: "text", text: fullText }], + stopReason: "end_turn", + }, + } as unknown as AgentEvent; +} + +function makeToolStart(name = "exec"): AgentEvent { + return { + type: "tool_execution_start", + toolName: name, + args: {}, + toolCallId: "tool-1", + } as unknown as AgentEvent; +} + +function makeToolEnd(name = "exec"): AgentEvent { + return { + type: "tool_execution_end", + toolName: name, + toolCallId: "tool-1", + result: {}, + isError: false, + } as unknown as AgentEvent; +} + +function makeCompactionStart(): MulticaEvent { + return { type: "compaction_start" }; +} + +function makeCompactionEnd(): MulticaEvent { + return { type: "compaction_end", removed: 5, kept: 10, reason: "tokens" }; +} + +function smallConfig(overrides?: Partial): BlockChunkerConfig { + return { + minChars: 20, + maxChars: 100, + breakPreference: "paragraph", + ...overrides, + }; +} + +describe("MessageAggregator", () => { + let blocks: BlockReply[]; + let passedThrough: Array; + let onBlock: ReturnType; + let onPassthrough: ReturnType; + + beforeEach(() => { + blocks = []; + passedThrough = []; + onBlock = vi.fn((block: BlockReply) => blocks.push(block)); + onPassthrough = vi.fn((event: AgentEvent | MulticaEvent) => passedThrough.push(event)); + }); + + describe("event routing", () => { + it("passes through tool_execution_start immediately", () => { + const agg = new MessageAggregator(smallConfig(), onBlock, onPassthrough); + const event = makeToolStart(); + agg.handleEvent(event); + expect(onPassthrough).toHaveBeenCalledWith(event); + expect(onBlock).not.toHaveBeenCalled(); + }); + + it("passes through tool_execution_end immediately", () => { + const agg = new MessageAggregator(smallConfig(), onBlock, onPassthrough); + const event = makeToolEnd(); + agg.handleEvent(event); + expect(onPassthrough).toHaveBeenCalledWith(event); + expect(onBlock).not.toHaveBeenCalled(); + }); + + it("passes through tool_execution_update immediately", () => { + const agg = new MessageAggregator(smallConfig(), onBlock, onPassthrough); + const event = { type: "tool_execution_update", toolCallId: "tool-1", content: "output" } as unknown as AgentEvent; + agg.handleEvent(event); + expect(onPassthrough).toHaveBeenCalledWith(event); + expect(onBlock).not.toHaveBeenCalled(); + }); + + it("passes through compaction_start immediately", () => { + const agg = new MessageAggregator(smallConfig(), onBlock, onPassthrough); + const event = makeCompactionStart(); + agg.handleEvent(event); + expect(onPassthrough).toHaveBeenCalledWith(event); + }); + + it("passes through compaction_end immediately", () => { + const agg = new MessageAggregator(smallConfig(), onBlock, onPassthrough); + const event = makeCompactionEnd(); + agg.handleEvent(event); + expect(onPassthrough).toHaveBeenCalledWith(event); + }); + + it("passes through message_start immediately and resets state", () => { + const agg = new MessageAggregator(smallConfig(), onBlock, onPassthrough); + const event = makeMessageStart(); + agg.handleEvent(event); + expect(onPassthrough).toHaveBeenCalledWith(event); + expect(onBlock).not.toHaveBeenCalled(); + }); + + it("passes through message_end after flushing buffer", () => { + const agg = new MessageAggregator(smallConfig(), onBlock, onPassthrough); + agg.handleEvent(makeMessageStart()); + + // Add some text that is below minChars + agg.handleEvent(makeMessageUpdate("Hello world")); + + // End the message — should flush the buffer as a block, then pass through message_end + const endEvent = makeMessageEnd("Hello world"); + agg.handleEvent(endEvent); + + expect(blocks).toHaveLength(1); + expect(blocks[0].text).toBe("Hello world"); + expect(blocks[0].isFinal).toBe(true); + + // message_start + message_end both passed through + const passthroughTypes = passedThrough.map((e) => e.type); + expect(passthroughTypes).toContain("message_start"); + expect(passthroughTypes).toContain("message_end"); + }); + }); + + describe("text delta computation", () => { + it("computes delta from accumulated text in message_update", () => { + const agg = new MessageAggregator(smallConfig({ minChars: 1000 }), onBlock, onPassthrough); + agg.handleEvent(makeMessageStart()); + + // First update: "Hello" + agg.handleEvent(makeMessageUpdate("Hello")); + // Second update: "Hello world" (full text, not delta) + agg.handleEvent(makeMessageUpdate("Hello world")); + + // Flush to see accumulated text + agg.handleEvent(makeMessageEnd("Hello world")); + + expect(blocks).toHaveLength(1); + expect(blocks[0].text).toBe("Hello world"); + }); + + it("ignores ThinkingContent blocks, only extracts text", () => { + const agg = new MessageAggregator(smallConfig({ minChars: 1000 }), onBlock, onPassthrough); + agg.handleEvent(makeMessageStart()); + + agg.handleEvent(makeMessageUpdateWithThinking("visible text", "internal thinking")); + + agg.handleEvent(makeMessageEnd("visible text")); + + expect(blocks).toHaveLength(1); + expect(blocks[0].text).toBe("visible text"); + expect(blocks[0].text).not.toContain("internal thinking"); + }); + + it("handles empty delta (duplicate event) gracefully", () => { + const agg = new MessageAggregator(smallConfig({ minChars: 1000 }), onBlock, onPassthrough); + agg.handleEvent(makeMessageStart()); + + agg.handleEvent(makeMessageUpdate("Hello")); + agg.handleEvent(makeMessageUpdate("Hello")); // duplicate + agg.handleEvent(makeMessageUpdate("Hello")); // duplicate again + + agg.handleEvent(makeMessageEnd("Hello")); + + expect(blocks).toHaveLength(1); + expect(blocks[0].text).toBe("Hello"); + }); + + it("handles monotonically growing text correctly", () => { + const agg = new MessageAggregator(smallConfig({ minChars: 1000 }), onBlock, onPassthrough); + agg.handleEvent(makeMessageStart()); + + agg.handleEvent(makeMessageUpdate("H")); + agg.handleEvent(makeMessageUpdate("He")); + agg.handleEvent(makeMessageUpdate("Hel")); + agg.handleEvent(makeMessageUpdate("Hell")); + agg.handleEvent(makeMessageUpdate("Hello")); + + agg.handleEvent(makeMessageEnd("Hello")); + + expect(blocks).toHaveLength(1); + expect(blocks[0].text).toBe("Hello"); + }); + }); + + describe("block emission", () => { + it("does not emit block when buffer is below minChars", () => { + const agg = new MessageAggregator(smallConfig({ minChars: 100 }), onBlock, onPassthrough); + agg.handleEvent(makeMessageStart()); + agg.handleEvent(makeMessageUpdate("Short text.")); + expect(onBlock).not.toHaveBeenCalled(); + }); + + it("emits block when buffer reaches paragraph break after minChars", () => { + const agg = new MessageAggregator(smallConfig({ minChars: 20 }), onBlock, onPassthrough); + agg.handleEvent(makeMessageStart()); + + const text = "This is the first paragraph with enough chars.\n\nSecond paragraph starts here."; + agg.handleEvent(makeMessageUpdate(text)); + + expect(blocks.length).toBeGreaterThanOrEqual(1); + expect(blocks[0].text).toContain("first paragraph"); + expect(blocks[0].isFinal).toBe(false); + }); + + it("emits multiple blocks for very long text", () => { + const agg = new MessageAggregator(smallConfig({ minChars: 20, maxChars: 50 }), onBlock, onPassthrough); + agg.handleEvent(makeMessageStart()); + + const text = "First paragraph content.\n\nSecond paragraph content.\n\nThird paragraph content."; + agg.handleEvent(makeMessageUpdate(text)); + + expect(blocks.length).toBeGreaterThanOrEqual(2); + // All blocks except the last should have isFinal=false + for (let i = 0; i < blocks.length; i++) { + expect(blocks[i].isFinal).toBe(false); + } + }); + + it("flushes remaining buffer on message_end with isFinal=true", () => { + const agg = new MessageAggregator(smallConfig({ minChars: 20, maxChars: 100 }), onBlock, onPassthrough); + agg.handleEvent(makeMessageStart()); + + const text = "First paragraph here.\n\nSmall tail."; + agg.handleEvent(makeMessageUpdate(text)); + agg.handleEvent(makeMessageEnd(text)); + + const finalBlock = blocks[blocks.length - 1]; + expect(finalBlock.isFinal).toBe(true); + }); + + it("increments block index for each emitted block", () => { + const agg = new MessageAggregator(smallConfig({ minChars: 20, maxChars: 50 }), onBlock, onPassthrough); + agg.handleEvent(makeMessageStart()); + + const text = "First paragraph content.\n\nSecond paragraph content.\n\nThird paragraph."; + agg.handleEvent(makeMessageUpdate(text)); + agg.handleEvent(makeMessageEnd(text)); + + for (let i = 0; i < blocks.length; i++) { + expect(blocks[i].index).toBe(i); + } + }); + + it("resets state on message_start", () => { + const agg = new MessageAggregator(smallConfig({ minChars: 1000 }), onBlock, onPassthrough); + + // First message cycle + agg.handleEvent(makeMessageStart("msg-1")); + agg.handleEvent(makeMessageUpdate("First message text.")); + agg.handleEvent(makeMessageEnd("First message text.")); + + expect(blocks).toHaveLength(1); + expect(blocks[0].index).toBe(0); + + // Second message cycle — index should reset + agg.handleEvent(makeMessageStart("msg-2")); + agg.handleEvent(makeMessageUpdate("Second message text.")); + agg.handleEvent(makeMessageEnd("Second message text.")); + + expect(blocks).toHaveLength(2); + expect(blocks[1].index).toBe(0); // Reset after new message_start + }); + + it("does not emit empty block on message_end with no content", () => { + const agg = new MessageAggregator(smallConfig(), onBlock, onPassthrough); + agg.handleEvent(makeMessageStart()); + agg.handleEvent(makeMessageEnd("")); + + expect(onBlock).not.toHaveBeenCalled(); + }); + }); + + describe("interleaved events", () => { + it("handles tool events interleaved with text updates", () => { + const agg = new MessageAggregator(smallConfig({ minChars: 1000 }), onBlock, onPassthrough); + agg.handleEvent(makeMessageStart()); + + agg.handleEvent(makeMessageUpdate("Before tool call.")); + agg.handleEvent(makeToolStart()); + agg.handleEvent(makeToolEnd()); + // After tool execution, the model continues generating text + // Note: message_update accumulates full text, so it includes everything + agg.handleEvent(makeMessageUpdate("Before tool call. After tool result.")); + + agg.handleEvent(makeMessageEnd("Before tool call. After tool result.")); + + // Tool events should have passed through + const toolEvents = passedThrough.filter( + (e) => e.type === "tool_execution_start" || e.type === "tool_execution_end", + ); + expect(toolEvents).toHaveLength(2); + + // Final block should contain all text + expect(blocks).toHaveLength(1); + expect(blocks[0].text).toBe("Before tool call. After tool result."); + }); + + it("handles multiple message cycles (reset between)", () => { + const agg = new MessageAggregator(smallConfig({ minChars: 1000 }), onBlock, onPassthrough); + + // Cycle 1 + agg.handleEvent(makeMessageStart("msg-1")); + agg.handleEvent(makeMessageUpdate("First response.")); + agg.handleEvent(makeMessageEnd("First response.")); + + // Cycle 2 + agg.handleEvent(makeMessageStart("msg-2")); + agg.handleEvent(makeMessageUpdate("Second response.")); + agg.handleEvent(makeMessageEnd("Second response.")); + + expect(blocks).toHaveLength(2); + expect(blocks[0].text).toBe("First response."); + expect(blocks[1].text).toBe("Second response."); + // Both should be final (flushed on message_end) + expect(blocks[0].isFinal).toBe(true); + expect(blocks[1].isFinal).toBe(true); + }); + + it("handles compaction events between messages", () => { + const agg = new MessageAggregator(smallConfig({ minChars: 1000 }), onBlock, onPassthrough); + + agg.handleEvent(makeMessageStart("msg-1")); + agg.handleEvent(makeMessageUpdate("Text before compaction.")); + agg.handleEvent(makeMessageEnd("Text before compaction.")); + + // Compaction happens + agg.handleEvent(makeCompactionStart()); + agg.handleEvent(makeCompactionEnd()); + + // New message after compaction + agg.handleEvent(makeMessageStart("msg-2")); + agg.handleEvent(makeMessageUpdate("Text after compaction.")); + agg.handleEvent(makeMessageEnd("Text after compaction.")); + + expect(blocks).toHaveLength(2); + const compactionEvents = passedThrough.filter( + (e) => e.type === "compaction_start" || e.type === "compaction_end", + ); + expect(compactionEvents).toHaveLength(2); + }); + }); + + describe("reset()", () => { + it("clears all internal state", () => { + const agg = new MessageAggregator(smallConfig({ minChars: 1000 }), onBlock, onPassthrough); + + agg.handleEvent(makeMessageStart()); + agg.handleEvent(makeMessageUpdate("Some accumulated text.")); + + // Reset externally + agg.reset(); + + // Now end the message — buffer should be empty, no block emitted + agg.handleEvent(makeMessageEnd("Some accumulated text.")); + + // Only the message_end passthrough, no block (buffer was cleared) + expect(onBlock).not.toHaveBeenCalled(); + }); + }); +}); diff --git a/src/hub/message-aggregator.ts b/src/hub/message-aggregator.ts new file mode 100644 index 00000000..4f4b3738 --- /dev/null +++ b/src/hub/message-aggregator.ts @@ -0,0 +1,138 @@ +/** + * Message aggregator for buffering streaming agent events + * and emitting complete "block replies" at natural text boundaries. + * + * Used for third-party messaging integrations (Discord, Telegram) + * that cannot consume raw streaming deltas. + */ + +import type { AgentEvent } from "@mariozechner/pi-agent-core"; +import type { MulticaEvent } from "../agent/events.js"; +import { extractText } from "../agent/extract-text.js"; +import { BlockChunker, DEFAULT_CHUNKER_CONFIG, type BlockChunkerConfig } from "./block-chunker.js"; + +/** A completed text block emitted by the aggregator */ +export interface BlockReply { + /** 0-based sequence number within the current message */ + index: number; + /** The text content of this block */ + text: string; + /** Whether this is the final block in the current message */ + isFinal: boolean; +} + +/** Callback for receiving aggregated block replies */ +export type BlockReplyCallback = (block: BlockReply) => void; + +/** Callback for pass-through events (tool, compaction, lifecycle) */ +export type PassthroughCallback = (event: AgentEvent | MulticaEvent) => void; + +export { type BlockChunkerConfig, DEFAULT_CHUNKER_CONFIG }; + +export class MessageAggregator { + private buffer = ""; + private previousText = ""; + private blockIndex = 0; + private readonly chunker: BlockChunker; + + constructor( + config: BlockChunkerConfig, + private readonly onBlock: BlockReplyCallback, + private readonly onPassthrough: PassthroughCallback, + ) { + this.chunker = new BlockChunker(config); + } + + /** + * Feed an agent event into the aggregator. + * Text content from message_update events is buffered and emitted as block replies. + * All other events are passed through immediately. + */ + handleEvent(event: AgentEvent | MulticaEvent): void { + switch (event.type) { + case "compaction_start": + case "compaction_end": + case "tool_execution_start": + case "tool_execution_end": + case "tool_execution_update": + this.onPassthrough(event); + return; + + case "message_start": + this.resetState(); + this.onPassthrough(event); + return; + + case "message_update": + this.handleMessageUpdate(event as AgentEvent & { type: "message_update" }); + return; + + case "message_end": + this.handleMessageEnd(event as AgentEvent & { type: "message_end" }); + this.onPassthrough(event); + return; + + default: + // agent_start, agent_end, turn_start, turn_end, etc. + this.onPassthrough(event); + return; + } + } + + /** Reset all buffering state (e.g. between messages or for external use) */ + reset(): void { + this.resetState(); + } + + private handleMessageUpdate(event: AgentEvent): void { + const message = (event as { message?: unknown }).message; + const currentText = extractText(message as Parameters[0]); + this.appendDelta(currentText); + + // Try to emit chunks from buffer + let result = this.chunker.tryChunk(this.buffer); + while (result !== null) { + this.emitBlock(result.chunk, false); + this.buffer = result.remainder; + result = this.chunker.tryChunk(this.buffer); + } + } + + private handleMessageEnd(event: AgentEvent): void { + const message = (event as { message?: unknown }).message; + const currentText = extractText(message as Parameters[0]); + this.appendDelta(currentText); + this.flushBuffer(); + } + + private appendDelta(currentText: string): void { + // Compute incremental delta (monotonic accumulation) + if (currentText.length <= this.previousText.length) return; + const delta = currentText.slice(this.previousText.length); + + this.previousText = currentText; + this.buffer += delta; + } + + private flushBuffer(): void { + const result = this.chunker.flush(this.buffer); + if (result) { + this.emitBlock(result.chunk, true); + this.buffer = result.remainder; + } + } + + private emitBlock(text: string, isFinal: boolean): void { + this.onBlock({ + index: this.blockIndex++, + text, + isFinal, + }); + } + + private resetState(): void { + this.buffer = ""; + this.previousText = ""; + this.blockIndex = 0; + } +}