Teach Telegram sender to accept an optional parse_mode and enable MarkdownV2 only for scheduled task summaries so headings, emphasis, and escaped characters render correctly without affecting other bot replies.
491 lines
15 KiB
JavaScript
491 lines
15 KiB
JavaScript
const https = require('https');
|
|
const { User, InboxItem } = require('../models');
|
|
|
|
// Create poller state
|
|
const createPollerState = () => ({
|
|
running: false,
|
|
interval: null,
|
|
pollInterval: 5000, // 5 seconds
|
|
usersToPool: [],
|
|
userStatus: {},
|
|
processedUpdates: new Set(), // Track processed update IDs to prevent duplicates
|
|
});
|
|
|
|
// Global mutable state (managed functionally)
|
|
let pollerState = createPollerState();
|
|
|
|
// Check if user exists in list
|
|
const userExistsInList = (users, userId) => users.some((u) => u.id === userId);
|
|
|
|
// Add user to list
|
|
const addUserToList = (users, user) => {
|
|
if (userExistsInList(users, user.id)) {
|
|
return users;
|
|
}
|
|
return [...users, user];
|
|
};
|
|
|
|
// Remove user from list
|
|
const removeUserFromList = (users, userId) =>
|
|
users.filter((u) => u.id !== userId);
|
|
|
|
// Remove user status
|
|
const removeUserStatus = (userStatus, userId) => {
|
|
const { [userId]: removed, ...rest } = userStatus;
|
|
return rest;
|
|
};
|
|
|
|
// Update user status
|
|
const updateUserStatus = (userStatus, userId, updates) => ({
|
|
...userStatus,
|
|
[userId]: {
|
|
...userStatus[userId],
|
|
...updates,
|
|
},
|
|
});
|
|
|
|
// Get highest update ID from updates
|
|
const getHighestUpdateId = (updates) => {
|
|
if (!updates.length) return 0;
|
|
return Math.max(...updates.map((u) => u.update_id));
|
|
};
|
|
|
|
// Create message parameters
|
|
const createMessageParams = (
|
|
chatId,
|
|
text,
|
|
replyToMessageId = null,
|
|
parseMode = undefined
|
|
) => {
|
|
const params = { chat_id: chatId, text: text };
|
|
if (replyToMessageId) {
|
|
params.reply_to_message_id = replyToMessageId;
|
|
}
|
|
if (parseMode) {
|
|
params.parse_mode = parseMode;
|
|
}
|
|
return params;
|
|
};
|
|
|
|
// Create Telegram API URL
|
|
const createTelegramUrl = (token, endpoint, params = {}) => {
|
|
const baseUrl = `https://api.telegram.org/bot${token}/${endpoint}`;
|
|
if (Object.keys(params).length === 0) return baseUrl;
|
|
|
|
const searchParams = new URLSearchParams(params);
|
|
return `${baseUrl}?${searchParams}`;
|
|
};
|
|
|
|
// Side effect function to make HTTP GET request
|
|
const makeHttpGetRequest = (url, timeout = 5000) => {
|
|
return new Promise((resolve, reject) => {
|
|
https
|
|
.get(url, { timeout }, (res) => {
|
|
let data = '';
|
|
|
|
res.on('data', (chunk) => {
|
|
data += chunk;
|
|
});
|
|
|
|
res.on('end', () => {
|
|
try {
|
|
const response = JSON.parse(data);
|
|
resolve(response);
|
|
} catch (error) {
|
|
reject(error);
|
|
}
|
|
});
|
|
})
|
|
.on('error', (error) => {
|
|
reject(error);
|
|
})
|
|
.on('timeout', () => {
|
|
reject(new Error('Request timeout'));
|
|
});
|
|
});
|
|
};
|
|
|
|
// Side effect function to make HTTP POST request
|
|
const makeHttpPostRequest = (url, postData, options) => {
|
|
return new Promise((resolve, reject) => {
|
|
const req = https.request(url, options, (res) => {
|
|
let data = '';
|
|
res.on('data', (chunk) => (data += chunk));
|
|
res.on('end', () => {
|
|
try {
|
|
const response = JSON.parse(data);
|
|
resolve(response);
|
|
} catch (error) {
|
|
reject(error);
|
|
}
|
|
});
|
|
});
|
|
|
|
req.on('error', reject);
|
|
req.write(postData);
|
|
req.end();
|
|
});
|
|
};
|
|
|
|
// Side effect function to get Telegram updates
|
|
const getTelegramUpdates = async (token, offset) => {
|
|
try {
|
|
const url = createTelegramUrl(token, 'getUpdates', {
|
|
offset: offset.toString(),
|
|
timeout: '1',
|
|
});
|
|
|
|
const response = await makeHttpGetRequest(url, 5000);
|
|
|
|
if (response.ok && Array.isArray(response.result)) {
|
|
return response.result;
|
|
} else {
|
|
return [];
|
|
}
|
|
} catch (error) {
|
|
throw error;
|
|
}
|
|
};
|
|
|
|
// Side effect function to send Telegram message
|
|
const sendTelegramMessage = async (
|
|
token,
|
|
chatId,
|
|
text,
|
|
replyToMessageId = null,
|
|
options = {}
|
|
) => {
|
|
try {
|
|
const messageParams = createMessageParams(
|
|
chatId,
|
|
text,
|
|
replyToMessageId,
|
|
options.parseMode
|
|
);
|
|
const postData = JSON.stringify(messageParams);
|
|
const url = createTelegramUrl(token, 'sendMessage');
|
|
|
|
const requestOptions = {
|
|
method: 'POST',
|
|
headers: {
|
|
'Content-Type': 'application/json',
|
|
'Content-Length': Buffer.byteLength(postData),
|
|
},
|
|
};
|
|
|
|
return await makeHttpPostRequest(url, postData, requestOptions);
|
|
} catch (error) {
|
|
throw error;
|
|
}
|
|
};
|
|
|
|
// Side effect function to update user chat ID
|
|
const updateUserChatId = async (userId, chatId) => {
|
|
await User.update({ telegram_chat_id: chatId }, { where: { id: userId } });
|
|
};
|
|
|
|
// Side effect function to create inbox item
|
|
const createInboxItem = async (content, userId, messageId) => {
|
|
// Check if a similar item was created recently (within last 30 seconds)
|
|
// to prevent duplicates from network issues or multiple processing
|
|
const recentCutoff = new Date(Date.now() - 30000); // 30 seconds ago
|
|
|
|
const existingItem = await InboxItem.findOne({
|
|
where: {
|
|
content: content,
|
|
user_id: userId,
|
|
source: 'telegram',
|
|
created_at: {
|
|
[require('sequelize').Op.gte]: recentCutoff,
|
|
},
|
|
},
|
|
});
|
|
|
|
if (existingItem) {
|
|
console.log(
|
|
`Duplicate inbox item detected for user ${userId}, content: "${content}". Skipping creation.`
|
|
);
|
|
return existingItem;
|
|
}
|
|
|
|
return await InboxItem.create({
|
|
content: content,
|
|
source: 'telegram',
|
|
user_id: userId,
|
|
metadata: { telegram_message_id: messageId }, // Store message ID for reference
|
|
});
|
|
};
|
|
|
|
// Function to handle bot commands
|
|
const handleBotCommand = async (command, user, chatId, messageId) => {
|
|
const botToken = user.telegram_bot_token;
|
|
|
|
switch (command.toLowerCase()) {
|
|
case '/start':
|
|
await sendTelegramMessage(
|
|
botToken,
|
|
chatId,
|
|
`🎉 Welcome to tududi!\n\nYour personal task management bot is now connected and ready to help!\n\n📝 Simply send me any message and I'll add it to your tududi inbox as an item.\n\n✨ Commands:\n• /help - Show help information\n• /start - Show welcome message\n• Just type any text - Add it as an inbox item\n\nLet's get organized! 🚀`,
|
|
messageId
|
|
);
|
|
break;
|
|
case '/help':
|
|
await sendTelegramMessage(
|
|
botToken,
|
|
chatId,
|
|
`📋 tududi Bot Help\n\nSend me any text message and I'll add it to your tududi inbox as an inbox item.\n\nCommands:\n/start - Welcome message\n/help - Show this help message\n\nJust type your item and I'll take care of the rest!`,
|
|
messageId
|
|
);
|
|
break;
|
|
default:
|
|
await sendTelegramMessage(
|
|
botToken,
|
|
chatId,
|
|
`❓ Unknown command: ${command}\n\nUse /help to see available commands or just send a regular message to add it to your inbox.`,
|
|
messageId
|
|
);
|
|
break;
|
|
}
|
|
};
|
|
|
|
// Function to process a single message (contains side effects)
|
|
const processMessage = async (user, update) => {
|
|
const message = update.message;
|
|
const text = message.text;
|
|
const chatId = message.chat.id.toString();
|
|
const messageId = message.message_id;
|
|
|
|
// Update chat ID if needed and send welcome message for new users
|
|
if (!user.telegram_chat_id) {
|
|
await updateUserChatId(user.id, chatId);
|
|
user.telegram_chat_id = chatId; // Update local object
|
|
|
|
// Send welcome message for first-time users
|
|
await sendTelegramMessage(
|
|
user.telegram_bot_token,
|
|
chatId,
|
|
`🎉 Welcome to tududi!\n\nYour personal task management bot is now connected and ready to help!\n\n📝 Simply send me any message and I'll add it to your tududi inbox as an inbox item.\n\n✨ Commands:\n• /help - Show help information\n• /start - Show welcome message\n• Just type any text - Add it as an inbox item\n\nLet's get organized! 🚀`
|
|
);
|
|
|
|
console.log(
|
|
`Sent welcome message to new user ${user.id} in chat ${chatId}`
|
|
);
|
|
|
|
// If the first message was just /start, don't process it further
|
|
if (text.toLowerCase() === '/start') {
|
|
return;
|
|
}
|
|
}
|
|
|
|
try {
|
|
// Check if message is a bot command
|
|
if (text.startsWith('/')) {
|
|
await handleBotCommand(text, user, chatId, messageId);
|
|
console.log(
|
|
`Successfully processed command ${messageId} for user ${user.id}: "${text}"`
|
|
);
|
|
return;
|
|
}
|
|
|
|
// Create inbox item for regular messages (with duplicate check)
|
|
const inboxItem = await createInboxItem(text, user.id, messageId);
|
|
|
|
// Send confirmation
|
|
await sendTelegramMessage(
|
|
user.telegram_bot_token,
|
|
chatId,
|
|
`✅ Added to tududi inbox: "${text}"`,
|
|
messageId
|
|
);
|
|
|
|
console.log(
|
|
`Successfully processed message ${messageId} for user ${user.id}: "${text}"`
|
|
);
|
|
} catch (error) {
|
|
// Send error message
|
|
await sendTelegramMessage(
|
|
user.telegram_bot_token,
|
|
chatId,
|
|
`❌ Failed to add to inbox: ${error.message}`,
|
|
messageId
|
|
);
|
|
}
|
|
};
|
|
|
|
// Function to process updates (contains side effects)
|
|
const processUpdates = async (user, updates) => {
|
|
if (!updates.length) return;
|
|
|
|
// Filter out already processed updates
|
|
const newUpdates = updates.filter((update) => {
|
|
const updateKey = `${user.id}-${update.update_id}`;
|
|
return !pollerState.processedUpdates.has(updateKey);
|
|
});
|
|
|
|
if (!newUpdates.length) return;
|
|
|
|
// Get highest update ID from new updates
|
|
const highestUpdateId = getHighestUpdateId(newUpdates);
|
|
|
|
// Update user status
|
|
pollerState = {
|
|
...pollerState,
|
|
userStatus: updateUserStatus(pollerState.userStatus, user.id, {
|
|
lastUpdateId: highestUpdateId,
|
|
}),
|
|
};
|
|
|
|
// Process each new update
|
|
for (const update of newUpdates) {
|
|
try {
|
|
const updateKey = `${user.id}-${update.update_id}`;
|
|
|
|
if (update.message && update.message.text) {
|
|
await processMessage(user, update);
|
|
|
|
// Mark update as processed
|
|
pollerState.processedUpdates.add(updateKey);
|
|
|
|
// Clean up old processed updates (keep only last 1000 to prevent memory leak)
|
|
if (pollerState.processedUpdates.size > 1000) {
|
|
const oldestEntries = Array.from(
|
|
pollerState.processedUpdates
|
|
).slice(0, 100);
|
|
oldestEntries.forEach((entry) =>
|
|
pollerState.processedUpdates.delete(entry)
|
|
);
|
|
}
|
|
}
|
|
} catch (error) {
|
|
console.error(
|
|
`Error processing update ${update.update_id} for user ${user.id}:`,
|
|
error
|
|
);
|
|
}
|
|
}
|
|
};
|
|
|
|
// Function to poll updates for all users (contains side effects)
|
|
const pollUpdates = async () => {
|
|
for (const user of pollerState.usersToPool) {
|
|
const token = user.telegram_bot_token;
|
|
if (!token) continue;
|
|
|
|
try {
|
|
const lastUpdateId =
|
|
pollerState.userStatus[user.id]?.lastUpdateId || 0;
|
|
const updates = await getTelegramUpdates(token, lastUpdateId + 1);
|
|
|
|
if (updates && updates.length > 0) {
|
|
console.log(
|
|
`Processing ${updates.length} updates for user ${user.id}, starting from update ID ${lastUpdateId + 1}`
|
|
);
|
|
await processUpdates(user, updates);
|
|
}
|
|
} catch (error) {
|
|
console.error(`Error getting updates for user ${user.id}:`, error);
|
|
}
|
|
}
|
|
};
|
|
|
|
// Function to start polling (contains side effects)
|
|
const startPolling = () => {
|
|
if (pollerState.running) return;
|
|
|
|
const interval = setInterval(async () => {
|
|
try {
|
|
await pollUpdates();
|
|
} catch (error) {
|
|
// Error polling Telegram
|
|
}
|
|
}, pollerState.pollInterval);
|
|
|
|
pollerState = {
|
|
...pollerState,
|
|
running: true,
|
|
interval,
|
|
};
|
|
};
|
|
|
|
// Function to stop polling (contains side effects)
|
|
const stopPolling = () => {
|
|
if (!pollerState.running) return;
|
|
|
|
if (pollerState.interval) {
|
|
clearInterval(pollerState.interval);
|
|
}
|
|
|
|
pollerState = {
|
|
...pollerState,
|
|
running: false,
|
|
interval: null,
|
|
};
|
|
};
|
|
|
|
// Function to add user (contains side effects)
|
|
const addUser = async (user) => {
|
|
if (!user || !user.telegram_bot_token) {
|
|
return false;
|
|
}
|
|
|
|
// Add user to list
|
|
const newUsersList = addUserToList(pollerState.usersToPool, user);
|
|
|
|
pollerState = {
|
|
...pollerState,
|
|
usersToPool: newUsersList,
|
|
};
|
|
|
|
// Start polling if not already running and we have users
|
|
if (pollerState.usersToPool.length > 0 && !pollerState.running) {
|
|
startPolling();
|
|
}
|
|
|
|
return true;
|
|
};
|
|
|
|
// Function to remove user (contains side effects)
|
|
const removeUser = (userId) => {
|
|
// Remove user from list and status
|
|
const newUsersList = removeUserFromList(pollerState.usersToPool, userId);
|
|
const newUserStatus = removeUserStatus(pollerState.userStatus, userId);
|
|
|
|
pollerState = {
|
|
...pollerState,
|
|
usersToPool: newUsersList,
|
|
userStatus: newUserStatus,
|
|
};
|
|
|
|
// Stop polling if no users left
|
|
if (pollerState.usersToPool.length === 0 && pollerState.running) {
|
|
stopPolling();
|
|
}
|
|
|
|
return true;
|
|
};
|
|
|
|
// Get poller status
|
|
const getStatus = () => ({
|
|
running: pollerState.running,
|
|
usersCount: pollerState.usersToPool.length,
|
|
pollInterval: pollerState.pollInterval,
|
|
userStatus: pollerState.userStatus,
|
|
});
|
|
|
|
// Export functional interface
|
|
module.exports = {
|
|
addUser,
|
|
removeUser,
|
|
startPolling,
|
|
stopPolling,
|
|
getStatus,
|
|
sendTelegramMessage,
|
|
// For testing
|
|
_createPollerState: createPollerState,
|
|
_userExistsInList: userExistsInList,
|
|
_addUserToList: addUserToList,
|
|
_removeUserFromList: removeUserFromList,
|
|
_getHighestUpdateId: getHighestUpdateId,
|
|
_createMessageParams: createMessageParams,
|
|
_createTelegramUrl: createTelegramUrl,
|
|
};
|