From 1859e32a303270f4b1492501c8d2a5920bc368bd Mon Sep 17 00:00:00 2001 From: yushen Date: Thu, 5 Feb 2026 18:28:08 +0800 Subject: [PATCH 1/5] feat(hub): add message aggregation module for block-level reply chunking BlockChunker splits streaming text at natural boundaries (paragraph, newline, sentence, word) with configurable min/max chars and markdown fence-safe splitting. MessageAggregator consumes AgentEvents, buffers text deltas, and emits complete BlockReply objects via callbacks. Enables future third-party messaging integrations (Discord, Telegram) that cannot consume raw streaming deltas. Co-Authored-By: Claude Opus 4.5 --- src/hub/block-chunker.test.ts | 237 +++++++++++++++++ src/hub/block-chunker.ts | 240 ++++++++++++++++++ src/hub/message-aggregator.test.ts | 395 +++++++++++++++++++++++++++++ src/hub/message-aggregator.ts | 128 ++++++++++ 4 files changed, 1000 insertions(+) create mode 100644 src/hub/block-chunker.test.ts create mode 100644 src/hub/block-chunker.ts create mode 100644 src/hub/message-aggregator.test.ts create mode 100644 src/hub/message-aggregator.ts diff --git a/src/hub/block-chunker.test.ts b/src/hub/block-chunker.test.ts new file mode 100644 index 00000000..a1b36b16 --- /dev/null +++ b/src/hub/block-chunker.test.ts @@ -0,0 +1,237 @@ +import { describe, it, expect } from "vitest"; +import { BlockChunker, type BlockChunkerConfig } from "./block-chunker.js"; + +function cfg(overrides?: Partial): BlockChunkerConfig { + return { + minChars: 50, + maxChars: 200, + breakPreference: "paragraph", + ...overrides, + }; +} + +describe("BlockChunker", () => { + describe("tryChunk", () => { + it("returns null when buffer is below minChars", () => { + const chunker = new BlockChunker(cfg({ minChars: 100 })); + expect(chunker.tryChunk("short text")).toBeNull(); + }); + + it("returns null when buffer is between minChars and maxChars with no break", () => { + const chunker = new BlockChunker(cfg({ minChars: 10, maxChars: 200 })); + // No whitespace, newline, or punctuation — no break possible + const text = "a".repeat(50); + expect(chunker.tryChunk(text)).toBeNull(); + }); + + it("splits at paragraph break (\\n\\n) for paragraph preference", () => { + const chunker = new BlockChunker(cfg({ minChars: 10, maxChars: 200 })); + const text = "Hello world, this is a test.\n\nThis is a new paragraph."; + const result = chunker.tryChunk(text); + expect(result).not.toBeNull(); + expect(result!.chunk).toBe("Hello world, this is a test.\n\n"); + expect(result!.remainder).toBe("This is a new paragraph."); + }); + + it("splits at newline when no paragraph break available", () => { + const chunker = new BlockChunker(cfg({ minChars: 10, maxChars: 200 })); + const text = "Hello world, this is a test.\nThis is the next line."; + const result = chunker.tryChunk(text); + expect(result).not.toBeNull(); + expect(result!.chunk).toBe("Hello world, this is a test.\n"); + expect(result!.remainder).toBe("This is the next line."); + }); + + it("splits at sentence end when no newline available", () => { + const chunker = new BlockChunker(cfg({ minChars: 10, maxChars: 200 })); + const text = "Hello world, this is a test. This is the next sentence."; + const result = chunker.tryChunk(text); + expect(result).not.toBeNull(); + expect(result!.chunk).toBe("Hello world, this is a test. "); + expect(result!.remainder).toBe("This is the next sentence."); + }); + + it("splits at word boundary as last resort", () => { + const chunker = new BlockChunker(cfg({ minChars: 10, maxChars: 200 })); + // No paragraph, newline, or sentence break — only spaces + const text = "word1 word2 word3 word4 word5 word6 word7 word8 word9 word10"; + const result = chunker.tryChunk(text); + expect(result).not.toBeNull(); + // Should split at the last word boundary + expect(result!.chunk.endsWith(" ")).toBe(true); + expect(result!.chunk.length + result!.remainder.length).toBe(text.length); + }); + + it("hard-cuts at maxChars when no natural break found", () => { + const chunker = new BlockChunker(cfg({ minChars: 10, maxChars: 50 })); + const text = "a".repeat(100); // No breaks possible + const result = chunker.tryChunk(text); + expect(result).not.toBeNull(); + expect(result!.chunk).toBe("a".repeat(50)); + expect(result!.remainder).toBe("a".repeat(50)); + }); + + it("prefers break closest to end of search window (larger chunks)", () => { + const chunker = new BlockChunker(cfg({ minChars: 5, maxChars: 100 })); + const text = "aaa\nbbb\nccc\nddd"; + const result = chunker.tryChunk(text); + expect(result).not.toBeNull(); + // Should pick the last newline break (after "ccc\n") + expect(result!.chunk).toBe("aaa\nbbb\nccc\n"); + expect(result!.remainder).toBe("ddd"); + }); + + it("respects breakPreference: newline skips paragraph-only search", () => { + const chunker = new BlockChunker(cfg({ minChars: 10, maxChars: 200, breakPreference: "newline" })); + const text = "Hello world, this is a test.\nSecond line here."; + const result = chunker.tryChunk(text); + expect(result).not.toBeNull(); + expect(result!.chunk).toBe("Hello world, this is a test.\n"); + }); + + it("respects breakPreference: sentence skips paragraph and newline", () => { + const chunker = new BlockChunker(cfg({ minChars: 10, maxChars: 200, breakPreference: "sentence" })); + const text = "First sentence.\nSecond sentence here! And a third."; + const result = chunker.tryChunk(text); + expect(result).not.toBeNull(); + // Sentence preference should find sentence breaks, scanning backwards + // The last sentence break is after "third." — but that includes newlines too + // Actually, sentence breakers scan backwards, so they'll find "third." last + expect(result!.remainder.length).toBeGreaterThan(0); + }); + + it("handles sentence break at end of string by splitting earlier", () => { + const chunker = new BlockChunker(cfg({ minChars: 10, maxChars: 200, breakPreference: "sentence" })); + const text = "First sentence. Second sentence."; + const result = chunker.tryChunk(text); + expect(result).not.toBeNull(); + // The last period is at end of text (not a useful break), so it splits at the first sentence + expect(result!.chunk).toBe("First sentence. "); + expect(result!.remainder).toBe("Second sentence."); + }); + + it("handles exclamation mark as sentence break", () => { + const chunker = new BlockChunker(cfg({ minChars: 10, maxChars: 200, breakPreference: "sentence" })); + const text = "Hello world! This is great! More text here"; + const result = chunker.tryChunk(text); + expect(result).not.toBeNull(); + expect(result!.chunk).toContain("!"); + }); + + it("handles question mark as sentence break", () => { + const chunker = new BlockChunker(cfg({ minChars: 10, maxChars: 200, breakPreference: "sentence" })); + const text = "Is this working? I hope so? Let us see"; + const result = chunker.tryChunk(text); + expect(result).not.toBeNull(); + expect(result!.chunk).toContain("?"); + }); + + it("emits multiple chunks for very long text", () => { + const chunker = new BlockChunker(cfg({ minChars: 10, maxChars: 50 })); + const text = "First paragraph here.\n\nSecond paragraph here.\n\nThird paragraph."; + const chunks: string[] = []; + let buffer = text; + + let result = chunker.tryChunk(buffer); + while (result) { + chunks.push(result.chunk); + buffer = result.remainder; + result = chunker.tryChunk(buffer); + } + // Flush remainder + const flushed = chunker.flush(buffer); + if (flushed) chunks.push(flushed.chunk); + + expect(chunks.length).toBeGreaterThan(1); + expect(chunks.join("")).toBe(text); + }); + }); + + describe("flush", () => { + it("returns entire buffer as chunk", () => { + const chunker = new BlockChunker(cfg()); + const result = chunker.flush("some remaining text"); + expect(result).not.toBeNull(); + expect(result!.chunk).toBe("some remaining text"); + expect(result!.remainder).toBe(""); + }); + + it("returns null for empty buffer", () => { + const chunker = new BlockChunker(cfg()); + expect(chunker.flush("")).toBeNull(); + }); + }); + + describe("fence safety", () => { + it("does not split inside a fenced code block at natural breaks", () => { + const chunker = new BlockChunker(cfg({ minChars: 10, maxChars: 200 })); + const text = "Before code:\n\n```python\ndef foo():\n return 42\n```\n\nAfter code."; + const result = chunker.tryChunk(text); + expect(result).not.toBeNull(); + // Should NOT split at the \n\n inside the code block + // The first valid paragraph break is after "Before code:\n\n" + // or after the closing fence "```\n\n" + const chunk = result!.chunk; + // The chunk should either be "Before code:\n\n" or include the entire fence + // Since we scan backwards, the last paragraph break after ```\n\n should be found first + const fenceOpens = (chunk.match(/```/g) || []).length; + // If chunk contains an opening fence, it must also contain the closing fence + if (fenceOpens > 0) { + expect(fenceOpens % 2).toBe(0); + } + }); + + it("closes and reopens fence on hard cut", () => { + const chunker = new BlockChunker(cfg({ minChars: 10, maxChars: 50 })); + const text = "```python\n" + "x = 1\n".repeat(20) + "```"; + const result = chunker.tryChunk(text); + expect(result).not.toBeNull(); + // Chunk should end with closing fence + expect(result!.chunk.trimEnd()).toMatch(/```$/); + // Remainder should start with reopened fence + expect(result!.remainder).toMatch(/^```python\n/); + }); + + it("preserves language tag when reopening fence", () => { + const chunker = new BlockChunker(cfg({ minChars: 10, maxChars: 60 })); + const text = "```typescript\nconst a = 1;\nconst b = 2;\nconst c = 3;\nconst d = 4;\nconst e = 5;\n```"; + const result = chunker.tryChunk(text); + if (result && result.remainder.startsWith("```")) { + expect(result.remainder).toMatch(/^```typescript\n/); + } + }); + + it("handles ~~~ fence markers", () => { + const chunker = new BlockChunker(cfg({ minChars: 10, maxChars: 50 })); + const text = "~~~python\n" + "x = 1\n".repeat(20) + "~~~"; + const result = chunker.tryChunk(text); + expect(result).not.toBeNull(); + // Chunk should end with closing tilde fence + expect(result!.chunk.trimEnd()).toMatch(/~~~$/); + // Remainder should start with reopened tilde fence + expect(result!.remainder).toMatch(/^~~~python\n/); + }); + + it("handles text before and after a code block", () => { + const chunker = new BlockChunker(cfg({ minChars: 10, maxChars: 500 })); + const text = "Some text before.\n\n```\ncode here\n```\n\nSome text after the code block."; + const result = chunker.tryChunk(text); + expect(result).not.toBeNull(); + // Should split at a paragraph break that is NOT inside a fence + const chunk = result!.chunk; + const remainder = result!.remainder; + expect(chunk + remainder).toBe(text); + }); + + it("handles multiple sequential code blocks", () => { + const chunker = new BlockChunker(cfg({ minChars: 10, maxChars: 500 })); + const text = "```js\nfoo()\n```\n\n```py\nbar()\n```\n\nEnd."; + const result = chunker.tryChunk(text); + expect(result).not.toBeNull(); + // Should be able to split between the two code blocks + const chunk = result!.chunk; + const remainder = result!.remainder; + expect(chunk + remainder).toBe(text); + }); + }); +}); diff --git a/src/hub/block-chunker.ts b/src/hub/block-chunker.ts new file mode 100644 index 00000000..a7f04ed7 --- /dev/null +++ b/src/hub/block-chunker.ts @@ -0,0 +1,240 @@ +/** + * Block chunker for splitting streaming text at natural boundaries. + * + * Used by MessageAggregator to produce appropriately-sized text blocks + * for messaging platforms that cannot consume raw streaming deltas. + */ + +export interface BlockChunkerConfig { + /** Minimum text chars before attempting a break (default: 200) */ + minChars: number; + /** Hard maximum chunk size — will force-break here (default: 2000) */ + maxChars: number; + /** Preferred break type priority */ + breakPreference: "paragraph" | "newline" | "sentence"; +} + +export const DEFAULT_CHUNKER_CONFIG: BlockChunkerConfig = { + minChars: 200, + maxChars: 2000, + breakPreference: "paragraph", +}; + +export interface ChunkResult { + /** Text to emit as a block */ + chunk: string; + /** Remaining text to keep in buffer */ + remainder: string; +} + +interface FenceInfo { + marker: string; // "```" or "~~~" + lang: string; // language identifier, e.g. "typescript" +} + +/** + * Detect if `text` has an unclosed fenced code block at the given position. + * Scans line-by-line tracking fence open/close pairs. + */ +function detectFenceAt(text: string, upTo: number): FenceInfo | null { + const region = text.slice(0, upTo); + const lines = region.split("\n"); + let openFence: FenceInfo | null = null; + + for (const line of lines) { + const match = line.match(/^(`{3,}|~{3,})(\S*)\s*$/); + if (!match) continue; + const marker = match[1]; + const lang = match[2] ?? ""; + const markerChar = marker[0]; + + if (openFence === null) { + // Opening a new fence + openFence = { marker, lang }; + } else if (markerChar === openFence.marker[0] && marker.length >= openFence.marker.length) { + // Closing the current fence (same char, at least as many chars) + openFence = null; + } + // Otherwise: different char or shorter marker, not a close — ignore + } + + return openFence; +} + +/** + * Check if position is inside an unclosed fenced code block. + */ +function isInsideFence(text: string, position: number): boolean { + return detectFenceAt(text, position) !== null; +} + +/** + * If a chunk ends inside an open fence, close it in the chunk + * and reopen it in the remainder. + */ +function applyFenceSafety(chunk: string, remainder: string): { chunk: string; remainder: string } { + const fence = detectFenceAt(chunk, chunk.length); + if (!fence) return { chunk, remainder }; + + // Close the fence in the chunk + const closedChunk = chunk.endsWith("\n") + ? chunk + fence.marker + "\n" + : chunk + "\n" + fence.marker + "\n"; + + // Reopen the fence in the remainder + const reopenTag = fence.lang ? `${fence.marker}${fence.lang}` : fence.marker; + const reopenedRemainder = reopenTag + "\n" + remainder; + + return { chunk: closedChunk, remainder: reopenedRemainder }; +} + +export class BlockChunker { + private readonly minChars: number; + private readonly maxChars: number; + private readonly breakPreference: "paragraph" | "newline" | "sentence"; + + constructor(config: BlockChunkerConfig) { + this.minChars = config.minChars; + this.maxChars = config.maxChars; + this.breakPreference = config.breakPreference; + } + + /** + * Attempt to extract a chunk from the buffer. + * Returns null if buffer is below minChars or no suitable break found + * (unless buffer exceeds maxChars, in which case a hard break is forced). + */ + tryChunk(buffer: string): ChunkResult | null { + if (buffer.length < this.minChars) return null; + + // Search window: minChars..min(buffer.length, maxChars) + const searchEnd = Math.min(buffer.length, this.maxChars); + const breakIndex = this.findBreakIndex(buffer, this.minChars, searchEnd); + + if (breakIndex !== -1) { + return { + chunk: buffer.slice(0, breakIndex), + remainder: buffer.slice(breakIndex), + }; + } + + // No break found within search window + if (buffer.length >= this.maxChars) { + // Hard cut at maxChars with fence safety + const raw = { + chunk: buffer.slice(0, this.maxChars), + remainder: buffer.slice(this.maxChars), + }; + return applyFenceSafety(raw.chunk, raw.remainder); + } + + // Buffer is between minChars and maxChars with no break — wait for more text + return null; + } + + /** + * Force-flush: return entire buffer as a chunk. + */ + flush(buffer: string): ChunkResult | null { + if (buffer.length === 0) return null; + return { chunk: buffer, remainder: "" }; + } + + /** + * Find the best break index in the buffer within [searchStart, searchEnd). + * Scans backwards from searchEnd to prefer later breaks (larger chunks). + * Returns the index AFTER the break character(s) — i.e., the start of the remainder. + * Returns -1 if no suitable break found. + */ + private findBreakIndex(buffer: string, searchStart: number, searchEnd: number): number { + const breakers = this.getBreakers(); + const bufLen = buffer.length; + + for (const breaker of breakers) { + const index = breaker(buffer, searchStart, searchEnd, bufLen); + if (index !== -1 && !isInsideFence(buffer, index)) { + return index; + } + } + + return -1; + } + + /** + * Get break functions in priority order based on breakPreference. + */ + private getBreakers(): Array<(buffer: string, start: number, end: number, bufLen: number) => number> { + switch (this.breakPreference) { + case "paragraph": + return [findParagraphBreak, findNewlineBreak, findSentenceBreak, findWordBreak]; + case "newline": + return [findNewlineBreak, findSentenceBreak, findWordBreak]; + case "sentence": + return [findSentenceBreak, findWordBreak]; + } + } +} + +/** + * Find a paragraph break (\n\n) scanning backwards from end. + * Returns the index after the break (start of next paragraph). + */ +function findParagraphBreak(buffer: string, start: number, end: number, bufLen: number): number { + for (let i = end - 1; i >= start + 1; i--) { + if (buffer[i] === "\n" && buffer[i - 1] === "\n") { + const idx = i + 1; + if (idx < bufLen) return idx; + } + } + return -1; +} + +/** + * Find a newline break (\n) scanning backwards from end. + * Returns the index after the newline. + */ +function findNewlineBreak(buffer: string, start: number, end: number, bufLen: number): number { + for (let i = end - 1; i >= start; i--) { + if (buffer[i] === "\n") { + const idx = i + 1; + if (idx < bufLen) return idx; + } + } + return -1; +} + +/** + * Find a sentence break (.!? followed by whitespace or end of string). + * Returns the index after the whitespace following the punctuation. + */ +function findSentenceBreak(buffer: string, start: number, end: number, bufLen: number): number { + for (let i = end - 1; i >= start; i--) { + const ch = buffer[i]; + if (ch === "." || ch === "!" || ch === "?") { + const next = i + 1; + if (next < bufLen && /\s/.test(buffer[next])) { + // Break after the whitespace + const idx = next + 1; + if (idx < bufLen) return idx; + } else if (next >= bufLen) { + // Punctuation at end of text — not a useful split point, skip + continue; + } + } + } + return -1; +} + +/** + * Find a word boundary (whitespace) scanning backwards from end. + * Returns the index after the whitespace character. + */ +function findWordBreak(buffer: string, start: number, end: number, bufLen: number): number { + for (let i = end - 1; i >= start; i--) { + if (/\s/.test(buffer[i])) { + const idx = i + 1; + if (idx < bufLen) return idx; + } + } + return -1; +} diff --git a/src/hub/message-aggregator.test.ts b/src/hub/message-aggregator.test.ts new file mode 100644 index 00000000..135a748c --- /dev/null +++ b/src/hub/message-aggregator.test.ts @@ -0,0 +1,395 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { MessageAggregator, type BlockReply, type BlockChunkerConfig } from "./message-aggregator.js"; +import type { AgentEvent } from "@mariozechner/pi-agent-core"; +import type { MulticaEvent } from "../agent/events.js"; + +// --- Test helpers --- + +function makeMessageStart(id = "msg-1"): AgentEvent { + return { + type: "message_start", + message: { role: "assistant", content: [], id }, + } as unknown as AgentEvent; +} + +function makeMessageUpdate(fullText: string): AgentEvent { + return { + type: "message_update", + message: { + role: "assistant", + content: [{ type: "text", text: fullText }], + }, + } as unknown as AgentEvent; +} + +function makeMessageUpdateWithThinking(text: string, thinking: string): AgentEvent { + return { + type: "message_update", + message: { + role: "assistant", + content: [ + { type: "thinking", thinking }, + { type: "text", text }, + ], + }, + } as unknown as AgentEvent; +} + +function makeMessageEnd(fullText: string): AgentEvent { + return { + type: "message_end", + message: { + role: "assistant", + content: [{ type: "text", text: fullText }], + stopReason: "end_turn", + }, + } as unknown as AgentEvent; +} + +function makeToolStart(name = "exec"): AgentEvent { + return { + type: "tool_execution_start", + toolName: name, + args: {}, + toolCallId: "tool-1", + } as unknown as AgentEvent; +} + +function makeToolEnd(name = "exec"): AgentEvent { + return { + type: "tool_execution_end", + toolName: name, + toolCallId: "tool-1", + result: {}, + isError: false, + } as unknown as AgentEvent; +} + +function makeCompactionStart(): MulticaEvent { + return { type: "compaction_start" }; +} + +function makeCompactionEnd(): MulticaEvent { + return { type: "compaction_end", removed: 5, kept: 10, reason: "tokens" }; +} + +function smallConfig(overrides?: Partial): BlockChunkerConfig { + return { + minChars: 20, + maxChars: 100, + breakPreference: "paragraph", + ...overrides, + }; +} + +describe("MessageAggregator", () => { + let blocks: BlockReply[]; + let passedThrough: Array; + let onBlock: ReturnType; + let onPassthrough: ReturnType; + + beforeEach(() => { + blocks = []; + passedThrough = []; + onBlock = vi.fn((block: BlockReply) => blocks.push(block)); + onPassthrough = vi.fn((event: AgentEvent | MulticaEvent) => passedThrough.push(event)); + }); + + describe("event routing", () => { + it("passes through tool_execution_start immediately", () => { + const agg = new MessageAggregator(smallConfig(), onBlock, onPassthrough); + const event = makeToolStart(); + agg.handleEvent(event); + expect(onPassthrough).toHaveBeenCalledWith(event); + expect(onBlock).not.toHaveBeenCalled(); + }); + + it("passes through tool_execution_end immediately", () => { + const agg = new MessageAggregator(smallConfig(), onBlock, onPassthrough); + const event = makeToolEnd(); + agg.handleEvent(event); + expect(onPassthrough).toHaveBeenCalledWith(event); + expect(onBlock).not.toHaveBeenCalled(); + }); + + it("passes through compaction_start immediately", () => { + const agg = new MessageAggregator(smallConfig(), onBlock, onPassthrough); + const event = makeCompactionStart(); + agg.handleEvent(event); + expect(onPassthrough).toHaveBeenCalledWith(event); + }); + + it("passes through compaction_end immediately", () => { + const agg = new MessageAggregator(smallConfig(), onBlock, onPassthrough); + const event = makeCompactionEnd(); + agg.handleEvent(event); + expect(onPassthrough).toHaveBeenCalledWith(event); + }); + + it("passes through message_start immediately and resets state", () => { + const agg = new MessageAggregator(smallConfig(), onBlock, onPassthrough); + const event = makeMessageStart(); + agg.handleEvent(event); + expect(onPassthrough).toHaveBeenCalledWith(event); + expect(onBlock).not.toHaveBeenCalled(); + }); + + it("passes through message_end after flushing buffer", () => { + const agg = new MessageAggregator(smallConfig(), onBlock, onPassthrough); + agg.handleEvent(makeMessageStart()); + + // Add some text that is below minChars + agg.handleEvent(makeMessageUpdate("Hello world")); + + // End the message — should flush the buffer as a block, then pass through message_end + const endEvent = makeMessageEnd("Hello world"); + agg.handleEvent(endEvent); + + expect(blocks).toHaveLength(1); + expect(blocks[0].text).toBe("Hello world"); + expect(blocks[0].isFinal).toBe(true); + + // message_start + message_end both passed through + const passthroughTypes = passedThrough.map((e) => e.type); + expect(passthroughTypes).toContain("message_start"); + expect(passthroughTypes).toContain("message_end"); + }); + }); + + describe("text delta computation", () => { + it("computes delta from accumulated text in message_update", () => { + const agg = new MessageAggregator(smallConfig({ minChars: 1000 }), onBlock, onPassthrough); + agg.handleEvent(makeMessageStart()); + + // First update: "Hello" + agg.handleEvent(makeMessageUpdate("Hello")); + // Second update: "Hello world" (full text, not delta) + agg.handleEvent(makeMessageUpdate("Hello world")); + + // Flush to see accumulated text + agg.handleEvent(makeMessageEnd("Hello world")); + + expect(blocks).toHaveLength(1); + expect(blocks[0].text).toBe("Hello world"); + }); + + it("ignores ThinkingContent blocks, only extracts text", () => { + const agg = new MessageAggregator(smallConfig({ minChars: 1000 }), onBlock, onPassthrough); + agg.handleEvent(makeMessageStart()); + + agg.handleEvent(makeMessageUpdateWithThinking("visible text", "internal thinking")); + + agg.handleEvent(makeMessageEnd("visible text")); + + expect(blocks).toHaveLength(1); + expect(blocks[0].text).toBe("visible text"); + expect(blocks[0].text).not.toContain("internal thinking"); + }); + + it("handles empty delta (duplicate event) gracefully", () => { + const agg = new MessageAggregator(smallConfig({ minChars: 1000 }), onBlock, onPassthrough); + agg.handleEvent(makeMessageStart()); + + agg.handleEvent(makeMessageUpdate("Hello")); + agg.handleEvent(makeMessageUpdate("Hello")); // duplicate + agg.handleEvent(makeMessageUpdate("Hello")); // duplicate again + + agg.handleEvent(makeMessageEnd("Hello")); + + expect(blocks).toHaveLength(1); + expect(blocks[0].text).toBe("Hello"); + }); + + it("handles monotonically growing text correctly", () => { + const agg = new MessageAggregator(smallConfig({ minChars: 1000 }), onBlock, onPassthrough); + agg.handleEvent(makeMessageStart()); + + agg.handleEvent(makeMessageUpdate("H")); + agg.handleEvent(makeMessageUpdate("He")); + agg.handleEvent(makeMessageUpdate("Hel")); + agg.handleEvent(makeMessageUpdate("Hell")); + agg.handleEvent(makeMessageUpdate("Hello")); + + agg.handleEvent(makeMessageEnd("Hello")); + + expect(blocks).toHaveLength(1); + expect(blocks[0].text).toBe("Hello"); + }); + }); + + describe("block emission", () => { + it("does not emit block when buffer is below minChars", () => { + const agg = new MessageAggregator(smallConfig({ minChars: 100 }), onBlock, onPassthrough); + agg.handleEvent(makeMessageStart()); + agg.handleEvent(makeMessageUpdate("Short text.")); + expect(onBlock).not.toHaveBeenCalled(); + }); + + it("emits block when buffer reaches paragraph break after minChars", () => { + const agg = new MessageAggregator(smallConfig({ minChars: 20 }), onBlock, onPassthrough); + agg.handleEvent(makeMessageStart()); + + const text = "This is the first paragraph with enough chars.\n\nSecond paragraph starts here."; + agg.handleEvent(makeMessageUpdate(text)); + + expect(blocks.length).toBeGreaterThanOrEqual(1); + expect(blocks[0].text).toContain("first paragraph"); + expect(blocks[0].isFinal).toBe(false); + }); + + it("emits multiple blocks for very long text", () => { + const agg = new MessageAggregator(smallConfig({ minChars: 20, maxChars: 50 }), onBlock, onPassthrough); + agg.handleEvent(makeMessageStart()); + + const text = "First paragraph content.\n\nSecond paragraph content.\n\nThird paragraph content."; + agg.handleEvent(makeMessageUpdate(text)); + + expect(blocks.length).toBeGreaterThanOrEqual(2); + // All blocks except the last should have isFinal=false + for (let i = 0; i < blocks.length; i++) { + expect(blocks[i].isFinal).toBe(false); + } + }); + + it("flushes remaining buffer on message_end with isFinal=true", () => { + const agg = new MessageAggregator(smallConfig({ minChars: 20, maxChars: 100 }), onBlock, onPassthrough); + agg.handleEvent(makeMessageStart()); + + const text = "First paragraph here.\n\nSmall tail."; + agg.handleEvent(makeMessageUpdate(text)); + agg.handleEvent(makeMessageEnd(text)); + + const finalBlock = blocks[blocks.length - 1]; + expect(finalBlock.isFinal).toBe(true); + }); + + it("increments block index for each emitted block", () => { + const agg = new MessageAggregator(smallConfig({ minChars: 20, maxChars: 50 }), onBlock, onPassthrough); + agg.handleEvent(makeMessageStart()); + + const text = "First paragraph content.\n\nSecond paragraph content.\n\nThird paragraph."; + agg.handleEvent(makeMessageUpdate(text)); + agg.handleEvent(makeMessageEnd(text)); + + for (let i = 0; i < blocks.length; i++) { + expect(blocks[i].index).toBe(i); + } + }); + + it("resets state on message_start", () => { + const agg = new MessageAggregator(smallConfig({ minChars: 1000 }), onBlock, onPassthrough); + + // First message cycle + agg.handleEvent(makeMessageStart("msg-1")); + agg.handleEvent(makeMessageUpdate("First message text.")); + agg.handleEvent(makeMessageEnd("First message text.")); + + expect(blocks).toHaveLength(1); + expect(blocks[0].index).toBe(0); + + // Second message cycle — index should reset + agg.handleEvent(makeMessageStart("msg-2")); + agg.handleEvent(makeMessageUpdate("Second message text.")); + agg.handleEvent(makeMessageEnd("Second message text.")); + + expect(blocks).toHaveLength(2); + expect(blocks[1].index).toBe(0); // Reset after new message_start + }); + + it("does not emit empty block on message_end with no content", () => { + const agg = new MessageAggregator(smallConfig(), onBlock, onPassthrough); + agg.handleEvent(makeMessageStart()); + agg.handleEvent(makeMessageEnd("")); + + expect(onBlock).not.toHaveBeenCalled(); + }); + }); + + describe("interleaved events", () => { + it("handles tool events interleaved with text updates", () => { + const agg = new MessageAggregator(smallConfig({ minChars: 1000 }), onBlock, onPassthrough); + agg.handleEvent(makeMessageStart()); + + agg.handleEvent(makeMessageUpdate("Before tool call.")); + agg.handleEvent(makeToolStart()); + agg.handleEvent(makeToolEnd()); + // After tool execution, the model continues generating text + // Note: message_update accumulates full text, so it includes everything + agg.handleEvent(makeMessageUpdate("Before tool call. After tool result.")); + + agg.handleEvent(makeMessageEnd("Before tool call. After tool result.")); + + // Tool events should have passed through + const toolEvents = passedThrough.filter( + (e) => e.type === "tool_execution_start" || e.type === "tool_execution_end", + ); + expect(toolEvents).toHaveLength(2); + + // Final block should contain all text + expect(blocks).toHaveLength(1); + expect(blocks[0].text).toBe("Before tool call. After tool result."); + }); + + it("handles multiple message cycles (reset between)", () => { + const agg = new MessageAggregator(smallConfig({ minChars: 1000 }), onBlock, onPassthrough); + + // Cycle 1 + agg.handleEvent(makeMessageStart("msg-1")); + agg.handleEvent(makeMessageUpdate("First response.")); + agg.handleEvent(makeMessageEnd("First response.")); + + // Cycle 2 + agg.handleEvent(makeMessageStart("msg-2")); + agg.handleEvent(makeMessageUpdate("Second response.")); + agg.handleEvent(makeMessageEnd("Second response.")); + + expect(blocks).toHaveLength(2); + expect(blocks[0].text).toBe("First response."); + expect(blocks[1].text).toBe("Second response."); + // Both should be final (flushed on message_end) + expect(blocks[0].isFinal).toBe(true); + expect(blocks[1].isFinal).toBe(true); + }); + + it("handles compaction events between messages", () => { + const agg = new MessageAggregator(smallConfig({ minChars: 1000 }), onBlock, onPassthrough); + + agg.handleEvent(makeMessageStart("msg-1")); + agg.handleEvent(makeMessageUpdate("Text before compaction.")); + agg.handleEvent(makeMessageEnd("Text before compaction.")); + + // Compaction happens + agg.handleEvent(makeCompactionStart()); + agg.handleEvent(makeCompactionEnd()); + + // New message after compaction + agg.handleEvent(makeMessageStart("msg-2")); + agg.handleEvent(makeMessageUpdate("Text after compaction.")); + agg.handleEvent(makeMessageEnd("Text after compaction.")); + + expect(blocks).toHaveLength(2); + const compactionEvents = passedThrough.filter( + (e) => e.type === "compaction_start" || e.type === "compaction_end", + ); + expect(compactionEvents).toHaveLength(2); + }); + }); + + describe("reset()", () => { + it("clears all internal state", () => { + const agg = new MessageAggregator(smallConfig({ minChars: 1000 }), onBlock, onPassthrough); + + agg.handleEvent(makeMessageStart()); + agg.handleEvent(makeMessageUpdate("Some accumulated text.")); + + // Reset externally + agg.reset(); + + // Now end the message — buffer should be empty, no block emitted + agg.handleEvent(makeMessageEnd("Some accumulated text.")); + + // Only the message_end passthrough, no block (buffer was cleared) + expect(onBlock).not.toHaveBeenCalled(); + }); + }); +}); diff --git a/src/hub/message-aggregator.ts b/src/hub/message-aggregator.ts new file mode 100644 index 00000000..788dc706 --- /dev/null +++ b/src/hub/message-aggregator.ts @@ -0,0 +1,128 @@ +/** + * Message aggregator for buffering streaming agent events + * and emitting complete "block replies" at natural text boundaries. + * + * Used for third-party messaging integrations (Discord, Telegram) + * that cannot consume raw streaming deltas. + */ + +import type { AgentEvent } from "@mariozechner/pi-agent-core"; +import type { MulticaEvent } from "../agent/events.js"; +import { extractText } from "../agent/extract-text.js"; +import { BlockChunker, DEFAULT_CHUNKER_CONFIG, type BlockChunkerConfig } from "./block-chunker.js"; + +/** A completed text block emitted by the aggregator */ +export interface BlockReply { + /** 0-based sequence number within the current message */ + index: number; + /** The text content of this block */ + text: string; + /** Whether this is the final block in the current message */ + isFinal: boolean; +} + +/** Callback for receiving aggregated block replies */ +export type BlockReplyCallback = (block: BlockReply) => void; + +/** Callback for pass-through events (tool, compaction, lifecycle) */ +export type PassthroughCallback = (event: AgentEvent | MulticaEvent) => void; + +export { type BlockChunkerConfig, DEFAULT_CHUNKER_CONFIG }; + +export class MessageAggregator { + private buffer = ""; + private previousText = ""; + private blockIndex = 0; + private readonly chunker: BlockChunker; + + constructor( + config: BlockChunkerConfig, + private readonly onBlock: BlockReplyCallback, + private readonly onPassthrough: PassthroughCallback, + ) { + this.chunker = new BlockChunker(config); + } + + /** + * Feed an agent event into the aggregator. + * Text content from message_update events is buffered and emitted as block replies. + * All other events are passed through immediately. + */ + handleEvent(event: AgentEvent | MulticaEvent): void { + switch (event.type) { + case "compaction_start": + case "compaction_end": + case "tool_execution_start": + case "tool_execution_end": + case "tool_execution_update": + this.onPassthrough(event); + return; + + case "message_start": + this.resetState(); + this.onPassthrough(event); + return; + + case "message_update": + this.handleMessageUpdate(event as AgentEvent & { type: "message_update" }); + return; + + case "message_end": + this.flushBuffer(); + this.onPassthrough(event); + return; + + default: + // agent_start, agent_end, turn_start, turn_end, etc. + this.onPassthrough(event); + return; + } + } + + /** Reset all buffering state (e.g. between messages or for external use) */ + reset(): void { + this.resetState(); + } + + private handleMessageUpdate(event: AgentEvent): void { + const message = (event as { message?: unknown }).message; + const currentText = extractText(message as Parameters[0]); + + // Compute incremental delta (monotonic accumulation) + if (currentText.length <= this.previousText.length) return; + const delta = currentText.slice(this.previousText.length); + + this.previousText = currentText; + this.buffer += delta; + + // Try to emit chunks from buffer + let result = this.chunker.tryChunk(this.buffer); + while (result !== null) { + this.emitBlock(result.chunk, false); + this.buffer = result.remainder; + result = this.chunker.tryChunk(this.buffer); + } + } + + private flushBuffer(): void { + const result = this.chunker.flush(this.buffer); + if (result) { + this.emitBlock(result.chunk, true); + this.buffer = result.remainder; + } + } + + private emitBlock(text: string, isFinal: boolean): void { + this.onBlock({ + index: this.blockIndex++, + text, + isFinal, + }); + } + + private resetState(): void { + this.buffer = ""; + this.previousText = ""; + this.blockIndex = 0; + } +} From c03a60753e09a003e779f247bc7ed1a7f654147c Mon Sep 17 00:00:00 2001 From: yushen Date: Thu, 5 Feb 2026 18:28:14 +0800 Subject: [PATCH 2/5] feat(hub): integrate message aggregator into Hub event pipeline Add enableAggregation/disableAggregation methods to Hub for per-agent aggregation control. When enabled, streaming text deltas are buffered and emitted as block-reply events instead of raw stream events. The existing streaming mode remains the default for own clients. Co-Authored-By: Claude Opus 4.5 --- src/hub/hub.ts | 110 +++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 89 insertions(+), 21 deletions(-) diff --git a/src/hub/hub.ts b/src/hub/hub.ts index 8ba68d64..1676309b 100644 --- a/src/hub/hub.ts +++ b/src/hub/hub.ts @@ -30,6 +30,7 @@ import { evaluateCommandSafety, requiresApproval } from "../agent/tools/exec-saf import { addAllowlistEntry, recordAllowlistUse, matchAllowlist } from "../agent/tools/exec-allowlist.js"; import type { ExecApprovalCallback, ExecApprovalConfig, ApprovalResult, ExecApprovalRequest } from "../agent/tools/exec-approval-types.js"; import { readProfileConfig, writeProfileConfig } from "../agent/profile/storage.js"; +import { MessageAggregator, type BlockChunkerConfig, DEFAULT_CHUNKER_CONFIG } from "./message-aggregator.js"; export class Hub { private readonly agents = new Map(); @@ -37,6 +38,7 @@ export class Hub { private readonly agentStreamIds = new Map(); private readonly agentStreamCounters = new Map(); private readonly localApprovalHandlers = new Map void>(); + private readonly agentAggregators = new Map(); private readonly rpc: RpcDispatcher; private readonly approvalManager: ExecApprovalManager; private client: GatewayClient; @@ -243,6 +245,60 @@ export class Hub { return this.approvalManager.resolveApproval(approvalId, decision); } + /** + * Enable message aggregation for an agent. + * When enabled, streaming text deltas are buffered and emitted as complete block replies + * instead of being forwarded as raw events. Useful for third-party messaging integrations. + */ + enableAggregation(agentId: string, config?: Partial): void { + const fullConfig = { ...DEFAULT_CHUNKER_CONFIG, ...config }; + const aggregator = new MessageAggregator( + fullConfig, + (block) => { + const targetDeviceId = this.agentSenders.get(agentId); + if (!targetDeviceId) return; + this.client.send(targetDeviceId, "block-reply", { + agentId, + block, + }); + }, + (event) => { + const targetDeviceId = this.agentSenders.get(agentId); + if (!targetDeviceId) return; + + const isCompactionEvent = + event.type === "compaction_start" || event.type === "compaction_end"; + if (isCompactionEvent) { + this.client.send(targetDeviceId, StreamAction, { + streamId: `compaction:${agentId}`, + agentId, + event, + }); + return; + } + + if (event.type === "message_start") { + this.beginStream(agentId, event); + } + const streamId = this.getActiveStreamId(agentId, event); + this.client.send(targetDeviceId, StreamAction, { + streamId, + agentId, + event, + }); + if (event.type === "message_end") { + this.endStream(agentId); + } + }, + ); + this.agentAggregators.set(agentId, aggregator); + } + + /** Disable message aggregation for an agent (reverts to streaming mode). */ + disableAggregation(agentId: string): void { + this.agentAggregators.delete(agentId); + } + /** Create new Agent, or rebuild with existing ID */ createAgent(id?: string, options?: { persist?: boolean; profileId?: string }): AsyncAgent { if (id) { @@ -312,20 +368,13 @@ export class Hub { agentId: agent.sessionId, content: item.content, }); - } else { - // Compaction events: forward with synthetic streamId (no stream tracking) - const isCompactionEvent = - item.type === "compaction_start" || item.type === "compaction_end"; - if (isCompactionEvent) { - this.client.send(targetDeviceId, StreamAction, { - streamId: `compaction:${agent.sessionId}`, - agentId: agent.sessionId, - event: item, - }); - continue; - } + continue; + } - // Filter: only forward events useful for frontend rendering + // Filter: only forward events useful for frontend rendering + const isCompactionEvent = + item.type === "compaction_start" || item.type === "compaction_end"; + if (!isCompactionEvent) { const maybeMessage = (item as { message?: { role?: string } }).message; const isAssistantMessage = maybeMessage?.role === "assistant"; const shouldForward = @@ -333,19 +382,36 @@ export class Hub { || item.type === "tool_execution_start" || item.type === "tool_execution_end"; if (!shouldForward) continue; + } - if (item.type === "message_start") { - this.beginStream(agent.sessionId, item); - } - const streamId = this.getActiveStreamId(agent.sessionId, item); + // Aggregated mode: buffer text deltas, emit block replies + const aggregator = this.agentAggregators.get(agent.sessionId); + if (aggregator) { + aggregator.handleEvent(item); + continue; + } + + // Streaming mode: forward events as-is (existing behavior) + if (isCompactionEvent) { this.client.send(targetDeviceId, StreamAction, { - streamId, + streamId: `compaction:${agent.sessionId}`, agentId: agent.sessionId, event: item, }); - if (item.type === "message_end") { - this.endStream(agent.sessionId); - } + continue; + } + + if (item.type === "message_start") { + this.beginStream(agent.sessionId, item); + } + const streamId = this.getActiveStreamId(agent.sessionId, item); + this.client.send(targetDeviceId, StreamAction, { + streamId, + agentId: agent.sessionId, + event: item, + }); + if (item.type === "message_end") { + this.endStream(agent.sessionId); } } } @@ -503,6 +569,7 @@ export class Hub { this.agentStreamIds.delete(id); this.agentStreamCounters.delete(id); this.localApprovalHandlers.delete(id); + this.agentAggregators.delete(id); removeAgentRecord(id); return true; } @@ -518,6 +585,7 @@ export class Hub { this.agentStreamIds.delete(id); this.agentStreamCounters.delete(id); this.localApprovalHandlers.delete(id); + this.agentAggregators.delete(id); } this.client.disconnect(); console.log("Hub shut down"); From fc0c1781c33200272dac08ce12afe98a30f78a76 Mon Sep 17 00:00:00 2001 From: yushen Date: Thu, 5 Feb 2026 20:24:25 +0800 Subject: [PATCH 3/5] refactor(hub): remove Hub-level aggregation integration MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Message aggregation should not be managed by the Hub. The aggregator is a standalone utility for client adapters to use internally — the Hub stays clean and only does event forwarding. Third-party messaging integrations (Discord, Telegram) will use MessageAggregator in their own adapter layer. Co-Authored-By: Claude Opus 4.5 --- src/hub/hub.ts | 110 ++++++++++--------------------------------------- 1 file changed, 21 insertions(+), 89 deletions(-) diff --git a/src/hub/hub.ts b/src/hub/hub.ts index 1676309b..8ba68d64 100644 --- a/src/hub/hub.ts +++ b/src/hub/hub.ts @@ -30,7 +30,6 @@ import { evaluateCommandSafety, requiresApproval } from "../agent/tools/exec-saf import { addAllowlistEntry, recordAllowlistUse, matchAllowlist } from "../agent/tools/exec-allowlist.js"; import type { ExecApprovalCallback, ExecApprovalConfig, ApprovalResult, ExecApprovalRequest } from "../agent/tools/exec-approval-types.js"; import { readProfileConfig, writeProfileConfig } from "../agent/profile/storage.js"; -import { MessageAggregator, type BlockChunkerConfig, DEFAULT_CHUNKER_CONFIG } from "./message-aggregator.js"; export class Hub { private readonly agents = new Map(); @@ -38,7 +37,6 @@ export class Hub { private readonly agentStreamIds = new Map(); private readonly agentStreamCounters = new Map(); private readonly localApprovalHandlers = new Map void>(); - private readonly agentAggregators = new Map(); private readonly rpc: RpcDispatcher; private readonly approvalManager: ExecApprovalManager; private client: GatewayClient; @@ -245,60 +243,6 @@ export class Hub { return this.approvalManager.resolveApproval(approvalId, decision); } - /** - * Enable message aggregation for an agent. - * When enabled, streaming text deltas are buffered and emitted as complete block replies - * instead of being forwarded as raw events. Useful for third-party messaging integrations. - */ - enableAggregation(agentId: string, config?: Partial): void { - const fullConfig = { ...DEFAULT_CHUNKER_CONFIG, ...config }; - const aggregator = new MessageAggregator( - fullConfig, - (block) => { - const targetDeviceId = this.agentSenders.get(agentId); - if (!targetDeviceId) return; - this.client.send(targetDeviceId, "block-reply", { - agentId, - block, - }); - }, - (event) => { - const targetDeviceId = this.agentSenders.get(agentId); - if (!targetDeviceId) return; - - const isCompactionEvent = - event.type === "compaction_start" || event.type === "compaction_end"; - if (isCompactionEvent) { - this.client.send(targetDeviceId, StreamAction, { - streamId: `compaction:${agentId}`, - agentId, - event, - }); - return; - } - - if (event.type === "message_start") { - this.beginStream(agentId, event); - } - const streamId = this.getActiveStreamId(agentId, event); - this.client.send(targetDeviceId, StreamAction, { - streamId, - agentId, - event, - }); - if (event.type === "message_end") { - this.endStream(agentId); - } - }, - ); - this.agentAggregators.set(agentId, aggregator); - } - - /** Disable message aggregation for an agent (reverts to streaming mode). */ - disableAggregation(agentId: string): void { - this.agentAggregators.delete(agentId); - } - /** Create new Agent, or rebuild with existing ID */ createAgent(id?: string, options?: { persist?: boolean; profileId?: string }): AsyncAgent { if (id) { @@ -368,13 +312,20 @@ export class Hub { agentId: agent.sessionId, content: item.content, }); - continue; - } + } else { + // Compaction events: forward with synthetic streamId (no stream tracking) + const isCompactionEvent = + item.type === "compaction_start" || item.type === "compaction_end"; + if (isCompactionEvent) { + this.client.send(targetDeviceId, StreamAction, { + streamId: `compaction:${agent.sessionId}`, + agentId: agent.sessionId, + event: item, + }); + continue; + } - // Filter: only forward events useful for frontend rendering - const isCompactionEvent = - item.type === "compaction_start" || item.type === "compaction_end"; - if (!isCompactionEvent) { + // Filter: only forward events useful for frontend rendering const maybeMessage = (item as { message?: { role?: string } }).message; const isAssistantMessage = maybeMessage?.role === "assistant"; const shouldForward = @@ -382,36 +333,19 @@ export class Hub { || item.type === "tool_execution_start" || item.type === "tool_execution_end"; if (!shouldForward) continue; - } - // Aggregated mode: buffer text deltas, emit block replies - const aggregator = this.agentAggregators.get(agent.sessionId); - if (aggregator) { - aggregator.handleEvent(item); - continue; - } - - // Streaming mode: forward events as-is (existing behavior) - if (isCompactionEvent) { + if (item.type === "message_start") { + this.beginStream(agent.sessionId, item); + } + const streamId = this.getActiveStreamId(agent.sessionId, item); this.client.send(targetDeviceId, StreamAction, { - streamId: `compaction:${agent.sessionId}`, + streamId, agentId: agent.sessionId, event: item, }); - continue; - } - - if (item.type === "message_start") { - this.beginStream(agent.sessionId, item); - } - const streamId = this.getActiveStreamId(agent.sessionId, item); - this.client.send(targetDeviceId, StreamAction, { - streamId, - agentId: agent.sessionId, - event: item, - }); - if (item.type === "message_end") { - this.endStream(agent.sessionId); + if (item.type === "message_end") { + this.endStream(agent.sessionId); + } } } } @@ -569,7 +503,6 @@ export class Hub { this.agentStreamIds.delete(id); this.agentStreamCounters.delete(id); this.localApprovalHandlers.delete(id); - this.agentAggregators.delete(id); removeAgentRecord(id); return true; } @@ -585,7 +518,6 @@ export class Hub { this.agentStreamIds.delete(id); this.agentStreamCounters.delete(id); this.localApprovalHandlers.delete(id); - this.agentAggregators.delete(id); } this.client.disconnect(); console.log("Hub shut down"); From efb149532686a17addad5270fc26b19cb321e883 Mon Sep 17 00:00:00 2001 From: yushen Date: Thu, 5 Feb 2026 22:23:55 +0800 Subject: [PATCH 4/5] fix(hub): fence close must reject info strings per CommonMark spec The closing fence regex was not checking for an empty info string, allowing e.g. ```python to incorrectly close an open fence. Also adds missing test for tool_execution_update passthrough. Co-Authored-By: Claude Opus 4.5 --- src/hub/block-chunker.test.ts | 17 +++++++++++++++++ src/hub/block-chunker.ts | 4 ++-- src/hub/message-aggregator.test.ts | 8 ++++++++ 3 files changed, 27 insertions(+), 2 deletions(-) diff --git a/src/hub/block-chunker.test.ts b/src/hub/block-chunker.test.ts index a1b36b16..dfda6cc6 100644 --- a/src/hub/block-chunker.test.ts +++ b/src/hub/block-chunker.test.ts @@ -223,6 +223,23 @@ describe("BlockChunker", () => { expect(chunk + remainder).toBe(text); }); + it("does not treat fence with info string as closing fence", () => { + const chunker = new BlockChunker(cfg({ minChars: 10, maxChars: 500 })); + // The second ```python should NOT close the first fence (CommonMark: closing fence has no info string) + const text = "Before.\n\n```python\ncode line 1\n```python\ncode line 2\n```\n\nAfter text here."; + const result = chunker.tryChunk(text); + expect(result).not.toBeNull(); + const chunk = result!.chunk; + // The split should not land between ```python and ```python (inside fence) + // It should either be before the fence or after the closing ``` + const fenceOpens = (chunk.match(/```python/g) || []).length; + const fenceCloses = (chunk.match(/^```$/gm) || []).length; + if (fenceOpens > 0) { + // If chunk includes opening fences, it must include the real close + expect(fenceCloses).toBeGreaterThanOrEqual(1); + } + }); + it("handles multiple sequential code blocks", () => { const chunker = new BlockChunker(cfg({ minChars: 10, maxChars: 500 })); const text = "```js\nfoo()\n```\n\n```py\nbar()\n```\n\nEnd."; diff --git a/src/hub/block-chunker.ts b/src/hub/block-chunker.ts index a7f04ed7..6daadad0 100644 --- a/src/hub/block-chunker.ts +++ b/src/hub/block-chunker.ts @@ -51,8 +51,8 @@ function detectFenceAt(text: string, upTo: number): FenceInfo | null { if (openFence === null) { // Opening a new fence openFence = { marker, lang }; - } else if (markerChar === openFence.marker[0] && marker.length >= openFence.marker.length) { - // Closing the current fence (same char, at least as many chars) + } else if (markerChar === openFence.marker[0] && marker.length >= openFence.marker.length && lang === "") { + // Closing the current fence (same char, at least as many chars, no info string per CommonMark) openFence = null; } // Otherwise: different char or shorter marker, not a close — ignore diff --git a/src/hub/message-aggregator.test.ts b/src/hub/message-aggregator.test.ts index 135a748c..52516648 100644 --- a/src/hub/message-aggregator.test.ts +++ b/src/hub/message-aggregator.test.ts @@ -112,6 +112,14 @@ describe("MessageAggregator", () => { expect(onBlock).not.toHaveBeenCalled(); }); + it("passes through tool_execution_update immediately", () => { + const agg = new MessageAggregator(smallConfig(), onBlock, onPassthrough); + const event = { type: "tool_execution_update", toolCallId: "tool-1", content: "output" } as unknown as AgentEvent; + agg.handleEvent(event); + expect(onPassthrough).toHaveBeenCalledWith(event); + expect(onBlock).not.toHaveBeenCalled(); + }); + it("passes through compaction_start immediately", () => { const agg = new MessageAggregator(smallConfig(), onBlock, onPassthrough); const event = makeCompactionStart(); From f6fab68873c27bb917d6b1eb04c81a7148a3baac Mon Sep 17 00:00:00 2001 From: yushen Date: Fri, 6 Feb 2026 12:44:15 +0800 Subject: [PATCH 5/5] fix(hub): include message_end text in aggregator --- src/hub/message-aggregator.ts | 26 ++++++++++++++++++-------- 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/src/hub/message-aggregator.ts b/src/hub/message-aggregator.ts index 788dc706..4f4b3738 100644 --- a/src/hub/message-aggregator.ts +++ b/src/hub/message-aggregator.ts @@ -68,7 +68,7 @@ export class MessageAggregator { return; case "message_end": - this.flushBuffer(); + this.handleMessageEnd(event as AgentEvent & { type: "message_end" }); this.onPassthrough(event); return; @@ -87,13 +87,7 @@ export class MessageAggregator { private handleMessageUpdate(event: AgentEvent): void { const message = (event as { message?: unknown }).message; const currentText = extractText(message as Parameters[0]); - - // Compute incremental delta (monotonic accumulation) - if (currentText.length <= this.previousText.length) return; - const delta = currentText.slice(this.previousText.length); - - this.previousText = currentText; - this.buffer += delta; + this.appendDelta(currentText); // Try to emit chunks from buffer let result = this.chunker.tryChunk(this.buffer); @@ -104,6 +98,22 @@ export class MessageAggregator { } } + private handleMessageEnd(event: AgentEvent): void { + const message = (event as { message?: unknown }).message; + const currentText = extractText(message as Parameters[0]); + this.appendDelta(currentText); + this.flushBuffer(); + } + + private appendDelta(currentText: string): void { + // Compute incremental delta (monotonic accumulation) + if (currentText.length <= this.previousText.length) return; + const delta = currentText.slice(this.previousText.length); + + this.previousText = currentText; + this.buffer += delta; + } + private flushBuffer(): void { const result = this.chunker.flush(this.buffer); if (result) {