feat(hub): propagate compaction events to all frontend transports

Forward compaction_start/compaction_end events through Hub (Gateway path)
and Desktop IPC (local path) to the Zustand messages store. Adds
CompactionEvent types to the SDK, compacting/lastCompaction state to
useMessagesStore, and event routing in both connection-store and
use-local-chat.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
yushen 2026-02-05 14:55:00 +08:00
parent 70f3755552
commit 1316d329ee
8 changed files with 113 additions and 6 deletions

View file

@ -280,6 +280,19 @@ export function registerHubIpcHandlers(): void {
return
}
// Compaction events: forward with no stream tracking
const isCompactionEvent =
event.type === 'compaction_start' || event.type === 'compaction_end'
if (isCompactionEvent) {
safeLog(`[IPC] Sending compaction event to renderer: ${event.type}`)
mainWindowRef.webContents.send('localChat:event', {
agentId,
streamId: null,
event,
})
return
}
// Filter events same as Hub.consumeAgent()
const maybeMessage = (event as { message?: { role?: string } }).message
const isAssistantMessage = maybeMessage?.role === 'assistant'

View file

@ -78,7 +78,26 @@ export function useLocalChat({ agentId }: UseLocalChatOptions): UseLocalChatRetu
// Handle agent events - same logic as connection-store.ts
const agentEvent = event.event
const streamId = event.streamId
if (!agentEvent || !streamId) return
if (!agentEvent) return
// Handle compaction events (no streamId required)
if (agentEvent.type === 'compaction_start') {
store.startCompaction()
return
}
if (agentEvent.type === 'compaction_end') {
const evt = agentEvent as { removed: number; kept: number; tokensRemoved?: number; tokensKept?: number; reason: string }
store.endCompaction({
removed: evt.removed,
kept: evt.kept,
tokensRemoved: evt.tokensRemoved,
tokensKept: evt.tokensKept,
reason: evt.reason,
})
return
}
if (!streamId) return
if (agentEvent.type === 'message_start') {
currentStreamRef.current = streamId

View file

@ -34,6 +34,9 @@ export {
StreamAction,
type StreamPayload,
type AgentEvent,
type CompactionEvent,
type CompactionStartEvent,
type CompactionEndEvent,
type ContentBlock,
type TextContent,
type ThinkingContent,

View file

@ -25,16 +25,36 @@ export type { AgentEvent };
*/
export type ContentBlock = TextContent | ThinkingContent | ToolCall | ImageContent;
// --- Compaction event types (Multica-specific, not from pi-agent-core) ---
/** Emitted when context compaction begins */
export type CompactionStartEvent = {
type: "compaction_start";
};
/** Emitted when context compaction completes */
export type CompactionEndEvent = {
type: "compaction_end";
removed: number;
kept: number;
tokensRemoved?: number;
tokensKept?: number;
reason: string;
};
/** Union of all compaction events */
export type CompactionEvent = CompactionStartEvent | CompactionEndEvent;
// --- Stream event types ---
/**
* Hub forwards AgentEvent from pi-agent-core as-is.
* StreamPayload wraps it with routing metadata.
* Hub forwards AgentEvent from pi-agent-core and CompactionEvent as-is.
* StreamPayload wraps them with routing metadata.
*/
export interface StreamPayload {
streamId: string;
agentId: string;
event: AgentEvent;
event: AgentEvent | CompactionEvent;
}
/** Extract thinking/reasoning content from an AgentEvent that carries a message */

View file

@ -143,6 +143,21 @@ function createClient(
case "tool_execution_update":
// Partial results — not rendered yet, ignored for now
break
case "compaction_start": {
store.startCompaction()
break
}
case "compaction_end": {
const evt = event as { removed: number; kept: number; tokensRemoved?: number; tokensKept?: number; reason: string }
store.endCompaction({
removed: evt.removed,
kept: evt.kept,
tokensRemoved: evt.tokensRemoved,
tokensKept: evt.tokensKept,
reason: evt.reason,
})
break
}
}
return
}

View file

@ -2,6 +2,6 @@ export { useConnectionStore } from "./connection-store"
export type { ConnectionStore } from "./connection-store"
export { useAutoConnect } from "./use-auto-connect"
export { useMessagesStore } from "./messages"
export type { Message, MessagesStore, SendContext, ToolStatus } from "./messages"
export type { Message, MessagesStore, SendContext, ToolStatus, CompactionStats } from "./messages"
export { parseConnectionCode, saveConnection, loadConnection, clearConnection } from "./connection"
export type { ConnectionInfo } from "./connection"

View file

@ -15,6 +15,14 @@ import type { ContentBlock } from "@multica/sdk"
export type ToolStatus = "running" | "success" | "error" | "interrupted"
export interface CompactionStats {
removed: number
kept: number
tokensRemoved?: number
tokensKept?: number
reason: string
}
export interface Message {
id: string
role: "user" | "assistant" | "toolResult"
@ -40,6 +48,8 @@ export interface SendContext {
interface MessagesState {
messages: Message[]
streamingIds: Set<string>
compacting: boolean
lastCompaction: CompactionStats | null
}
interface MessagesActions {
@ -56,6 +66,9 @@ interface MessagesActions {
// Tool execution lifecycle
startToolExecution: (agentId: string, toolCallId: string, toolName: string, args?: unknown) => void
endToolExecution: (toolCallId: string, result?: unknown, isError?: boolean) => void
// Compaction lifecycle
startCompaction: () => void
endCompaction: (stats: CompactionStats) => void
}
export type MessagesStore = MessagesState & MessagesActions
@ -63,6 +76,8 @@ export type MessagesStore = MessagesState & MessagesActions
export const useMessagesStore = create<MessagesStore>()((set, get) => ({
messages: [],
streamingIds: new Set<string>(),
compacting: false,
lastCompaction: null,
sendMessage: (text, ctx) => {
get().addUserMessage(text, ctx.agentId)
@ -102,7 +117,7 @@ export const useMessagesStore = create<MessagesStore>()((set, get) => ({
},
clearMessages: () => {
set({ messages: [], streamingIds: new Set() })
set({ messages: [], streamingIds: new Set(), compacting: false, lastCompaction: null })
},
// --- Streaming: build assistant message incrementally ---
@ -180,4 +195,14 @@ export const useMessagesStore = create<MessagesStore>()((set, get) => ({
),
}))
},
// --- Compaction lifecycle ---
startCompaction: () => {
set({ compacting: true })
},
endCompaction: (stats) => {
set({ compacting: false, lastCompaction: stats })
},
}))

View file

@ -256,6 +256,18 @@ export class Hub {
content: item.content,
});
} else {
// Compaction events: forward with synthetic streamId (no stream tracking)
const isCompactionEvent =
item.type === "compaction_start" || item.type === "compaction_end";
if (isCompactionEvent) {
this.client.send(targetDeviceId, StreamAction, {
streamId: `compaction:${agent.sessionId}`,
agentId: agent.sessionId,
event: item,
});
continue;
}
// Filter: only forward events useful for frontend rendering
const maybeMessage = (item as { message?: { role?: string } }).message;
const isAssistantMessage = maybeMessage?.role === "assistant";