refactor(channels): rewrite ChannelManager with lastRoute pattern
Replace per-conversation agent creation with single Hub agent model. Messages from channels are routed to the existing Hub agent via agent.write(), and replies are sent back through the lastRoute context. Desktop and Gateway paths call clearLastRoute() so channel replies stop when the user switches input surface. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
ee95102613
commit
9bb1fd6e12
2 changed files with 160 additions and 97 deletions
|
|
@ -1,14 +1,11 @@
|
|||
/**
|
||||
* Channel Manager — orchestrates channel plugin lifecycles and message routing.
|
||||
* Channel Manager — bridges messaging channels to the Hub's agent.
|
||||
*
|
||||
* For each configured channel account:
|
||||
* 1. Starts the gateway adapter (receive messages)
|
||||
* 2. Routes incoming messages to per-conversation Agents
|
||||
* 3. Collects Agent responses via MessageAggregator
|
||||
* 4. Sends responses back via the outbound adapter
|
||||
* Design: One Hub, one Agent. Channels are just alternative input/output surfaces.
|
||||
* - Incoming: channel message → agent.write(text) (same as desktop/gateway)
|
||||
* - Outgoing: agent reply → check lastRoute → forward to originating channel
|
||||
*
|
||||
* Channel is just a messenger — it doesn't manage context or history.
|
||||
* That's the Agent's job.
|
||||
* Uses "last route" pattern: whoever sent the last message gets the reply.
|
||||
*/
|
||||
|
||||
import type { Hub } from "../hub/hub.js";
|
||||
|
|
@ -30,12 +27,22 @@ interface AccountHandle {
|
|||
state: ChannelAccountState;
|
||||
}
|
||||
|
||||
/** Tracks where the last message came from, so replies go back there. */
|
||||
interface LastRoute {
|
||||
plugin: ChannelPlugin;
|
||||
deliveryCtx: DeliveryContext;
|
||||
}
|
||||
|
||||
export class ChannelManager {
|
||||
private readonly hub: Hub;
|
||||
/** Running accounts keyed by "channelId:accountId" */
|
||||
private readonly accounts = new Map<string, AccountHandle>();
|
||||
/** Agents keyed by "channelId:conversationId" for per-conversation isolation */
|
||||
private readonly conversationAgents = new Map<string, AsyncAgent>();
|
||||
/** Where the last channel message came from (reply target) */
|
||||
private lastRoute: LastRoute | null = null;
|
||||
/** Unsubscribe function for the agent subscriber */
|
||||
private agentUnsubscribe: (() => void) | null = null;
|
||||
/** Current aggregator for buffering streaming responses */
|
||||
private aggregator: MessageAggregator | null = null;
|
||||
|
||||
constructor(hub: Hub) {
|
||||
this.hub = hub;
|
||||
|
|
@ -68,6 +75,9 @@ export class ChannelManager {
|
|||
await this.startAccount(plugin.id, accountId, account);
|
||||
}
|
||||
}
|
||||
|
||||
// Subscribe to the Hub's agent for outbound routing
|
||||
this.subscribeToAgent();
|
||||
}
|
||||
|
||||
/** Start a specific channel account */
|
||||
|
|
@ -100,8 +110,6 @@ export class ChannelManager {
|
|||
console.log(`[Channels] Starting ${key}`);
|
||||
|
||||
try {
|
||||
// Start gateway — this begins receiving messages
|
||||
// The promise may resolve immediately (polling started) or stay pending (long-connection)
|
||||
const startPromise = plugin.gateway.start(
|
||||
accountId,
|
||||
accountConfig,
|
||||
|
|
@ -111,8 +119,6 @@ export class ChannelManager {
|
|||
abortController.signal,
|
||||
);
|
||||
|
||||
// Don't await forever — the start() might be long-running (e.g. polling loop)
|
||||
// Give it a moment to fail fast if credentials are wrong
|
||||
await Promise.race([
|
||||
startPromise,
|
||||
new Promise<void>((resolve) => setTimeout(resolve, 3000)),
|
||||
|
|
@ -127,36 +133,102 @@ export class ChannelManager {
|
|||
}
|
||||
}
|
||||
|
||||
/** Stop all running channel accounts */
|
||||
stopAll(): void {
|
||||
console.log("[Channels] Stopping all channels...");
|
||||
for (const [key, handle] of this.accounts) {
|
||||
handle.abortController.abort();
|
||||
handle.state = { ...handle.state, status: "stopped" };
|
||||
console.log(`[Channels] Stopped ${key}`);
|
||||
/** Get the Hub's current agent (the first active one) */
|
||||
private getHubAgent(): AsyncAgent | undefined {
|
||||
const agentIds = this.hub.listAgents();
|
||||
if (agentIds.length === 0) {
|
||||
console.warn("[Channels] No agent available in Hub");
|
||||
return undefined;
|
||||
}
|
||||
this.accounts.clear();
|
||||
this.conversationAgents.clear();
|
||||
}
|
||||
|
||||
/** Get or create an Agent for a specific conversation */
|
||||
private getOrCreateAgent(channelId: string, conversationId: string): AsyncAgent {
|
||||
const key = `${channelId}:${conversationId}`;
|
||||
const existing = this.conversationAgents.get(key);
|
||||
if (existing && !existing.closed) {
|
||||
return existing;
|
||||
}
|
||||
|
||||
const agent = this.hub.createAgent();
|
||||
this.conversationAgents.set(key, agent);
|
||||
const agent = this.hub.getAgent(agentIds[0]!);
|
||||
return agent;
|
||||
}
|
||||
|
||||
/**
|
||||
* Route an incoming message to the appropriate Agent and wire the response
|
||||
* back to the channel via MessageAggregator.
|
||||
*
|
||||
* This is the core bridge logic — generalized for any channel.
|
||||
* Subscribe to the Hub's agent events (once, persistent).
|
||||
* When AI replies and lastRoute points to a channel, forward the reply there.
|
||||
*/
|
||||
private subscribeToAgent(): void {
|
||||
const agent = this.getHubAgent();
|
||||
if (!agent) {
|
||||
console.warn("[Channels] No agent to subscribe to, channel replies will not be routed");
|
||||
return;
|
||||
}
|
||||
|
||||
console.log(`[Channels] Subscribing to agent ${agent.sessionId} for outbound routing`);
|
||||
|
||||
this.agentUnsubscribe = agent.subscribe((event) => {
|
||||
// No active channel route — skip (reply goes to desktop/gateway only)
|
||||
if (!this.lastRoute) return;
|
||||
|
||||
// Handle agent errors — notify the channel user
|
||||
if (event.type === "agent_error") {
|
||||
const errorMsg = (event as { error?: string }).error ?? "Unknown error";
|
||||
console.error(`[Channels] Agent error: ${errorMsg}`);
|
||||
const route = this.lastRoute;
|
||||
if (route) {
|
||||
void route.plugin.outbound.sendText(route.deliveryCtx, `[Error] ${errorMsg}`).catch((err) => {
|
||||
console.error(`[Channels] Failed to send error to channel: ${err}`);
|
||||
});
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
const maybeMessage = (event as { message?: { role?: string } }).message;
|
||||
const role = maybeMessage?.role;
|
||||
|
||||
// Only forward assistant message events
|
||||
if (event.type === "message_start" || event.type === "message_update" || event.type === "message_end") {
|
||||
if (role !== "assistant") return;
|
||||
} else {
|
||||
// Non-message events (tool_execution etc.) — skip for channels
|
||||
return;
|
||||
}
|
||||
|
||||
// Ensure aggregator exists for this response
|
||||
if (event.type === "message_start") {
|
||||
this.createAggregator();
|
||||
}
|
||||
|
||||
if (this.aggregator) {
|
||||
this.aggregator.handleEvent(event);
|
||||
}
|
||||
|
||||
// Clean up after response complete
|
||||
if (event.type === "message_end" && role === "assistant") {
|
||||
this.aggregator = null;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/** Create a fresh aggregator wired to the current lastRoute */
|
||||
private createAggregator(): void {
|
||||
const route = this.lastRoute;
|
||||
if (!route) return;
|
||||
|
||||
const { plugin, deliveryCtx } = route;
|
||||
const chunkerConfig = plugin.chunkerConfig ?? DEFAULT_CHUNKER_CONFIG;
|
||||
|
||||
this.aggregator = new MessageAggregator(
|
||||
chunkerConfig,
|
||||
async (block) => {
|
||||
try {
|
||||
console.log(`[Channels] Sending block ${block.index} (${block.text.length} chars${block.isFinal ? ", final" : ""}) → ${deliveryCtx.channel}:${deliveryCtx.conversationId}`);
|
||||
if (block.index === 0) {
|
||||
await plugin.outbound.replyText(deliveryCtx, block.text);
|
||||
} else {
|
||||
await plugin.outbound.sendText(deliveryCtx, block.text);
|
||||
}
|
||||
} catch (err) {
|
||||
console.error(`[Channels] Failed to send reply: ${err}`);
|
||||
}
|
||||
},
|
||||
() => {},
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Incoming channel message → update lastRoute → forward to Hub's agent.
|
||||
*/
|
||||
private routeIncoming(
|
||||
plugin: ChannelPlugin,
|
||||
|
|
@ -165,67 +237,57 @@ export class ChannelManager {
|
|||
): void {
|
||||
const { conversationId, senderId, text, messageId } = message;
|
||||
console.log(
|
||||
`[Channels] Incoming message: channel=${plugin.id} conv=${conversationId} sender=${senderId} text="${text.slice(0, 50)}${text.length > 50 ? "..." : ""}"`,
|
||||
`[Channels] Incoming: channel=${plugin.id} conv=${conversationId} sender=${senderId} text="${text.slice(0, 50)}${text.length > 50 ? "..." : ""}"`,
|
||||
);
|
||||
|
||||
// Find or create Agent for this conversation
|
||||
const agent = this.getOrCreateAgent(plugin.id, conversationId);
|
||||
const isNew = !this.conversationAgents.has(`${plugin.id}:${conversationId}`) ? "new" : "existing";
|
||||
console.log(`[Channels] Routing to agent: key=${plugin.id}:${conversationId} agentId=${agent.sessionId} (${isNew})`);
|
||||
const agent = this.getHubAgent();
|
||||
if (!agent) {
|
||||
console.error("[Channels] No agent available, dropping message");
|
||||
return;
|
||||
}
|
||||
|
||||
// Build delivery context for outbound replies
|
||||
const deliveryCtx: DeliveryContext = {
|
||||
channel: plugin.id,
|
||||
accountId,
|
||||
conversationId,
|
||||
replyToMessageId: messageId,
|
||||
// Update last route — replies will go back here
|
||||
this.lastRoute = {
|
||||
plugin,
|
||||
deliveryCtx: {
|
||||
channel: plugin.id,
|
||||
accountId,
|
||||
conversationId,
|
||||
replyToMessageId: messageId,
|
||||
},
|
||||
};
|
||||
console.log(`[Channels] lastRoute updated → ${plugin.id}:${conversationId}`);
|
||||
console.log(`[Channels] Forwarding to agent ${agent.sessionId}`);
|
||||
|
||||
// Use channel-specific chunker config or defaults
|
||||
const chunkerConfig = plugin.chunkerConfig ?? DEFAULT_CHUNKER_CONFIG;
|
||||
|
||||
// Create a fresh aggregator for this message's response
|
||||
const aggregator = new MessageAggregator(
|
||||
chunkerConfig,
|
||||
async (block) => {
|
||||
try {
|
||||
console.log(`[Channels] Block ${block.index} ready (${block.text.length} chars${block.isFinal ? ", final" : ""}), sending reply`);
|
||||
if (block.index === 0) {
|
||||
await plugin.outbound.replyText(deliveryCtx, block.text);
|
||||
} else {
|
||||
await plugin.outbound.sendText(deliveryCtx, block.text);
|
||||
}
|
||||
if (block.isFinal) {
|
||||
console.log(`[Channels] Response complete: channel=${plugin.id} conv=${conversationId} blocks=${block.index + 1}`);
|
||||
}
|
||||
} catch (err) {
|
||||
console.error(`[Channels] Failed to send reply: ${err}`);
|
||||
}
|
||||
},
|
||||
(_event) => {
|
||||
// Pass-through events (tool_execution, compaction, etc.)
|
||||
// Could add typing indicators per-channel later
|
||||
},
|
||||
);
|
||||
|
||||
// Subscribe to agent events BEFORE writing the message
|
||||
console.log("[Channels] Agent subscribed, sending message to agent");
|
||||
const unsubscribe = agent.subscribe((event) => {
|
||||
aggregator.handleEvent(event);
|
||||
|
||||
// Unsubscribe after the response is complete
|
||||
if (event.type === "message_end") {
|
||||
const maybeMessage = (event as { message?: { role?: string } }).message;
|
||||
if (maybeMessage?.role === "assistant") {
|
||||
unsubscribe();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Send user message to the agent
|
||||
// Same as typing in the desktop chat
|
||||
agent.write(text);
|
||||
}
|
||||
|
||||
/** Stop all running channel accounts */
|
||||
stopAll(): void {
|
||||
console.log("[Channels] Stopping all channels...");
|
||||
if (this.agentUnsubscribe) {
|
||||
this.agentUnsubscribe();
|
||||
this.agentUnsubscribe = null;
|
||||
}
|
||||
for (const [key, handle] of this.accounts) {
|
||||
handle.abortController.abort();
|
||||
handle.state = { ...handle.state, status: "stopped" };
|
||||
console.log(`[Channels] Stopped ${key}`);
|
||||
}
|
||||
this.accounts.clear();
|
||||
this.lastRoute = null;
|
||||
this.aggregator = null;
|
||||
}
|
||||
|
||||
/** Clear the last route (e.g. when desktop user sends a message) */
|
||||
clearLastRoute(): void {
|
||||
if (this.lastRoute) {
|
||||
console.log("[Channels] lastRoute cleared (non-channel message received)");
|
||||
this.lastRoute = null;
|
||||
}
|
||||
}
|
||||
|
||||
/** Get status of all accounts */
|
||||
listAccountStates(): ChannelAccountState[] {
|
||||
return Array.from(this.accounts.values()).map((h) => ({ ...h.state }));
|
||||
|
|
|
|||
|
|
@ -197,6 +197,7 @@ export class Hub {
|
|||
const agent = this.agents.get(agentId);
|
||||
if (agent && !agent.closed) {
|
||||
this.agentSenders.set(agentId, msg.from);
|
||||
this.channelManager.clearLastRoute();
|
||||
agent.write(content);
|
||||
} else {
|
||||
console.warn(`[Hub] Agent not found or closed: ${agentId}`);
|
||||
|
|
@ -323,12 +324,12 @@ export class Hub {
|
|||
content: item.content,
|
||||
});
|
||||
} else {
|
||||
// Compaction events: forward with synthetic streamId (no stream tracking)
|
||||
const isCompactionEvent =
|
||||
item.type === "compaction_start" || item.type === "compaction_end";
|
||||
if (isCompactionEvent) {
|
||||
// Passthrough events: forward with synthetic streamId (no stream tracking)
|
||||
const isPassthroughEvent =
|
||||
item.type === "compaction_start" || item.type === "compaction_end" || item.type === "agent_error";
|
||||
if (isPassthroughEvent) {
|
||||
this.client.send(targetDeviceId, StreamAction, {
|
||||
streamId: `compaction:${agent.sessionId}`,
|
||||
streamId: `system:${agent.sessionId}`,
|
||||
agentId: agent.sessionId,
|
||||
event: item,
|
||||
});
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue