diff --git a/src/agent/async-agent.ts b/src/agent/async-agent.ts index be090e4b..29ad1631 100644 --- a/src/agent/async-agent.ts +++ b/src/agent/async-agent.ts @@ -1,6 +1,5 @@ import { v7 as uuidv7 } from "uuid"; import type { AgentEvent, AgentMessage } from "@mariozechner/pi-agent-core"; -import type { ImageContent } from "@mariozechner/pi-ai"; import { Agent } from "./runner.js"; import { Channel } from "./channel.js"; import type { AgentOptions, Message } from "./types.js"; @@ -38,12 +37,17 @@ export class AsyncAgent { /** Write message to agent (non-blocking, serialized queue) */ write(content: string): void { + this.enqueue(() => this.agent.run(content)); + } + + /** Enqueue an agent run, handling errors and session flush */ + private enqueue(runFn: () => ReturnType): void { if (this._closed) throw new Error("Agent is closed"); this.queue = this.queue .then(async () => { if (this._closed) return; - const result = await this.agent.run(content); + const result = await runFn(); // Flush pending session writes so waitForIdle() callers // can safely read session data from disk. await this.agent.flushSession(); @@ -62,29 +66,6 @@ export class AsyncAgent { }); } - /** Write message with images to agent (non-blocking, serialized queue) */ - writeWithImages(content: string, images: ImageContent[]): void { - if (this._closed) throw new Error("Agent is closed"); - - this.queue = this.queue - .then(async () => { - if (this._closed) return; - const result = await this.agent.run(content, images); - await this.agent.flushSession(); - if (result.error) { - console.error(`[AsyncAgent] Agent run error: ${result.error}`); - this.channel.send({ id: uuidv7(), content: `[error] ${result.error}` }); - this.agent.emitMulticaEvent({ type: "agent_error", error: result.error }); - } - }) - .catch((err) => { - const message = err instanceof Error ? err.message : String(err); - console.error(`[AsyncAgent] Agent run exception: ${message}`); - this.channel.send({ id: uuidv7(), content: `[error] ${message}` }); - this.agent.emitMulticaEvent({ type: "agent_error", error: message }); - }); - } - /** Continuously read channel stream (AgentEvent + error Messages) */ read(): AsyncIterable { return this.channel; diff --git a/src/channels/manager.ts b/src/channels/manager.ts index 3c3dd92f..e54b97ad 100644 --- a/src/channels/manager.ts +++ b/src/channels/manager.ts @@ -48,6 +48,8 @@ export class ChannelManager { private lastRoute: LastRoute | null = null; /** Unsubscribe function for the agent subscriber */ private agentUnsubscribe: (() => void) | null = null; + /** Session ID of the currently subscribed agent (for stale detection) */ + private subscribedAgentId: string | null = null; /** Current aggregator for buffering streaming responses */ private aggregator: MessageAggregator | null = null; /** Typing indicator interval (repeats every 5s to keep Telegram typing visible) */ @@ -85,8 +87,8 @@ export class ChannelManager { } } - // Subscribe to the Hub's agent for outbound routing - this.subscribeToAgent(); + // Try to subscribe eagerly; if no agent yet, routeIncoming will retry lazily + this.ensureSubscribed(); } /** Start a specific channel account */ @@ -154,17 +156,25 @@ export class ChannelManager { } /** - * Subscribe to the Hub's agent events (once, persistent). - * When AI replies and lastRoute points to a channel, forward the reply there. + * Ensure we're subscribed to the current Hub agent for outbound routing. + * Lazily called from routeIncoming — handles agent not yet available at + * startup and re-subscribes if the agent has changed. */ - private subscribeToAgent(): void { + private ensureSubscribed(): void { const agent = this.getHubAgent(); - if (!agent) { - console.warn("[Channels] No agent to subscribe to, channel replies will not be routed"); - return; + if (!agent) return; + + // Already subscribed to the current agent + if (this.subscribedAgentId === agent.sessionId) return; + + // Unsubscribe from stale agent + if (this.agentUnsubscribe) { + console.log(`[Channels] Agent changed, re-subscribing (${this.subscribedAgentId} → ${agent.sessionId})`); + this.agentUnsubscribe(); } console.log(`[Channels] Subscribing to agent ${agent.sessionId} for outbound routing`); + this.subscribedAgentId = agent.sessionId; this.agentUnsubscribe = agent.subscribe((event) => { // No active channel route — skip (reply goes to desktop/gateway only) @@ -257,6 +267,9 @@ export class ChannelManager { return; } + // Ensure we're subscribed to this agent (handles late startup / agent change) + this.ensureSubscribed(); + // Update last route — replies will go back here this.lastRoute = { plugin, diff --git a/src/channels/plugins/telegram-format.ts b/src/channels/plugins/telegram-format.ts index c5f22cc7..13711921 100644 --- a/src/channels/plugins/telegram-format.ts +++ b/src/channels/plugins/telegram-format.ts @@ -50,8 +50,10 @@ export function markdownToTelegramHtml(markdown: string): string { // 3. Escape HTML in remaining text text = escapeHtml(text); - // 4. Links: [text](url) - text = text.replace(/\[([^\]]+)\]\(([^)]+)\)/g, '$1'); + // 4. Links: [text](url) — escape quotes in URL to prevent attribute breakout + text = text.replace(/\[([^\]]+)\]\(([^)]+)\)/g, (_m, label: string, url: string) => + `${label}`, + ); // 5. Bold: **text** or __text__ text = text.replace(/\*\*(.+?)\*\*/g, "$1"); diff --git a/src/channels/plugins/telegram.ts b/src/channels/plugins/telegram.ts index a07512b4..ecfa33a5 100644 --- a/src/channels/plugins/telegram.ts +++ b/src/channels/plugins/telegram.ts @@ -57,7 +57,7 @@ export const telegramChannel: ChannelPlugin = { description: "Telegram bot integration via long polling", }, chunkerConfig: { - minChars: 200, + minChars: 3800, // Buffer the full response; only chunk when approaching platform limit maxChars: 4000, // Telegram API limit: 4096; leave room for HTML formatting overhead breakPreference: "paragraph", }, @@ -89,10 +89,11 @@ export const telegramChannel: ChannelPlugin = { const bot = new Bot(botToken); bots.set(accountId, bot); - // Get bot info for mention detection + // Get bot info for mention/reply detection const botInfo = await bot.api.getMe(); + const botId = botInfo.id; const botUsername = botInfo.username; - console.log(`[Telegram] Starting bot: @${botUsername}`); + console.log(`[Telegram] Starting bot: @${botUsername} (id=${botId})`); // Handle text messages bot.on("message:text", (ctx) => { @@ -106,7 +107,7 @@ export const telegramChannel: ChannelPlugin = { e.type === "mention" && msg.text.substring(e.offset, e.offset + e.length).toLowerCase() === `@${botUsername?.toLowerCase()}`, ); - const isReplyToBot = msg.reply_to_message?.from?.is_bot === true; + const isReplyToBot = msg.reply_to_message?.from?.id === botId; if (!isMentioned && !isReplyToBot) { return; // Ignore group messages not directed at bot @@ -175,8 +176,12 @@ export const telegramChannel: ChannelPlugin = { const isGroup = msg.chat.type === "group" || msg.chat.type === "supergroup"; if (isGroup) { - const isReplyToBot = msg.reply_to_message?.from?.is_bot === true; - if (!isReplyToBot) return; + const isReplyToBot = msg.reply_to_message?.from?.id === botId; + const caption = (msg as any).caption as string | undefined; + const isMentionedInCaption = caption && botUsername + ? caption.toLowerCase().includes(`@${botUsername.toLowerCase()}`) + : false; + if (!isReplyToBot && !isMentionedInCaption) return; } const media = getMedia(msg); diff --git a/src/hub/hub.ts b/src/hub/hub.ts index 780a3c1b..71f22127 100644 --- a/src/hub/hub.ts +++ b/src/hub/hub.ts @@ -112,6 +112,9 @@ export class Hub { this.channelManager = new ChannelManager(this); void this.channelManager.startAll().then(() => { console.log("[Hub] Channel system started"); + }).catch((err) => { + const msg = err instanceof Error ? err.message : String(err); + console.error(`[Hub] Channel system failed to start: ${msg}`); }); } diff --git a/src/media/describe-image.ts b/src/media/describe-image.ts index 700f5ca2..eda3ef58 100644 --- a/src/media/describe-image.ts +++ b/src/media/describe-image.ts @@ -7,10 +7,13 @@ * @see docs/channels/media-handling.md — Media processing pipeline */ -import { readFile } from "node:fs/promises"; +import { readFile, stat } from "node:fs/promises"; import { extname } from "node:path"; import { credentialManager } from "../agent/credentials.js"; +/** Max image file size: 20MB (OpenAI API limit) */ +const MAX_IMAGE_SIZE = 20 * 1024 * 1024; + /** Map file extension to MIME type for common image formats */ function mimeFromExt(filePath: string): string { const ext = extname(filePath).toLowerCase(); @@ -33,6 +36,13 @@ export async function describeImage(filePath: string): Promise { const apiKey = config?.apiKey; if (!apiKey) return null; + // Check file size to avoid OOM and API payload limits + const fileStat = await stat(filePath); + if (fileStat.size > MAX_IMAGE_SIZE) { + console.warn(`[DescribeImage] File too large (${(fileStat.size / 1024 / 1024).toFixed(1)}MB), skipping`); + return null; + } + const buffer = await readFile(filePath); const base64 = buffer.toString("base64"); const mimeType = mimeFromExt(filePath); diff --git a/src/media/describe-video.ts b/src/media/describe-video.ts index 5ea55f18..4b67672d 100644 --- a/src/media/describe-video.ts +++ b/src/media/describe-video.ts @@ -9,7 +9,7 @@ import { join } from "node:path"; import { execFile } from "node:child_process"; -import { unlink } from "node:fs/promises"; +import { mkdir, unlink } from "node:fs/promises"; import { v7 as uuidv7 } from "uuid"; import { MEDIA_CACHE_DIR } from "../shared/paths.js"; import { describeImage } from "./describe-image.js"; @@ -24,6 +24,9 @@ export async function describeVideo(filePath: string): Promise { const framePath = join(MEDIA_CACHE_DIR, `${uuidv7()}.jpg`); try { + // Ensure output directory exists + await mkdir(MEDIA_CACHE_DIR, { recursive: true }); + // Extract first frame with ffmpeg await new Promise((resolve, reject) => { execFile( diff --git a/src/media/transcribe.ts b/src/media/transcribe.ts index 5f09c9a8..84214f9a 100644 --- a/src/media/transcribe.ts +++ b/src/media/transcribe.ts @@ -138,7 +138,12 @@ export async function transcribeAudio(filePath: string): Promise const config = credentialManager.getLlmProviderConfig("openai"); const apiKey = config?.apiKey; if (apiKey) { - return await transcribeApi(apiKey, filePath); + try { + return await transcribeApi(apiKey, filePath); + } catch (err) { + const msg = err instanceof Error ? err.message : String(err); + console.error(`[Transcribe] Whisper API failed: ${msg}`); + } } // 3. No provider available