diff --git a/backend/modules/telegram/telegramInitializer.js b/backend/modules/telegram/telegramInitializer.js index 0be0851..929720a 100644 --- a/backend/modules/telegram/telegramInitializer.js +++ b/backend/modules/telegram/telegramInitializer.js @@ -8,25 +8,37 @@ async function initializeTelegramPolling() { return; } - try { - // Find users with configured Telegram tokens - const usersWithTelegram = await User.findAll({ - where: { - telegram_bot_token: { - [require('sequelize').Op.ne]: null, - }, - }, - }); + // Add a delay before starting Telegram polling to allow the system to settle + // and prevent immediate error floods if Telegram is temporarily unreachable + const startupDelay = 10000; // 10 seconds - if (usersWithTelegram.length > 0) { - // Add each user to the polling list - for (const user of usersWithTelegram) { - await telegramPoller.addUser(user); + setTimeout(async () => { + try { + // Find users with configured Telegram tokens + const usersWithTelegram = await User.findAll({ + where: { + telegram_bot_token: { + [require('sequelize').Op.ne]: null, + }, + }, + }); + + if (usersWithTelegram.length > 0) { + console.log( + `Initializing Telegram polling for ${usersWithTelegram.length} user(s)...` + ); + // Add each user to the polling list + for (const user of usersWithTelegram) { + await telegramPoller.addUser(user); + } } + } catch (error) { + console.error( + 'Error initializing Telegram polling:', + error.message + ); } - } catch (error) { - // Telegram polling will be initialized later when the database is available - } + }, startupDelay); } module.exports = { initializeTelegramPolling }; diff --git a/backend/modules/telegram/telegramPoller.js b/backend/modules/telegram/telegramPoller.js index f4d558e..212b41b 100644 --- a/backend/modules/telegram/telegramPoller.js +++ b/backend/modules/telegram/telegramPoller.js @@ -9,6 +9,7 @@ const createPollerState = () => ({ usersToPool: [], userStatus: {}, processedUpdates: new Set(), // Track processed update IDs to prevent duplicates + userErrorState: {}, // Track error count and backoff per user }); // Global mutable state (managed functionally) @@ -35,6 +36,55 @@ const removeUserStatus = (userStatus, userId) => { return rest; }; +// Initialize error state for a user +const initializeUserErrorState = (userId) => ({ + consecutiveErrors: 0, + lastErrorTime: null, + nextPollTime: Date.now(), + lastLoggedErrorTime: null, +}); + +// Update error state for a user +const updateUserErrorState = (userErrorState, userId, updates) => ({ + ...userErrorState, + [userId]: { + ...(userErrorState[userId] || initializeUserErrorState(userId)), + ...updates, + }, +}); + +// Remove user error state +const removeUserErrorState = (userErrorState, userId) => { + const { [userId]: removed, ...rest } = userErrorState; + return rest; +}; + +// Calculate backoff delay using exponential backoff (max 5 minutes) +const calculateBackoffDelay = (consecutiveErrors) => { + const baseDelay = 5000; // 5 seconds + const maxDelay = 300000; // 5 minutes + const delay = Math.min( + baseDelay * Math.pow(2, consecutiveErrors), + maxDelay + ); + return delay; +}; + +// Check if user should be polled based on backoff state +const shouldPollUser = (userId) => { + const errorState = pollerState.userErrorState[userId]; + if (!errorState) return true; + return Date.now() >= errorState.nextPollTime; +}; + +// Check if error should be logged (rate limit: once per minute per user) +const shouldLogError = (userId) => { + const errorState = pollerState.userErrorState[userId]; + if (!errorState || !errorState.lastLoggedErrorTime) return true; + const timeSinceLastLog = Date.now() - errorState.lastLoggedErrorTime; + return timeSinceLastLog >= 60000; // 1 minute +}; + // Update user status const updateUserStatus = (userStatus, userId, updates) => ({ ...userStatus, @@ -458,6 +508,11 @@ const pollUpdates = async () => { const token = user.telegram_bot_token; if (!token) continue; + // Check if we should poll this user based on backoff state + if (!shouldPollUser(user.id)) { + continue; + } + try { const lastUpdateId = pollerState.userStatus[user.id]?.lastUpdateId || 0; @@ -469,8 +524,54 @@ const pollUpdates = async () => { ); await processUpdates(user, updates); } + + // Reset error state on successful poll + if (pollerState.userErrorState[user.id]?.consecutiveErrors > 0) { + pollerState = { + ...pollerState, + userErrorState: updateUserErrorState( + pollerState.userErrorState, + user.id, + { + consecutiveErrors: 0, + lastErrorTime: null, + nextPollTime: Date.now(), + } + ), + }; + } } catch (error) { - console.error(`Error getting updates for user ${user.id}:`, error); + // Get current error state or initialize it + const currentErrorState = + pollerState.userErrorState[user.id] || + initializeUserErrorState(user.id); + const consecutiveErrors = currentErrorState.consecutiveErrors + 1; + const backoffDelay = calculateBackoffDelay(consecutiveErrors); + + // Update error state with exponential backoff + pollerState = { + ...pollerState, + userErrorState: updateUserErrorState( + pollerState.userErrorState, + user.id, + { + consecutiveErrors, + lastErrorTime: Date.now(), + nextPollTime: Date.now() + backoffDelay, + lastLoggedErrorTime: shouldLogError(user.id) + ? Date.now() + : currentErrorState.lastLoggedErrorTime, + } + ), + }; + + // Only log error if enough time has passed (rate limiting) + if (shouldLogError(user.id)) { + console.error( + `Error getting updates for user ${user.id} (${consecutiveErrors} consecutive errors, backing off for ${backoffDelay / 1000}s):`, + error.message || error + ); + } } } }; @@ -533,14 +634,19 @@ const addUser = async (user) => { // Function to remove user (contains side effects) const removeUser = (userId) => { - // Remove user from list and status + // Remove user from list, status, and error state const newUsersList = removeUserFromList(pollerState.usersToPool, userId); const newUserStatus = removeUserStatus(pollerState.userStatus, userId); + const newUserErrorState = removeUserErrorState( + pollerState.userErrorState, + userId + ); pollerState = { ...pollerState, usersToPool: newUsersList, userStatus: newUserStatus, + userErrorState: newUserErrorState, }; // Stop polling if no users left diff --git a/backend/tests/unit/services/telegramPoller.test.js b/backend/tests/unit/services/telegramPoller.test.js index 9cffe75..a4954d2 100644 --- a/backend/tests/unit/services/telegramPoller.test.js +++ b/backend/tests/unit/services/telegramPoller.test.js @@ -191,6 +191,7 @@ describe('TelegramPoller Duplicate Prevention', () => { usersToPool: [], userStatus: {}, processedUpdates: expect.any(Set), + userErrorState: {}, }); });