feat(hub): integrate message aggregator into Hub event pipeline

Add enableAggregation/disableAggregation methods to Hub for per-agent
aggregation control. When enabled, streaming text deltas are buffered
and emitted as block-reply events instead of raw stream events. The
existing streaming mode remains the default for own clients.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
yushen 2026-02-05 18:28:14 +08:00
parent 1859e32a30
commit c03a60753e

View file

@ -30,6 +30,7 @@ import { evaluateCommandSafety, requiresApproval } from "../agent/tools/exec-saf
import { addAllowlistEntry, recordAllowlistUse, matchAllowlist } from "../agent/tools/exec-allowlist.js";
import type { ExecApprovalCallback, ExecApprovalConfig, ApprovalResult, ExecApprovalRequest } from "../agent/tools/exec-approval-types.js";
import { readProfileConfig, writeProfileConfig } from "../agent/profile/storage.js";
import { MessageAggregator, type BlockChunkerConfig, DEFAULT_CHUNKER_CONFIG } from "./message-aggregator.js";
export class Hub {
private readonly agents = new Map<string, AsyncAgent>();
@ -37,6 +38,7 @@ export class Hub {
private readonly agentStreamIds = new Map<string, string>();
private readonly agentStreamCounters = new Map<string, number>();
private readonly localApprovalHandlers = new Map<string, (payload: ExecApprovalRequest) => void>();
private readonly agentAggregators = new Map<string, MessageAggregator>();
private readonly rpc: RpcDispatcher;
private readonly approvalManager: ExecApprovalManager;
private client: GatewayClient;
@ -243,6 +245,60 @@ export class Hub {
return this.approvalManager.resolveApproval(approvalId, decision);
}
/**
* Enable message aggregation for an agent.
* When enabled, streaming text deltas are buffered and emitted as complete block replies
* instead of being forwarded as raw events. Useful for third-party messaging integrations.
*/
enableAggregation(agentId: string, config?: Partial<BlockChunkerConfig>): void {
const fullConfig = { ...DEFAULT_CHUNKER_CONFIG, ...config };
const aggregator = new MessageAggregator(
fullConfig,
(block) => {
const targetDeviceId = this.agentSenders.get(agentId);
if (!targetDeviceId) return;
this.client.send(targetDeviceId, "block-reply", {
agentId,
block,
});
},
(event) => {
const targetDeviceId = this.agentSenders.get(agentId);
if (!targetDeviceId) return;
const isCompactionEvent =
event.type === "compaction_start" || event.type === "compaction_end";
if (isCompactionEvent) {
this.client.send(targetDeviceId, StreamAction, {
streamId: `compaction:${agentId}`,
agentId,
event,
});
return;
}
if (event.type === "message_start") {
this.beginStream(agentId, event);
}
const streamId = this.getActiveStreamId(agentId, event);
this.client.send(targetDeviceId, StreamAction, {
streamId,
agentId,
event,
});
if (event.type === "message_end") {
this.endStream(agentId);
}
},
);
this.agentAggregators.set(agentId, aggregator);
}
/** Disable message aggregation for an agent (reverts to streaming mode). */
disableAggregation(agentId: string): void {
this.agentAggregators.delete(agentId);
}
/** Create new Agent, or rebuild with existing ID */
createAgent(id?: string, options?: { persist?: boolean; profileId?: string }): AsyncAgent {
if (id) {
@ -312,20 +368,13 @@ export class Hub {
agentId: agent.sessionId,
content: item.content,
});
} else {
// Compaction events: forward with synthetic streamId (no stream tracking)
const isCompactionEvent =
item.type === "compaction_start" || item.type === "compaction_end";
if (isCompactionEvent) {
this.client.send(targetDeviceId, StreamAction, {
streamId: `compaction:${agent.sessionId}`,
agentId: agent.sessionId,
event: item,
});
continue;
}
continue;
}
// Filter: only forward events useful for frontend rendering
// Filter: only forward events useful for frontend rendering
const isCompactionEvent =
item.type === "compaction_start" || item.type === "compaction_end";
if (!isCompactionEvent) {
const maybeMessage = (item as { message?: { role?: string } }).message;
const isAssistantMessage = maybeMessage?.role === "assistant";
const shouldForward =
@ -333,19 +382,36 @@ export class Hub {
|| item.type === "tool_execution_start"
|| item.type === "tool_execution_end";
if (!shouldForward) continue;
}
if (item.type === "message_start") {
this.beginStream(agent.sessionId, item);
}
const streamId = this.getActiveStreamId(agent.sessionId, item);
// Aggregated mode: buffer text deltas, emit block replies
const aggregator = this.agentAggregators.get(agent.sessionId);
if (aggregator) {
aggregator.handleEvent(item);
continue;
}
// Streaming mode: forward events as-is (existing behavior)
if (isCompactionEvent) {
this.client.send(targetDeviceId, StreamAction, {
streamId,
streamId: `compaction:${agent.sessionId}`,
agentId: agent.sessionId,
event: item,
});
if (item.type === "message_end") {
this.endStream(agent.sessionId);
}
continue;
}
if (item.type === "message_start") {
this.beginStream(agent.sessionId, item);
}
const streamId = this.getActiveStreamId(agent.sessionId, item);
this.client.send(targetDeviceId, StreamAction, {
streamId,
agentId: agent.sessionId,
event: item,
});
if (item.type === "message_end") {
this.endStream(agent.sessionId);
}
}
}
@ -503,6 +569,7 @@ export class Hub {
this.agentStreamIds.delete(id);
this.agentStreamCounters.delete(id);
this.localApprovalHandlers.delete(id);
this.agentAggregators.delete(id);
removeAgentRecord(id);
return true;
}
@ -518,6 +585,7 @@ export class Hub {
this.agentStreamIds.delete(id);
this.agentStreamCounters.delete(id);
this.localApprovalHandlers.delete(id);
this.agentAggregators.delete(id);
}
this.client.disconnect();
console.log("Hub shut down");