From 23da5a35ffd30631cbb6cbbacd9749fc61cb52c2 Mon Sep 17 00:00:00 2001
From: Naiyuan Qing <145280634+NevilleQingNY@users.noreply.github.com>
Date: Mon, 9 Feb 2026 09:44:39 +0800
Subject: [PATCH] feat(channels): route media messages through agent

Add writeWithImages() to AsyncAgent for passing images directly to the
LLM via ImageContent. Extend Agent.run() to accept optional images
parameter. Update ChannelManager.routeIncoming() to download media files
and forward them: images as ImageContent to the LLM, audio/video/document
as file paths for agent-driven processing.

Co-Authored-By: Claude Opus 4.6
---
Review notes (below the "---" marker, so not part of the commit message):
* routeMedia now prepares the payload first and delivers it in a separate
  guarded step. Before, a throw from agent.writeWithImages() (it throws
  "Agent is closed") was logged as a download failure, and the second
  write in the catch block could reject the un-awaited promise.
* Media is still downloaded without awaiting, so a text message routed
  while a download is in flight can reach the agent before the media
  message. Preserving order would need a per-conversation queue.
* Type arguments look stripped in the copy this was reviewed from.
  Promise<AgentRunResult> (inferred from the AgentRunResult import) and
  Promise<void> are restored; the context line "read(): AsyncIterable {"
  is left as found because its element type is not visible here.
* The manager.ts index hash is carried over from the original patch and
  no longer matches the new content.

 src/agent/async-agent.ts | 24 +++++++++++++++
 src/agent/runner.ts      |  5 +--
 src/channels/manager.ts  | 66 ++++++++++++++++++++++++++++++++++++++--
 3 files changed, 91 insertions(+), 4 deletions(-)

diff --git a/src/agent/async-agent.ts b/src/agent/async-agent.ts
index 910e2742..be090e4b 100644
--- a/src/agent/async-agent.ts
+++ b/src/agent/async-agent.ts
@@ -1,5 +1,6 @@
 import { v7 as uuidv7 } from "uuid";
 import type { AgentEvent, AgentMessage } from "@mariozechner/pi-agent-core";
+import type { ImageContent } from "@mariozechner/pi-ai";
 import { Agent } from "./runner.js";
 import { Channel } from "./channel.js";
 import type { AgentOptions, Message } from "./types.js";
@@ -61,6 +62,29 @@ export class AsyncAgent {
       });
   }
 
+  /** Write message with images to agent (non-blocking, serialized queue) */
+  writeWithImages(content: string, images: ImageContent[]): void {
+    if (this._closed) throw new Error("Agent is closed");
+
+    this.queue = this.queue
+      .then(async () => {
+        if (this._closed) return;
+        const result = await this.agent.run(content, images);
+        await this.agent.flushSession();
+        if (result.error) {
+          console.error(`[AsyncAgent] Agent run error: ${result.error}`);
+          this.channel.send({ id: uuidv7(), content: `[error] ${result.error}` });
+          this.agent.emitMulticaEvent({ type: "agent_error", error: result.error });
+        }
+      })
+      .catch((err) => {
+        const message = err instanceof Error ? err.message : String(err);
+        console.error(`[AsyncAgent] Agent run exception: ${message}`);
+        this.channel.send({ id: uuidv7(), content: `[error] ${message}` });
+        this.agent.emitMulticaEvent({ type: "agent_error", error: message });
+      });
+  }
+
   /** Continuously read channel stream (AgentEvent + error Messages) */
   read(): AsyncIterable {
     return this.channel;
diff --git a/src/agent/runner.ts b/src/agent/runner.ts
index 479cd473..1276f927 100644
--- a/src/agent/runner.ts
+++ b/src/agent/runner.ts
@@ -1,4 +1,5 @@
 import { Agent as PiAgentCore, type AgentEvent, type AgentMessage } from "@mariozechner/pi-agent-core";
+import type { ImageContent } from "@mariozechner/pi-ai";
 import { v7 as uuidv7 } from "uuid";
 import type { AgentOptions, AgentRunResult, ReasoningMode } from "./types.js";
 import type { MulticaEvent } from "./events.js";
@@ -352,7 +353,7 @@ export class Agent {
     }
   }
 
-  async run(prompt: string): Promise<AgentRunResult> {
+  async run(prompt: string, images?: ImageContent[]): Promise<AgentRunResult> {
     await this.ensureInitialized();
     this.output.state.lastAssistantText = "";
 
@@ -362,7 +363,7 @@ export class Agent {
     // Loop to exhaust all candidate profiles on rotatable errors
     while (true) {
       try {
-        await this.agent.prompt(prompt);
+        await this.agent.prompt(prompt, images);
         break; // success — exit loop
       } catch (error) {
         lastError = error;
diff --git a/src/channels/manager.ts b/src/channels/manager.ts
index bccd05b8..8ecd1da7 100644
--- a/src/channels/manager.ts
+++ b/src/channels/manager.ts
@@ -8,6 +8,7 @@
  * Uses "last route" pattern: whoever sent the last message gets the reply.
  */
 
+import { readFile } from "node:fs/promises";
 import type { Hub } from "../hub/hub.js";
 import type {
   ChannelPlugin,
@@ -266,8 +267,69 @@ export class ChannelManager {
     // Show typing indicator while agent processes
     this.startTyping();
 
-    // Same as typing in the desktop chat
-    agent.write(text);
+    // Handle media messages
+    if (message.media && plugin.downloadMedia) {
+      // Fire-and-forget: routeMedia handles its own errors and never rejects
+      void this.routeMedia(plugin, accountId, message, agent);
+    } else {
+      agent.write(text);
+    }
+  }
+
+  /**
+   * Download media file and forward to agent.
+   *
+   * Images are read from disk and passed to the LLM as base64 ImageContent;
+   * any other media type is described to the agent by file path and metadata.
+   * If the download or read fails, the message text (or a placeholder) is
+   * sent instead. Called fire-and-forget, so this method never rejects:
+   * a failure while writing to the agent is logged rather than thrown.
+   */
+  private async routeMedia(
+    plugin: ChannelPlugin,
+    accountId: string,
+    message: ChannelMessage,
+    agent: AsyncAgent,
+  ): Promise<void> {
+    const media = message.media!;
+
+    // Fallback: send text-only if the file cannot be downloaded or read
+    let deliver = (): void => agent.write(message.text || `[Failed to download ${media.type}]`);
+
+    try {
+      const filePath = await plugin.downloadMedia!(media.fileId, accountId);
+
+      if (media.type === "image") {
+        // Images: pass directly to LLM as ImageContent
+        const buffer = await readFile(filePath);
+        const base64 = buffer.toString("base64");
+        const mimeType = media.mimeType ?? "image/jpeg";
+        const caption = media.caption || "User sent an image.";
+        deliver = () => agent.writeWithImages(caption, [{ type: "image", data: base64, mimeType }]);
+      } else {
+        // Audio/video/document: tell agent the file path, let it handle via skills
+        const parts: string[] = [];
+        parts.push(`[${media.type} message received]`);
+        parts.push(`File: ${filePath}`);
+        if (media.mimeType) parts.push(`Type: ${media.mimeType}`);
+        if (media.duration) parts.push(`Duration: ${media.duration}s`);
+        if (media.caption) parts.push(`Caption: ${media.caption}`);
+        deliver = () => agent.write(parts.join("\n"));
+      }
+    } catch (err) {
+      const msg = err instanceof Error ? err.message : String(err);
+      console.error(`[Channels] Failed to download media: ${msg}`);
+    }
+
+    // Deliver outside the download try/catch so an agent-side failure (e.g.
+    // writeWithImages throws when the agent is closed) is not misreported as a
+    // download failure and cannot escape as an unhandled rejection.
+    try {
+      deliver();
+    } catch (err) {
+      const msg = err instanceof Error ? err.message : String(err);
+      console.error(`[Channels] Failed to forward media to agent: ${msg}`);
+    }
   }
 
   /** Start sending typing indicators (repeats every 5s until stopped) */