diff --git a/src/hub/hub.ts b/src/hub/hub.ts index 8ba68d64..1676309b 100644 --- a/src/hub/hub.ts +++ b/src/hub/hub.ts @@ -30,6 +30,7 @@ import { evaluateCommandSafety, requiresApproval } from "../agent/tools/exec-saf import { addAllowlistEntry, recordAllowlistUse, matchAllowlist } from "../agent/tools/exec-allowlist.js"; import type { ExecApprovalCallback, ExecApprovalConfig, ApprovalResult, ExecApprovalRequest } from "../agent/tools/exec-approval-types.js"; import { readProfileConfig, writeProfileConfig } from "../agent/profile/storage.js"; +import { MessageAggregator, type BlockChunkerConfig, DEFAULT_CHUNKER_CONFIG } from "./message-aggregator.js"; export class Hub { private readonly agents = new Map(); @@ -37,6 +38,7 @@ export class Hub { private readonly agentStreamIds = new Map(); private readonly agentStreamCounters = new Map(); private readonly localApprovalHandlers = new Map void>(); + private readonly agentAggregators = new Map(); private readonly rpc: RpcDispatcher; private readonly approvalManager: ExecApprovalManager; private client: GatewayClient; @@ -243,6 +245,60 @@ export class Hub { return this.approvalManager.resolveApproval(approvalId, decision); } + /** + * Enable message aggregation for an agent. + * When enabled, streaming text deltas are buffered and emitted as complete block replies + * instead of being forwarded as raw events. Useful for third-party messaging integrations. + */ + enableAggregation(agentId: string, config?: Partial): void { + const fullConfig = { ...DEFAULT_CHUNKER_CONFIG, ...config }; + const aggregator = new MessageAggregator( + fullConfig, + (block) => { + const targetDeviceId = this.agentSenders.get(agentId); + if (!targetDeviceId) return; + this.client.send(targetDeviceId, "block-reply", { + agentId, + block, + }); + }, + (event) => { + const targetDeviceId = this.agentSenders.get(agentId); + if (!targetDeviceId) return; + + const isCompactionEvent = + event.type === "compaction_start" || event.type === "compaction_end"; + if (isCompactionEvent) { + this.client.send(targetDeviceId, StreamAction, { + streamId: `compaction:${agentId}`, + agentId, + event, + }); + return; + } + + if (event.type === "message_start") { + this.beginStream(agentId, event); + } + const streamId = this.getActiveStreamId(agentId, event); + this.client.send(targetDeviceId, StreamAction, { + streamId, + agentId, + event, + }); + if (event.type === "message_end") { + this.endStream(agentId); + } + }, + ); + this.agentAggregators.set(agentId, aggregator); + } + + /** Disable message aggregation for an agent (reverts to streaming mode). */ + disableAggregation(agentId: string): void { + this.agentAggregators.delete(agentId); + } + /** Create new Agent, or rebuild with existing ID */ createAgent(id?: string, options?: { persist?: boolean; profileId?: string }): AsyncAgent { if (id) { @@ -312,20 +368,13 @@ export class Hub { agentId: agent.sessionId, content: item.content, }); - } else { - // Compaction events: forward with synthetic streamId (no stream tracking) - const isCompactionEvent = - item.type === "compaction_start" || item.type === "compaction_end"; - if (isCompactionEvent) { - this.client.send(targetDeviceId, StreamAction, { - streamId: `compaction:${agent.sessionId}`, - agentId: agent.sessionId, - event: item, - }); - continue; - } + continue; + } - // Filter: only forward events useful for frontend rendering + // Filter: only forward events useful for frontend rendering + const isCompactionEvent = + item.type === "compaction_start" || item.type === "compaction_end"; + if (!isCompactionEvent) { const maybeMessage = (item as { message?: { role?: string } }).message; const isAssistantMessage = maybeMessage?.role === "assistant"; const shouldForward = @@ -333,19 +382,36 @@ export class Hub { || item.type === "tool_execution_start" || item.type === "tool_execution_end"; if (!shouldForward) continue; + } - if (item.type === "message_start") { - this.beginStream(agent.sessionId, item); - } - const streamId = this.getActiveStreamId(agent.sessionId, item); + // Aggregated mode: buffer text deltas, emit block replies + const aggregator = this.agentAggregators.get(agent.sessionId); + if (aggregator) { + aggregator.handleEvent(item); + continue; + } + + // Streaming mode: forward events as-is (existing behavior) + if (isCompactionEvent) { this.client.send(targetDeviceId, StreamAction, { - streamId, + streamId: `compaction:${agent.sessionId}`, agentId: agent.sessionId, event: item, }); - if (item.type === "message_end") { - this.endStream(agent.sessionId); - } + continue; + } + + if (item.type === "message_start") { + this.beginStream(agent.sessionId, item); + } + const streamId = this.getActiveStreamId(agent.sessionId, item); + this.client.send(targetDeviceId, StreamAction, { + streamId, + agentId: agent.sessionId, + event: item, + }); + if (item.type === "message_end") { + this.endStream(agent.sessionId); } } } @@ -503,6 +569,7 @@ export class Hub { this.agentStreamIds.delete(id); this.agentStreamCounters.delete(id); this.localApprovalHandlers.delete(id); + this.agentAggregators.delete(id); removeAgentRecord(id); return true; } @@ -518,6 +585,7 @@ export class Hub { this.agentStreamIds.delete(id); this.agentStreamCounters.delete(id); this.localApprovalHandlers.delete(id); + this.agentAggregators.delete(id); } this.client.disconnect(); console.log("Hub shut down");