9router/src/app/api/usage/stream/route.js
2026-03-14 09:37:29 +07:00

81 lines
2.9 KiB
JavaScript

import { getUsageStats, statsEmitter, getActiveRequests } from "@/lib/usageDb";
export const dynamic = "force-dynamic";
export async function GET() {
const encoder = new TextEncoder();
const state = { closed: false, keepalive: null, send: null, sendPending: null, cachedStats: null };
const stream = new ReadableStream({
async start(controller) {
// Full stats refresh (heavy) + immediate lightweight push
state.send = async () => {
if (state.closed) return;
try {
// Push lightweight update immediately so UI reflects changes fast
if (state.cachedStats) {
const { activeRequests, recentRequests, errorProvider } = await getActiveRequests();
const quickStats = { ...state.cachedStats, activeRequests, recentRequests, errorProvider };
controller.enqueue(encoder.encode(`data: ${JSON.stringify(quickStats)}\n\n`));
}
// Then do full recalc and update cache
const stats = await getUsageStats();
state.cachedStats = stats;
controller.enqueue(encoder.encode(`data: ${JSON.stringify(stats)}\n\n`));
} catch {
state.closed = true;
statsEmitter.off("update", state.send);
statsEmitter.off("pending", state.sendPending);
clearInterval(state.keepalive);
}
};
// Lightweight push: only refresh activeRequests + recentRequests on pending changes
state.sendPending = async () => {
if (state.closed || !state.cachedStats) return;
try {
const { activeRequests, recentRequests, errorProvider } = await getActiveRequests();
const stats = { ...state.cachedStats, activeRequests, recentRequests, errorProvider };
controller.enqueue(encoder.encode(`data: ${JSON.stringify(stats)}\n\n`));
} catch {
state.closed = true;
statsEmitter.off("update", state.send);
statsEmitter.off("pending", state.sendPending);
clearInterval(state.keepalive);
}
};
await state.send();
console.log(`[SSE] Client connected | listeners=${statsEmitter.listenerCount("update") + 1}`);
statsEmitter.on("update", state.send);
statsEmitter.on("pending", state.sendPending);
state.keepalive = setInterval(() => {
if (state.closed) { clearInterval(state.keepalive); return; }
try {
controller.enqueue(encoder.encode(": ping\n\n"));
} catch {
state.closed = true;
clearInterval(state.keepalive);
}
}, 25000);
},
cancel() {
state.closed = true;
statsEmitter.off("update", state.send);
statsEmitter.off("pending", state.sendPending);
clearInterval(state.keepalive);
console.log("[SSE] Client disconnected");
},
});
return new Response(stream, {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
},
});
}