fix(channels): address code review issues

Critical:
- describe-video: add mkdir for MEDIA_CACHE_DIR before ffmpeg write
- telegram: check bot ID (not is_bot) for reply-to detection in groups

Important:
- telegram: check @mention in caption for media messages in groups
- hub: add .catch() to channelManager.startAll()
- describe-image: add 20MB file size check to prevent OOM
- async-agent: remove dead writeWithImages, refactor with enqueue()
- manager: lazy agent subscription via ensureSubscribed() to handle
  late agent availability and agent replacement

Suggestions:
- telegram-format: escape quotes in link URLs to prevent HTML breakout
- transcribe: catch API errors and return null (match local fallback)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Naiyuan Qing 2026-02-09 11:11:28 +08:00
parent 49623b4779
commit 614d2cfd88
8 changed files with 66 additions and 44 deletions

View file

@ -1,6 +1,5 @@
import { v7 as uuidv7 } from "uuid";
import type { AgentEvent, AgentMessage } from "@mariozechner/pi-agent-core";
import type { ImageContent } from "@mariozechner/pi-ai";
import { Agent } from "./runner.js";
import { Channel } from "./channel.js";
import type { AgentOptions, Message } from "./types.js";
@ -38,12 +37,17 @@ export class AsyncAgent {
/** Write message to agent (non-blocking, serialized queue) */
write(content: string): void {
this.enqueue(() => this.agent.run(content));
}
/** Enqueue an agent run, handling errors and session flush */
private enqueue(runFn: () => ReturnType<Agent["run"]>): void {
if (this._closed) throw new Error("Agent is closed");
this.queue = this.queue
.then(async () => {
if (this._closed) return;
const result = await this.agent.run(content);
const result = await runFn();
// Flush pending session writes so waitForIdle() callers
// can safely read session data from disk.
await this.agent.flushSession();
@ -62,29 +66,6 @@ export class AsyncAgent {
});
}
/** Write message with images to agent (non-blocking, serialized queue) */
writeWithImages(content: string, images: ImageContent[]): void {
if (this._closed) throw new Error("Agent is closed");
this.queue = this.queue
.then(async () => {
if (this._closed) return;
const result = await this.agent.run(content, images);
await this.agent.flushSession();
if (result.error) {
console.error(`[AsyncAgent] Agent run error: ${result.error}`);
this.channel.send({ id: uuidv7(), content: `[error] ${result.error}` });
this.agent.emitMulticaEvent({ type: "agent_error", error: result.error });
}
})
.catch((err) => {
const message = err instanceof Error ? err.message : String(err);
console.error(`[AsyncAgent] Agent run exception: ${message}`);
this.channel.send({ id: uuidv7(), content: `[error] ${message}` });
this.agent.emitMulticaEvent({ type: "agent_error", error: message });
});
}
/** Continuously read channel stream (AgentEvent + error Messages) */
read(): AsyncIterable<ChannelItem> {
return this.channel;

View file

@ -48,6 +48,8 @@ export class ChannelManager {
private lastRoute: LastRoute | null = null;
/** Unsubscribe function for the agent subscriber */
private agentUnsubscribe: (() => void) | null = null;
/** Session ID of the currently subscribed agent (for stale detection) */
private subscribedAgentId: string | null = null;
/** Current aggregator for buffering streaming responses */
private aggregator: MessageAggregator | null = null;
/** Typing indicator interval (repeats every 5s to keep Telegram typing visible) */
@ -85,8 +87,8 @@ export class ChannelManager {
}
}
// Subscribe to the Hub's agent for outbound routing
this.subscribeToAgent();
// Try to subscribe eagerly; if no agent yet, routeIncoming will retry lazily
this.ensureSubscribed();
}
/** Start a specific channel account */
@ -154,17 +156,25 @@ export class ChannelManager {
}
/**
* Subscribe to the Hub's agent events (once, persistent).
* When AI replies and lastRoute points to a channel, forward the reply there.
* Ensure we're subscribed to the current Hub agent for outbound routing.
* Lazily called from routeIncoming handles agent not yet available at
* startup and re-subscribes if the agent has changed.
*/
private subscribeToAgent(): void {
private ensureSubscribed(): void {
const agent = this.getHubAgent();
if (!agent) {
console.warn("[Channels] No agent to subscribe to, channel replies will not be routed");
return;
if (!agent) return;
// Already subscribed to the current agent
if (this.subscribedAgentId === agent.sessionId) return;
// Unsubscribe from stale agent
if (this.agentUnsubscribe) {
console.log(`[Channels] Agent changed, re-subscribing (${this.subscribedAgentId}${agent.sessionId})`);
this.agentUnsubscribe();
}
console.log(`[Channels] Subscribing to agent ${agent.sessionId} for outbound routing`);
this.subscribedAgentId = agent.sessionId;
this.agentUnsubscribe = agent.subscribe((event) => {
// No active channel route — skip (reply goes to desktop/gateway only)
@ -257,6 +267,9 @@ export class ChannelManager {
return;
}
// Ensure we're subscribed to this agent (handles late startup / agent change)
this.ensureSubscribed();
// Update last route — replies will go back here
this.lastRoute = {
plugin,

View file

@ -50,8 +50,10 @@ export function markdownToTelegramHtml(markdown: string): string {
// 3. Escape HTML in remaining text
text = escapeHtml(text);
// 4. Links: [text](url)
text = text.replace(/\[([^\]]+)\]\(([^)]+)\)/g, '<a href="$2">$1</a>');
// 4. Links: [text](url) — escape quotes in URL to prevent attribute breakout
text = text.replace(/\[([^\]]+)\]\(([^)]+)\)/g, (_m, label: string, url: string) =>
`<a href="${url.replace(/"/g, "&quot;")}">${label}</a>`,
);
// 5. Bold: **text** or __text__
text = text.replace(/\*\*(.+?)\*\*/g, "<b>$1</b>");

View file

@ -57,7 +57,7 @@ export const telegramChannel: ChannelPlugin = {
description: "Telegram bot integration via long polling",
},
chunkerConfig: {
minChars: 200,
minChars: 3800, // Buffer the full response; only chunk when approaching platform limit
maxChars: 4000, // Telegram API limit: 4096; leave room for HTML formatting overhead
breakPreference: "paragraph",
},
@ -89,10 +89,11 @@ export const telegramChannel: ChannelPlugin = {
const bot = new Bot(botToken);
bots.set(accountId, bot);
// Get bot info for mention detection
// Get bot info for mention/reply detection
const botInfo = await bot.api.getMe();
const botId = botInfo.id;
const botUsername = botInfo.username;
console.log(`[Telegram] Starting bot: @${botUsername}`);
console.log(`[Telegram] Starting bot: @${botUsername} (id=${botId})`);
// Handle text messages
bot.on("message:text", (ctx) => {
@ -106,7 +107,7 @@ export const telegramChannel: ChannelPlugin = {
e.type === "mention" &&
msg.text.substring(e.offset, e.offset + e.length).toLowerCase() === `@${botUsername?.toLowerCase()}`,
);
const isReplyToBot = msg.reply_to_message?.from?.is_bot === true;
const isReplyToBot = msg.reply_to_message?.from?.id === botId;
if (!isMentioned && !isReplyToBot) {
return; // Ignore group messages not directed at bot
@ -175,8 +176,12 @@ export const telegramChannel: ChannelPlugin = {
const isGroup = msg.chat.type === "group" || msg.chat.type === "supergroup";
if (isGroup) {
const isReplyToBot = msg.reply_to_message?.from?.is_bot === true;
if (!isReplyToBot) return;
const isReplyToBot = msg.reply_to_message?.from?.id === botId;
const caption = (msg as any).caption as string | undefined;
const isMentionedInCaption = caption && botUsername
? caption.toLowerCase().includes(`@${botUsername.toLowerCase()}`)
: false;
if (!isReplyToBot && !isMentionedInCaption) return;
}
const media = getMedia(msg);

View file

@ -112,6 +112,9 @@ export class Hub {
this.channelManager = new ChannelManager(this);
void this.channelManager.startAll().then(() => {
console.log("[Hub] Channel system started");
}).catch((err) => {
const msg = err instanceof Error ? err.message : String(err);
console.error(`[Hub] Channel system failed to start: ${msg}`);
});
}

View file

@ -7,10 +7,13 @@
* @see docs/channels/media-handling.md Media processing pipeline
*/
import { readFile } from "node:fs/promises";
import { readFile, stat } from "node:fs/promises";
import { extname } from "node:path";
import { credentialManager } from "../agent/credentials.js";
/** Max image file size: 20MB (OpenAI API limit) */
const MAX_IMAGE_SIZE = 20 * 1024 * 1024;
/** Map file extension to MIME type for common image formats */
function mimeFromExt(filePath: string): string {
const ext = extname(filePath).toLowerCase();
@ -33,6 +36,13 @@ export async function describeImage(filePath: string): Promise<string | null> {
const apiKey = config?.apiKey;
if (!apiKey) return null;
// Check file size to avoid OOM and API payload limits
const fileStat = await stat(filePath);
if (fileStat.size > MAX_IMAGE_SIZE) {
console.warn(`[DescribeImage] File too large (${(fileStat.size / 1024 / 1024).toFixed(1)}MB), skipping`);
return null;
}
const buffer = await readFile(filePath);
const base64 = buffer.toString("base64");
const mimeType = mimeFromExt(filePath);

View file

@ -9,7 +9,7 @@
import { join } from "node:path";
import { execFile } from "node:child_process";
import { unlink } from "node:fs/promises";
import { mkdir, unlink } from "node:fs/promises";
import { v7 as uuidv7 } from "uuid";
import { MEDIA_CACHE_DIR } from "../shared/paths.js";
import { describeImage } from "./describe-image.js";
@ -24,6 +24,9 @@ export async function describeVideo(filePath: string): Promise<string | null> {
const framePath = join(MEDIA_CACHE_DIR, `${uuidv7()}.jpg`);
try {
// Ensure output directory exists
await mkdir(MEDIA_CACHE_DIR, { recursive: true });
// Extract first frame with ffmpeg
await new Promise<void>((resolve, reject) => {
execFile(

View file

@ -138,7 +138,12 @@ export async function transcribeAudio(filePath: string): Promise<string | null>
const config = credentialManager.getLlmProviderConfig("openai");
const apiKey = config?.apiKey;
if (apiKey) {
return await transcribeApi(apiKey, filePath);
try {
return await transcribeApi(apiKey, filePath);
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
console.error(`[Transcribe] Whisper API failed: ${msg}`);
}
}
// 3. No provider available