Merge pull request #151 from multica-ai/forrestchang/telegram-file-send

feat(gateway): handle send_file action in Telegram service
This commit is contained in:
Jiayuan Zhang 2026-02-13 04:28:56 +08:00 committed by GitHub
commit ad26eec7d2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 442 additions and 9 deletions

View file

@ -9,7 +9,7 @@
import { Inject, Injectable, Logger } from "@nestjs/common";
import type { OnModuleInit } from "@nestjs/common";
import { Bot, webhookCallback } from "grammy";
import { Bot, InputFile, webhookCallback } from "grammy";
import type { Context } from "grammy";
import { v7 as uuidv7 } from "uuid";
import { parseConnectionCode } from "@multica/store/connection";
@ -121,6 +121,46 @@ export class TelegramService implements OnModuleInit {
}
}
/** Send a file (photo/document/video/audio) to a Telegram user */
private async sendFileToTelegram(
telegramUserId: string,
data: Buffer,
type: string,
caption?: string,
filename?: string,
): Promise<void> {
if (!this.bot) return;
const chatId = Number(telegramUserId);
const inputFile = new InputFile(data, filename);
const extra = caption ? { caption: caption.slice(0, 1024) } : {};
try {
switch (type) {
case "photo":
await this.bot.api.sendPhoto(chatId, inputFile, extra);
break;
case "video":
await this.bot.api.sendVideo(chatId, inputFile, extra);
break;
case "audio":
await this.bot.api.sendAudio(chatId, inputFile, extra);
break;
case "voice":
await this.bot.api.sendVoice(chatId, inputFile, extra);
break;
case "document":
default:
await this.bot.api.sendDocument(chatId, inputFile, extra);
break;
}
this.logger.debug(`Sent ${type} to Telegram: telegramUserId=${telegramUserId}`);
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
this.logger.error(`Failed to send ${type} to Telegram: telegramUserId=${telegramUserId}, error=${message}`);
}
}
/** Setup bot message handlers */
private setupHandlers(): void {
if (!this.bot) return;
@ -390,6 +430,26 @@ export class TelegramService implements OnModuleInit {
return;
}
// Send file — Hub agent wants to send a file to the Telegram user
if (msg.action === "send_file") {
const payload = msg.payload as {
data?: string;
type?: string;
caption?: string;
filename?: string;
};
if (payload?.data) {
void this.sendFileToTelegram(
telegramUserId,
Buffer.from(payload.data, "base64"),
payload.type ?? "document",
payload.caption,
payload.filename,
);
}
return;
}
// Regular message (e.g., "message" action from Hub)
if (msg.action === "message") {
const payload = msg.payload as { content?: string; agentId?: string };

View file

@ -1192,6 +1192,7 @@ export class Agent {
tools: toolNames,
skillsPrompt,
runtime,
channels: this.toolsOptions.channels,
});
}
}

View file

@ -10,6 +10,7 @@ import type {
SystemPromptReport,
} from "./types.js";
import {
buildChannelsSection,
buildHeartbeatSection,
buildConditionalToolSections,
buildExtraPromptSection,
@ -51,6 +52,7 @@ export function buildSystemPromptWithReport(options: SystemPromptOptions): {
skillsPrompt,
runtime,
subagent,
channels,
extraSystemPrompt,
includeSafety = true,
} = options;
@ -70,6 +72,7 @@ export function buildSystemPromptWithReport(options: SystemPromptOptions): {
{ name: "runtime", lines: buildRuntimeSection(runtime, mode) },
{ name: "time-awareness", lines: buildTimeAwarenessSection(tools, mode) },
{ name: "profile-dir", lines: buildProfileDirSection(profileDir, mode) },
{ name: "channels", lines: buildChannelsSection(channels, mode) },
{ name: "subagent", lines: buildSubagentSection(subagent, mode) },
{ name: "extra", lines: buildExtraPromptSection(extraSystemPrompt, mode) },
];

View file

@ -8,6 +8,7 @@ export { formatPromptReport } from "./report.js";
export { SAFETY_CONSTITUTION } from "./constitution.js";
export type {
ChannelInfo,
ProfileContent,
PromptSection,
RuntimeInfo,

View file

@ -8,6 +8,7 @@ import { SAFETY_CONSTITUTION } from "./constitution.js";
import { formatRuntimeLine } from "./runtime-info.js";
import { resolveHeartbeatPrompt } from "../../heartbeat/heartbeat-text.js";
import type {
ChannelInfo,
ProfileContent,
RuntimeInfo,
SubagentContext,
@ -461,6 +462,40 @@ export function buildSubagentSection(
return lines;
}
/**
* Connected channels section tells the agent which messaging channels are active
* and what capabilities they have (e.g. send files). Full mode only.
*/
export function buildChannelsSection(
channels: ChannelInfo[] | undefined,
mode: SystemPromptMode,
): string[] {
if (mode !== "full" || !channels || channels.length === 0) return [];
const lines: string[] = ["## Connected Channels", ""];
for (const ch of channels) {
lines.push(`- **${ch.name}**`);
if (ch.canSendMedia) {
lines.push(
" Capabilities: receive text/voice/image/video/document, send text, send files (photo, document, video, audio)",
);
lines.push(" Use the `send_file` tool to send files to channel users.");
} else {
lines.push(" Capabilities: receive text, send text");
}
}
lines.push(
"",
"Messages from channels are prefixed with `[ChannelName · private]` or `[ChannelName · group]`.",
"When responding to channel messages, adapt your formatting for messaging platforms (shorter paragraphs, no complex markdown).",
"",
);
return lines;
}
/**
* Extra system prompt appended at the end if provided.
*/

View file

@ -37,6 +37,14 @@ export interface RuntimeInfo {
cwd?: string | undefined;
}
/** Describes a connected messaging channel and its capabilities */
export interface ChannelInfo {
/** Human-readable channel name (e.g. "Telegram") */
name: string;
/** Whether the channel supports outbound media via send_file */
canSendMedia: boolean;
}
/** Subagent context for minimal/none modes */
export interface SubagentContext {
/** Parent session that spawned this subagent */
@ -77,6 +85,8 @@ export interface SystemPromptOptions {
subagent?: SubagentContext | undefined;
/** Workspace directory path (for agent working directory info) */
workspaceDir?: string | undefined;
/** Connected messaging channels (for channel awareness section) */
channels?: ChannelInfo[] | undefined;
/** Extra system prompt to append */
extraSystemPrompt?: string | undefined;
/** Whether to include the safety constitution (default: true) */

View file

@ -11,6 +11,8 @@ import { createSessionsListTool } from "./tools/sessions-list.js";
import { createMemorySearchTool } from "./tools/memory-search.js";
import { createCronTool } from "./tools/cron/index.js";
import { createDataTool } from "./tools/data/index.js";
import { createSendFileTool } from "./tools/send-file.js";
import type { SendFileCallback } from "./tools/send-file.js";
import { filterTools } from "./tools/policy.js";
import { isMulticaError, isRetryableError } from "@multica/utils";
import type { ExecApprovalCallback } from "./tools/exec-approval-types.js";
@ -31,6 +33,8 @@ export interface CreateToolsOptions {
provider?: string | undefined;
/** Callback invoked when exec tool needs approval before running a command */
onExecApprovalNeeded?: ExecApprovalCallback | undefined;
/** Callback for sending files through messaging channels */
onChannelSendFile?: SendFileCallback | undefined;
}
type ToolErrorPayload = {
@ -132,6 +136,12 @@ export function createAllTools(options: CreateToolsOptions | string): AgentTool<
tools.push(memorySearchTool as AgentTool<any>);
}
// Add send_file tool if channel send callback is provided
if (opts.onChannelSendFile) {
const sendFileTool = createSendFileTool(opts.onChannelSendFile);
tools.push(sendFileTool as AgentTool<any>);
}
// Add sessions_spawn tool (will be filtered by policy for subagents)
const sessionsSpawnTool = createSessionsSpawnTool({
isSubagent: isSubagent ?? false,
@ -173,6 +183,7 @@ export function resolveTools(options: ResolveToolsOptions): AgentTool<any>[] {
sessionId: options.sessionId,
provider: options.provider,
onExecApprovalNeeded: options.onExecApprovalNeeded,
onChannelSendFile: options.onChannelSendFile,
});
// Apply policy filtering

View file

@ -0,0 +1,120 @@
/**
* send_file tool sends a file to the active messaging channel.
*
* Available when the agent is connected to a channel (Telegram, etc.).
* Auto-detects media type from file extension if not specified.
*/
import { Type } from "@sinclair/typebox";
import type { AgentTool } from "@mariozechner/pi-agent-core";
import { stat } from "node:fs/promises";
import { basename, extname } from "node:path";
const SendFileSchema = Type.Object({
file_path: Type.String({
description: "Absolute path to the file to send.",
}),
caption: Type.Optional(
Type.String({
description: "Optional caption text to accompany the file.",
}),
),
type: Type.Optional(
Type.Union(
[
Type.Literal("auto"),
Type.Literal("photo"),
Type.Literal("document"),
Type.Literal("video"),
Type.Literal("audio"),
Type.Literal("voice"),
],
{
description:
'Media type. "auto" (default) detects from file extension. Use "document" to force file attachment.',
},
),
),
});
type SendFileArgs = {
file_path: string;
caption?: string;
type?: "auto" | "photo" | "document" | "video" | "audio" | "voice";
};
type SendFileResult = {
sent: boolean;
file_path: string;
detected_type: string;
error?: string;
};
/** Callback provided by the Hub to route files through channels or gateway. */
export type SendFileCallback = (
filePath: string,
caption: string | undefined,
type: string,
) => Promise<boolean>;
const PHOTO_EXTENSIONS = new Set([".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp"]);
const VIDEO_EXTENSIONS = new Set([".mp4", ".webm", ".mov", ".avi", ".mkv"]);
const AUDIO_EXTENSIONS = new Set([".mp3", ".ogg", ".wav", ".m4a", ".flac", ".aac"]);
/** Detect outbound media type from file extension. */
function detectMediaType(filePath: string): string {
const ext = extname(filePath).toLowerCase();
if (PHOTO_EXTENSIONS.has(ext)) return "photo";
if (VIDEO_EXTENSIONS.has(ext)) return "video";
if (AUDIO_EXTENSIONS.has(ext)) return "audio";
return "document";
}
export function createSendFileTool(
onSendFile: SendFileCallback,
): AgentTool<typeof SendFileSchema, SendFileResult> {
return {
name: "send_file",
label: "Send File",
description:
"Send a file (photo, document, video, audio) to the active messaging channel (e.g. Telegram). " +
"The file must exist on the local filesystem. " +
'Type is auto-detected from extension unless overridden. Use type="document" to force file attachment.',
parameters: SendFileSchema,
execute: async (_toolCallId, args) => {
const { file_path, caption, type } = args as SendFileArgs;
// Validate file exists
try {
const fileStat = await stat(file_path);
if (!fileStat.isFile()) {
return {
content: [{ type: "text", text: `Error: ${file_path} is not a file` }],
details: { sent: false, file_path, detected_type: "unknown", error: "Not a file" },
};
}
} catch {
return {
content: [{ type: "text", text: `Error: File not found: ${file_path}` }],
details: { sent: false, file_path, detected_type: "unknown", error: "File not found" },
};
}
const mediaType = type && type !== "auto" ? type : detectMediaType(file_path);
const sent = await onSendFile(file_path, caption, mediaType);
if (!sent) {
return {
content: [{ type: "text", text: "No active channel conversation to send the file to." }],
details: { sent: false, file_path, detected_type: mediaType, error: "No active channel" },
};
}
const filename = basename(file_path);
return {
content: [{ type: "text", text: `File sent: ${filename} (${mediaType})` }],
details: { sent: true, file_path, detected_type: mediaType },
};
},
};
}

View file

@ -2,6 +2,8 @@ import type { ThinkingLevel } from "@mariozechner/pi-agent-core";
import type { SkillsConfig } from "./skills/types.js";
import type { ToolsConfig } from "./tools/policy.js";
import type { ExecApprovalCallback, ExecApprovalConfig } from "./tools/exec-approval-types.js";
import type { SendFileCallback } from "./tools/send-file.js";
import type { ChannelInfo } from "./system-prompt/types.js";
/** Controls how reasoning/thinking content blocks are handled */
export type ReasoningMode = "off" | "on" | "stream";
@ -82,6 +84,12 @@ export type AgentOptions = {
onExecApprovalNeeded?: ExecApprovalCallback | undefined;
/** Exec approval configuration (security level, ask mode, allowlist) */
execApproval?: ExecApprovalConfig | undefined;
// === Channel Configuration ===
/** Connected messaging channels (for system prompt awareness) */
channels?: ChannelInfo[] | undefined;
/** Callback for sending files through messaging channels (Telegram, etc.) */
onChannelSendFile?: SendFileCallback | undefined;
};
export interface Message {

View file

@ -24,10 +24,12 @@ import { loadChannelsConfig } from "./config.js";
import { MessageAggregator, DEFAULT_CHUNKER_CONFIG } from "../hub/message-aggregator.js";
import { isHeartbeatAckEvent } from "../hub/heartbeat-filter.js";
import type { AsyncAgent } from "../agent/async-agent.js";
import type { ChannelInfo } from "../agent/system-prompt/types.js";
import { transcribeAudio } from "../media/transcribe.js";
import { describeImage } from "../media/describe-image.js";
import { describeVideo } from "../media/describe-video.js";
import { InboundDebouncer } from "./inbound-debouncer.js";
import { extname } from "node:path";
interface AccountHandle {
channelId: string;
@ -40,6 +42,8 @@ interface AccountHandle {
interface LastRoute {
plugin: ChannelPlugin;
deliveryCtx: DeliveryContext;
/** Chat type of the originating message (for source prefix) */
chatType?: "direct" | "group" | undefined;
}
export class ChannelManager {
@ -348,6 +352,7 @@ export class ChannelManager {
conversationId,
replyToMessageId: messageId,
},
chatType: message.chatType,
};
console.log(`[Channels] lastRoute updated → ${plugin.id}:${conversationId} replyTo=${messageId}`);
console.log(`[Channels] Forwarding to agent ${agent.sessionId}`);
@ -477,15 +482,58 @@ export class ChannelManager {
timestamp: Date.now(),
});
}
// Prepend source context so the LLM knows which platform/chat type the message came from
const channelName = route?.plugin.meta.name ?? "Channel";
const chatLabel = route?.chatType === "group" ? "group" : "private";
const prefixedText = `[${channelName} · ${chatLabel}]\n${combinedText}`;
const replyTo = route?.deliveryCtx.replyToMessageId ?? "?";
console.log(`[Channels] Debouncer flushing ${combinedText.length} chars to agent (queued route replyTo=${replyTo}, acks=${acks.length})`);
agent.write(combinedText, { source });
agent.write(prefixedText, { source });
},
);
}
return this.debouncer;
}
/**
* Send a file to the active channel conversation.
* Returns true if the file was sent, false if no active route or plugin doesn't support media.
*/
async sendFile(filePath: string, caption?: string, type?: string): Promise<boolean> {
const route = this.activeRoute ?? this.lastRoute;
if (!route) return false;
const { plugin, deliveryCtx } = route;
if (!plugin.outbound.sendMedia) return false;
const mediaType = type || this.detectMediaType(filePath);
try {
await plugin.outbound.sendMedia(deliveryCtx, {
type: mediaType as import("./types.js").OutboundMediaType,
source: filePath,
caption,
});
console.log(`[Channels] Sent ${mediaType} to ${deliveryCtx.channel}:${deliveryCtx.conversationId}`);
return true;
} catch (err) {
console.error(`[Channels] Failed to send file: ${err}`);
return false;
}
}
/** Detect outbound media type from file extension */
private detectMediaType(filePath: string): string {
const ext = extname(filePath).toLowerCase();
const photoExts = new Set([".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp"]);
const videoExts = new Set([".mp4", ".webm", ".mov", ".avi", ".mkv"]);
const audioExts = new Set([".mp3", ".ogg", ".wav", ".m4a", ".flac", ".aac"]);
if (photoExts.has(ext)) return "photo";
if (videoExts.has(ext)) return "video";
if (audioExts.has(ext)) return "audio";
return "document";
}
/** Start sending typing indicators (repeats every 5s until stopped) */
private startTyping(): void {
this.stopTyping();
@ -577,4 +625,21 @@ export class ChannelManager {
listAccountStates(): ChannelAccountState[] {
return Array.from(this.accounts.values()).map((h) => ({ ...h.state }));
}
/** Get channel info for connected channels (for system prompt awareness) */
listChannelInfos(): ChannelInfo[] {
const seen = new Set<string>();
const infos: ChannelInfo[] = [];
for (const handle of this.accounts.values()) {
if (handle.state.status !== "running" || seen.has(handle.channelId)) continue;
seen.add(handle.channelId);
const plugin = listChannels().find((p) => p.id === handle.channelId);
if (!plugin) continue;
infos.push({
name: plugin.meta.name,
canSendMedia: typeof plugin.outbound.sendMedia === "function",
});
}
return infos;
}
}

View file

@ -12,8 +12,8 @@
import { writeFile, mkdir } from "node:fs/promises";
import { join, extname } from "node:path";
import { v7 as uuidv7 } from "uuid";
import { Bot, GrammyError } from "grammy";
import type { ChannelPlugin, ChannelMessage, ChannelConfigAdapter, ChannelsConfig, DeliveryContext } from "../types.js";
import { Bot, GrammyError, InputFile } from "grammy";
import type { ChannelPlugin, ChannelMessage, ChannelConfigAdapter, ChannelsConfig, DeliveryContext, OutboundMedia } from "../types.js";
import { markdownToTelegramHtml } from "./telegram-format.js";
import { MEDIA_CACHE_DIR } from "@multica/utils";
@ -321,6 +321,67 @@ export const telegramChannel: ChannelPlugin = {
// Best-effort
}
},
async sendMedia(ctx: DeliveryContext, media: OutboundMedia): Promise<void> {
const bot = bots.get(ctx.accountId);
if (!bot) throw new Error(`No Telegram bot for account ${ctx.accountId}`);
const chatId = Number(ctx.conversationId);
const inputFile = new InputFile(media.source);
// Telegram caption limit: 1024 chars. Truncate if needed.
const caption = media.caption?.slice(0, 1024);
const captionHtml = caption ? markdownToTelegramHtml(caption) : undefined;
const extra = captionHtml ? { caption: captionHtml, parse_mode: "HTML" as const } : {};
console.log(`[Telegram] Sending ${media.type} to chatId=${chatId}`);
try {
switch (media.type) {
case "photo":
await bot.api.sendPhoto(chatId, inputFile, extra);
break;
case "video":
await bot.api.sendVideo(chatId, inputFile, extra);
break;
case "audio":
await bot.api.sendAudio(chatId, inputFile, extra);
break;
case "voice":
await bot.api.sendVoice(chatId, inputFile, extra);
break;
case "document":
default:
await bot.api.sendDocument(chatId, inputFile, extra);
break;
}
} catch (err) {
// If HTML caption fails, retry without formatting
if (isParseError(err) && caption) {
console.warn("[Telegram] Media caption HTML parse failed, retrying as plain text");
const plainExtra = { caption };
switch (media.type) {
case "photo":
await bot.api.sendPhoto(chatId, inputFile, plainExtra);
break;
case "video":
await bot.api.sendVideo(chatId, inputFile, plainExtra);
break;
case "audio":
await bot.api.sendAudio(chatId, inputFile, plainExtra);
break;
case "voice":
await bot.api.sendVoice(chatId, inputFile, plainExtra);
break;
case "document":
default:
await bot.api.sendDocument(chatId, inputFile, plainExtra);
break;
}
} else {
throw err;
}
}
},
},
async downloadMedia(fileId: string, accountId: string): Promise<string> {

View file

@ -88,6 +88,23 @@ export interface ChannelGatewayAdapter {
): Promise<void>;
}
// ─── Outbound Media ───
/** Media type for outbound messages */
export type OutboundMediaType = "photo" | "document" | "video" | "audio" | "voice";
/** Media payload for sending files back to the platform */
export interface OutboundMedia {
/** Media type (determines which API method to use) */
type: OutboundMediaType;
/** Local file path */
source: string;
/** Caption text (optional, may be truncated per platform limits) */
caption?: string | undefined;
/** Filename hint (optional, used for documents) */
filename?: string | undefined;
}
// ─── Outbound Adapter ───
/** Sends messages back to the platform */
@ -96,6 +113,8 @@ export interface ChannelOutboundAdapter {
sendText(ctx: DeliveryContext, text: string): Promise<void>;
/** Reply to a specific message */
replyText(ctx: DeliveryContext, text: string): Promise<void>;
/** Send a media file (photo, document, video, audio, voice) to a conversation (optional) */
sendMedia?(ctx: DeliveryContext, media: OutboundMedia): Promise<void>;
/** Send "typing" indicator (optional, not all platforms support it) */
sendTyping?(ctx: DeliveryContext): Promise<void>;
/**

View file

@ -1,3 +1,5 @@
import { readFile } from "node:fs/promises";
import { basename } from "node:path";
import { v7 as uuidv7 } from "uuid";
import {
GatewayClient,
@ -150,14 +152,16 @@ export class Hub {
this.initCronService();
this.initHeartbeatService();
// Initialize channel plugin system (before restoreAgents so channelManager is available)
console.log("[Hub] Initializing channel system...");
initChannels();
this.channelManager = new ChannelManager(this);
this.client = this.createClient(this.url);
this.client.connect();
this.restoreAgents();
// Initialize channel plugin system
console.log("[Hub] Initializing channel system...");
initChannels();
this.channelManager = new ChannelManager(this);
// Start channel accounts (async — bot connections happen in background)
void this.channelManager.startAll().then(() => {
console.log("[Hub] Channel system started");
}).catch((err) => {
@ -375,7 +379,9 @@ export class Hub {
const profileId = options?.profileId ?? "default";
const sessionId = id ?? uuidv7();
const onExecApprovalNeeded = this.createExecApprovalCallback(sessionId, profileId);
const agent = new AsyncAgent({ sessionId, profileId, onExecApprovalNeeded });
const onChannelSendFile = this.createChannelSendFileCallback(sessionId);
const channels = this.channelManager.listChannelInfos();
const agent = new AsyncAgent({ sessionId, profileId, onExecApprovalNeeded, onChannelSendFile, channels });
this.agents.set(agent.sessionId, agent);
// Persist to agent store (skip during restore to avoid duplicates)
@ -663,6 +669,39 @@ export class Hub {
};
}
/**
* Create a callback for the send_file tool that routes files through
* the channel plugin (local) or gateway (remote) path.
*/
private createChannelSendFileCallback(sessionId: string): (filePath: string, caption: string | undefined, type: string) => Promise<boolean> {
return async (filePath: string, caption: string | undefined, type: string): Promise<boolean> => {
// Path 1: Channel plugin (local bot — file on same machine)
const sentViaChannel = await this.channelManager.sendFile(filePath, caption, type);
if (sentViaChannel) return true;
// Path 2: Gateway (remote bot — read file, base64 encode, send via RoutedMessage)
const deviceId = this.agentSenders.get(sessionId);
if (deviceId) {
try {
const fileBuffer = await readFile(filePath);
this.client.send(deviceId, "send_file", {
data: fileBuffer.toString("base64"),
type,
caption,
filename: basename(filePath),
});
console.log(`[Hub] Sent file via gateway: ${basename(filePath)}${deviceId}`);
return true;
} catch (err) {
console.error(`[Hub] Failed to send file via gateway: ${err}`);
return false;
}
}
return false;
};
}
getAgent(id: string): AsyncAgent | undefined {
return this.agents.get(id);
}