From 85b7a0b136715f41f09cb80d9b4790fb954685ed Mon Sep 17 00:00:00 2001 From: Blade <46746496+Blade096@users.noreply.github.com> Date: Mon, 9 Feb 2026 11:30:42 +0800 Subject: [PATCH] Feature/ai observability dashboard (#79) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: add AI request details feature with latency tracking Add comprehensive request history and debugging capability to the Usage dashboard: **Storage Layer** (usageDb.js): - Add saveRequestDetail() for storing full request/response details - Implement FIFO queue with 1000-record limit in request-details.json - Auto-sanitize sensitive headers (authorization, api-key, cookie, token) - Add getRequestDetails() with pagination and filtering support - Add getRequestDetailById() for single record lookup **Pipeline Integration** (chatCore.js): - Track request start time and calculate total latency - Record TTFT (Time To First Token) and total latency for all requests - Capture full request details (messages, model, parameters) - Save response content for non-streaming, mark streaming responses - Handle error cases with detailed error information - Async non-blocking saves to avoid impacting request performance **API Layer** (/api/usage/request-details): - GET endpoint with pagination (page, pageSize: 1-100) - Filter by provider, model, connectionId, status, date range - Returns { details: [...], pagination: {...} } format **UI Components**: - Drawer.js: Right slide-out panel with backdrop blur and ESC close - Pagination.js: Full pagination with page size selector (10/20/50) - RequestDetailsTab.js: Complete table view with filters and detail drawer **Dashboard Integration**: - Add "Details" tab to Usage page (4th tab after Overview/Logger/Limits) - Table columns: Timestamp, Model, Provider, Input Tokens, Output Tokens, Latency (TTFT/Total), Action - Provider filter dropdown (9 providers supported) - Date range filters (start/end datetime) - Click "Detail" button to view full request/response JSON in slide-out drawer **Features**: - Real-time latency monitoring (TTFT & Total) - Complete request/response inspection for debugging - Filterable and searchable request history - Responsive design with mobile-friendly filters - Data security with automatic header sanitization - Performance: async saves don't block request pipeline **Files Created/Modified**: - src/lib/usageDb.js (modified) - open-sse/handlers/chatCore.js (modified) - src/app/api/usage/request-details/route.js (new) - src/shared/components/Drawer.js (new) - src/shared/components/Pagination.js (new) - src/app/(dashboard)/dashboard/usage/components/RequestDetailsTab.js (new) - src/app/(dashboard)/dashboard/usage/page.js (modified) Closes: AI Observability Dashboard feature * feat: enhance request details with full config and streaming content capture Improve Request Details feature to capture comprehensive request parameters and actual streaming response content: **Request Configuration Enhancement** (chatCore.js): - Add extractRequestConfig() helper function to capture all request parameters - Include temperature controls: temperature, top_p, top_k - Include token limits: max_tokens, max_completion_tokens - Include thinking/reasoning modes: thinking, reasoning, enable_thinking - Include OpenAI parameters: presence_penalty, frequency_penalty, seed, stop, tools, tool_choice, response_format, n, logprobs, top_logprobs, logit_bias, user, parallel_tool_calls, prediction, store, metadata - Apply to all request types: non-streaming, streaming, and error cases **Streaming Content Capture** (chatCore.js & stream.js): - Add onStreamComplete callback mechanism to stream processors - Accumulate content from all formats: OpenAI, Claude, Gemini - Track content from delta.content, delta.reasoning_content, delta.text, delta.thinking, and Gemini content.parts - Save initial record with "[Streaming in progress...]" marker - Update record with actual content when stream completes - Include usage tokens when available from stream **Files Modified**: - open-sse/handlers/chatCore.js - extractRequestConfig() + streaming capture - open-sse/utils/stream.js - onStreamComplete callback + content accumulation **Benefits**: - View complete request configuration in Request Details (thinking mode, etc.) - See actual streaming response content instead of placeholder - Better debugging and observability for AI requests Refs: #request-details-enhancement * feat: separate thinking/reasoning content from response content Improve Request Details to display thinking process separately from final response: **Backend Changes**: - stream.js: Capture content and thinking separately in streaming mode - Add accumulatedThinking variable alongside accumulatedContent - Route delta.content to content, delta.reasoning_content to thinking - Support OpenAI (reasoning_content), Claude (thinking), Gemini (part.thought) - Update onStreamComplete callback to return { content, thinking } object - chatCore.js: Update response structure to include thinking field - Non-streaming: Extract thinking from reasoning_content field - Streaming: Receive { content, thinking } from stream callback - Error responses: Include thinking: null - Initial streaming save: Include thinking: null **Frontend Changes**: - RequestDetailsTab.js: Display thinking and content in separate sections - Add amber/yellow themed "Thinking Process" section with psychology icon - Show "Final Response" label when thinking is present - Use distinct visual styling for thinking (amber bg) vs content (gray bg) - Only show thinking section when thinking content exists **Benefits**: - Users can clearly see model's reasoning process vs final answer - Better debugging for models with thinking capabilities (Claude, o1, etc.) - Visual distinction makes it easy to identify thinking vs response Refs: #thinking-content-separation * fix: map Claude thinking to reasoning_content field Fix Claude thinking content to be properly captured as reasoning_content instead of regular content, enabling separate display in Request Details: **Changes**: - claude-to-openai.js: Use reasoning_content field for thinking blocks - thinking start: send { reasoning_content: "" } instead of { content: "```\n```" } - thinking delta: map to reasoning_content instead of content - thinking stop: send { reasoning_content: "" } instead of { content: "```\n```" } **Why This Matters**: - Previously Claude thinking was sent as `content` field, mixed with actual response - Now thinking uses `reasoning_content` field, matching OpenAI's o1 format - stream.js can now properly route thinking to accumulatedThinking variable - Request Details UI will show Claude thinking in separate "Thinking Process" section **Supported Thinking Formats**: - OpenAI: delta.reasoning_content → thinking - Claude: delta.thinking → reasoning_content (now fixed) - Gemini: part.thought === true → thinking Refs: #claude-thinking-fix * feat(observability): capture and display full 4-layer request chain Capture complete request/response chain in AI Request Details: - Add providerRequest field (translated request sent to provider) - Add providerResponse field (raw provider response, streaming indicator) - Update chatCore.js at all 5 saveRequestDetail() call sites - Reorganize UI into 4 collapsible sections with Material icons - Preserve backward compatibility for old records - Add distinct styling for streaming indicator * fix(observability): resolve React duplicate key warning in request details table - Use composite key (detail.id + index) to ensure unique keys - Prevents React warnings when database contains duplicate IDs from old ID generation * fix(observability): display actual content in streaming request details Change providerResponse field for streaming requests from placeholder "[Streaming - raw response not captured]" to actual final content. This improves debugging experience by showing the real AI response in the "Provider Response (Raw)" section instead of a confusing placeholder message. Files changed: - open-sse/handlers/chatCore.js: Save contentObj.content to providerResponse - src/app/.../RequestDetailsTab.js: Remove special handling for placeholder * refactor(observability): migrate request details to SQLite for improved concurrency - Replace LowDB JSON storage with better-sqlite3 - Enable WAL mode for true concurrent read/write support - Add 5 indexes to accelerate queries (timestamp, provider, model, connection_id, status) - Perform pagination at the database level to reduce memory footprint - Maintain 1000 record limit with automatic cleanup of old data - Ensure API compatibility via re-exports, requiring no caller changes Performance improvements: - Concurrent Writes: Lock-free WAL mode prevents data contention - Query Efficiency: Index-based searches replace full dataset loading - Data Integrity: Atomic operations prevent file corruption * fix(observability): resolve pagination statistics display issues - Fix issue where totalItems=0 showed 'Showing 1 to 0 of 0 results' - Hide pagination controls when totalItems=0 or totalPages<=1 - Standardize API response fields: pagination.total -> pagination.totalItems Before: Incorrect stats shown for empty data, and pager visible even for single-page results After: Stats hidden for empty data, pager hidden when navigation is unnecessary * feat(observability): display friendly provider names in request details - Add /api/usage/providers endpoint to dynamically fetch provider list with names - Replace hardcoded provider options with dynamic loading from database - Display friendly provider names instead of IDs in both table and detail drawer - Support custom provider nodes (e.g., OpenAI-compatible) with user-defined names - Add provider name caching to optimize performance * fix(observability): use INSERT OR REPLACE for request details to handle streaming updates * fix(observability): resolve zero-token display issue by ensuring streaming usage capture and fixing key mismatch * fix(observability): separate TTFT and total latency calculation for streaming requests * feat(observability): implement SQLite write queue and JSON size limits - Added in-memory buffer and batch writing for SQLite to prevent lock contention - Implemented with configurable 1MB limit to prevent DB bloat - Added dashboard UI for observability performance and data management settings - Integrated graceful shutdown handlers to prevent data loss * fix(observability): resolve ReferenceError by declaring dbInstance --- open-sse/handlers/chatCore.js | 196 ++++++- .../translator/response/claude-to-openai.js | 4 +- open-sse/utils/stream.js | 73 ++- open-sse/utils/usageTracking.js | 10 +- src/app/(dashboard)/dashboard/profile/page.js | 110 ++++ .../usage/components/RequestDetailsTab.js | 425 +++++++++++++++ src/app/(dashboard)/dashboard/usage/page.js | 3 + src/app/api/usage/providers/route.js | 62 +++ src/app/api/usage/request-details/route.js | 57 ++ src/lib/localDb.js | 10 +- src/lib/requestDetailsDb.js | 499 ++++++++++++++++++ src/lib/usageDb.js | 3 + src/shared/components/Drawer.js | 89 ++++ src/shared/components/Pagination.js | 146 +++++ 14 files changed, 1647 insertions(+), 40 deletions(-) create mode 100644 src/app/(dashboard)/dashboard/usage/components/RequestDetailsTab.js create mode 100644 src/app/api/usage/providers/route.js create mode 100644 src/app/api/usage/request-details/route.js create mode 100644 src/lib/requestDetailsDb.js create mode 100644 src/shared/components/Drawer.js create mode 100644 src/shared/components/Pagination.js diff --git a/open-sse/handlers/chatCore.js b/open-sse/handlers/chatCore.js index f502c4b..914a822 100644 --- a/open-sse/handlers/chatCore.js +++ b/open-sse/handlers/chatCore.js @@ -10,7 +10,7 @@ import { getModelTargetFormat, PROVIDER_ID_TO_ALIAS } from "../config/providerMo import { createErrorResult, parseUpstreamError, formatProviderError } from "../utils/error.js"; import { HTTP_STATUS } from "../config/constants.js"; import { handleBypassRequest } from "../utils/bypassHandler.js"; -import { saveRequestUsage, trackPendingRequest, appendRequestLog } from "@/lib/usageDb.js"; +import { saveRequestUsage, trackPendingRequest, appendRequestLog, saveRequestDetail } from "@/lib/usageDb.js"; import { getExecutor } from "../executors/index.js"; /** @@ -225,6 +225,38 @@ function extractUsageFromResponse(responseBody, provider) { return null; } +/** + * Extract full request configuration from body + * Captures all relevant parameters for request details + */ +function extractRequestConfig(body, stream) { + const config = { + messages: body.messages || [], + model: body.model, + stream: stream + }; + + // Add all optional configuration parameters + const optionalParams = [ + 'temperature', 'top_p', 'top_k', + 'max_tokens', 'max_completion_tokens', + 'thinking', 'reasoning', 'enable_thinking', + 'presence_penalty', 'frequency_penalty', + 'seed', 'stop', 'tools', 'tool_choice', + 'response_format', 'prediction', 'store', 'metadata', + 'n', 'logprobs', 'top_logprobs', 'logit_bias', + 'user', 'parallel_tool_calls' + ]; + + for (const param of optionalParams) { + if (body[param] !== undefined) { + config[param] = body[param]; + } + } + + return config; +} + /** * Convert OpenAI-style SSE chunks into a single non-streaming JSON response. * Used as a fallback when upstream returns text/event-stream for stream=false. @@ -315,6 +347,7 @@ function parseSSEToOpenAIResponse(rawSSE, fallbackModel) { */ export async function handleChatCore({ body, modelInfo, credentials, log, onCredentialsRefreshed, onRequestSuccess, onDisconnect, clientRawRequest, connectionId, userAgent }) { const { provider, model } = modelInfo; + const requestStartTime = Date.now(); const sourceFormat = detectFormat(body); @@ -407,6 +440,26 @@ export async function handleChatCore({ body, modelInfo, credentials, log, onCred } catch (error) { trackPendingRequest(model, provider, connectionId, false); appendRequestLog({ model, provider, connectionId, status: `FAILED ${error.name === "AbortError" ? 499 : HTTP_STATUS.BAD_GATEWAY}` }).catch(() => { }); + + const errorDetail = { + provider: provider || "unknown", + model: model || "unknown", + connectionId: connectionId || undefined, + timestamp: new Date().toISOString(), + latency: { ttft: 0, total: Date.now() - requestStartTime }, + tokens: { prompt_tokens: 0, completion_tokens: 0 }, + request: extractRequestConfig(body, stream), + providerRequest: translatedBody || null, + providerResponse: null, + response: { + error: error.message || String(error), + status: error.name === "AbortError" ? 499 : 502, + thinking: null + }, + status: "error" + }; + saveRequestDetail(errorDetail).catch(() => {}); + if (error.name === "AbortError") { streamController.handleError(error); return createErrorResult(499, "Request aborted"); @@ -463,6 +516,26 @@ export async function handleChatCore({ body, modelInfo, credentials, log, onCred trackPendingRequest(model, provider, connectionId, false); const { statusCode, message, retryAfterMs } = await parseUpstreamError(providerResponse, provider); appendRequestLog({ model, provider, connectionId, status: `FAILED ${statusCode}` }).catch(() => { }); + + const errorDetail = { + provider: provider || "unknown", + model: model || "unknown", + connectionId: connectionId || undefined, + timestamp: new Date().toISOString(), + latency: { ttft: 0, total: Date.now() - requestStartTime }, + tokens: { prompt_tokens: 0, completion_tokens: 0 }, + request: extractRequestConfig(body, stream), + providerRequest: finalBody || translatedBody || null, + providerResponse: null, + response: { + error: message, + status: statusCode, + thinking: null + }, + status: "error" + }; + saveRequestDetail(errorDetail).catch(() => {}); + const errMsg = formatProviderError(new Error(message), provider, model, statusCode); console.log(`${COLORS.red}[ERROR] ${errMsg}${COLORS.reset}`); @@ -531,6 +604,37 @@ export async function handleChatCore({ body, modelInfo, credentials, log, onCred translatedResponse.usage = filterUsageForFormat(buffered, sourceFormat); } + const totalLatency = Date.now() - requestStartTime; + const requestDetail = { + provider: provider || "unknown", + model: model || "unknown", + connectionId: connectionId || undefined, + timestamp: new Date().toISOString(), + latency: { + ttft: totalLatency, + total: totalLatency + }, + tokens: usage || { prompt_tokens: 0, completion_tokens: 0 }, + request: extractRequestConfig(body, stream), + providerRequest: finalBody || translatedBody || null, + providerResponse: responseBody || null, + response: { + content: translatedResponse?.choices?.[0]?.message?.content || + translatedResponse?.content || + null, + thinking: translatedResponse?.choices?.[0]?.message?.reasoning_content || + translatedResponse?.reasoning_content || + null, + finish_reason: translatedResponse?.choices?.[0]?.finish_reason || "unknown" + }, + status: "success" + }; + + // Async save (don't block response) + saveRequestDetail(requestDetail).catch(err => { + console.error("[RequestDetail] Failed to save:", err.message); + }); + return { success: true, response: new Response(JSON.stringify(translatedResponse), { @@ -556,31 +660,103 @@ export async function handleChatCore({ body, modelInfo, credentials, log, onCred "Access-Control-Allow-Origin": "*" }; - // Create transform stream with logger for streaming response + let streamContent = ""; + let streamUsage = null; + const streamDetailId = `${Date.now()}-${Math.random().toString(36).slice(2, 11)}`; + + const onStreamComplete = (contentObj, usage, ttftAt) => { + // contentObj is object { content, thinking } + streamUsage = usage; + + const updatedDetail = { + provider: provider || "unknown", + model: model || "unknown", + connectionId: connectionId || undefined, + timestamp: new Date().toISOString(), + latency: { + ttft: ttftAt ? ttftAt - requestStartTime : Date.now() - requestStartTime, + total: Date.now() - requestStartTime + }, + tokens: usage || { prompt_tokens: 0, completion_tokens: 0 }, + request: extractRequestConfig(body, stream), + providerRequest: finalBody || translatedBody || null, + providerResponse: contentObj.content || "[Empty streaming response]", + response: { + content: contentObj.content || "[Empty streaming response]", + thinking: contentObj.thinking || null, + type: "streaming" + }, + status: "success", + id: streamDetailId + }; + + saveRequestDetail(updatedDetail).catch(err => { + console.error("[RequestDetail] Failed to update streaming content:", err.message); + }); + + // Save usage stats for dashboard + if (usage && typeof usage === 'object') { + const msg = `[${new Date().toLocaleTimeString("en-US", { hour12: false, hour: "2-digit", minute: "2-digit" })}] 📊 [STREAM USAGE] ${provider.toUpperCase()} | in=${usage?.prompt_tokens || 0} | out=${usage?.completion_tokens || 0}${connectionId ? ` | account=${connectionId.slice(0, 8)}...` : ""}`; + console.log(`${COLORS.green}${msg}${COLORS.reset}`); + + saveRequestUsage({ + provider: provider || "unknown", + model: model || "unknown", + tokens: usage, + timestamp: new Date().toISOString(), + connectionId: connectionId || undefined + }).catch(err => { + console.error("Failed to save streaming usage stats:", err.message); + }); + } + }; + let transformStream; - // For Codex provider, translate response from openai-responses to openai (Chat Completions) format - // UNLESS client is Droid CLI which expects openai-responses format back const isDroidCLI = userAgent?.toLowerCase().includes('droid') || userAgent?.toLowerCase().includes('codex-cli'); const needsCodexTranslation = provider === 'codex' && targetFormat === 'openai-responses' && !isDroidCLI; if (needsCodexTranslation) { - // Codex returns openai-responses, translate to openai (Chat Completions) that clients expect log?.debug?.("STREAM", `Codex translation mode: openai-responses → openai`); - transformStream = createSSETransformStreamWithLogger('openai-responses', 'openai', provider, reqLogger, toolNameMap, model, connectionId, body); + transformStream = createSSETransformStreamWithLogger('openai-responses', 'openai', provider, reqLogger, toolNameMap, model, connectionId, body, onStreamComplete); } else if (needsTranslation(targetFormat, sourceFormat)) { - // Standard translation for other providers log?.debug?.("STREAM", `Translation mode: ${targetFormat} → ${sourceFormat}`); - transformStream = createSSETransformStreamWithLogger(targetFormat, sourceFormat, provider, reqLogger, toolNameMap, model, connectionId, body); + transformStream = createSSETransformStreamWithLogger(targetFormat, sourceFormat, provider, reqLogger, toolNameMap, model, connectionId, body, onStreamComplete); } else { log?.debug?.("STREAM", `Standard passthrough mode`); - transformStream = createPassthroughStreamWithLogger(provider, reqLogger, model, connectionId, body); + transformStream = createPassthroughStreamWithLogger(provider, reqLogger, model, connectionId, body, onStreamComplete); } - // Pipe response through transform with disconnect detection const transformedBody = pipeWithDisconnect(providerResponse, transformStream, streamController); + const totalLatency = Date.now() - requestStartTime; + const streamingDetail = { + provider: provider || "unknown", + model: model || "unknown", + connectionId: connectionId || undefined, + timestamp: new Date().toISOString(), + latency: { + ttft: 0, + total: Date.now() - requestStartTime + }, + tokens: { prompt_tokens: 0, completion_tokens: 0 }, + request: extractRequestConfig(body, stream), + providerRequest: finalBody || translatedBody || null, + providerResponse: "[Streaming - raw response not captured]", + response: { + content: "[Streaming in progress...]", + thinking: null, + type: "streaming" + }, + status: "success", + id: streamDetailId + }; + + saveRequestDetail(streamingDetail).catch(err => { + console.error("[RequestDetail] Failed to save streaming request:", err.message); + }); + return { success: true, response: new Response(transformedBody, { diff --git a/open-sse/translator/response/claude-to-openai.js b/open-sse/translator/response/claude-to-openai.js index aa8ccd6..0501393 100644 --- a/open-sse/translator/response/claude-to-openai.js +++ b/open-sse/translator/response/claude-to-openai.js @@ -64,7 +64,7 @@ export function claudeToOpenAIResponse(chunk, state) { if (delta?.type === "text_delta" && delta.text) { results.push(createChunk(state, { content: delta.text })); } else if (delta?.type === "thinking_delta" && delta.thinking) { - results.push(createChunk(state, { content: delta.thinking })); + results.push(createChunk(state, { reasoning_content: delta.thinking })); } else if (delta?.type === "input_json_delta" && delta.partial_json) { const toolCall = state.toolCalls.get(chunk.index); if (toolCall) { @@ -83,7 +83,7 @@ export function claudeToOpenAIResponse(chunk, state) { case "content_block_stop": { if (state.inThinkingBlock && chunk.index === state.currentBlockIndex) { - results.push(createChunk(state, { content: "" })); + results.push(createChunk(state, { reasoning_content: "" })); state.inThinkingBlock = false; } state.textBlockStarted = false; diff --git a/open-sse/utils/stream.js b/open-sse/utils/stream.js index 76f4e7a..1e7938a 100644 --- a/open-sse/utils/stream.js +++ b/open-sse/utils/stream.js @@ -28,6 +28,7 @@ const STREAM_MODE = { * @param {string} options.model - Model name * @param {string} options.connectionId - Connection ID for usage tracking * @param {object} options.body - Request body (for input token estimation) + * @param {function} options.onStreamComplete - Callback when stream completes (content, usage) */ export function createSSEStream(options = {}) { const { @@ -39,20 +40,25 @@ export function createSSEStream(options = {}) { toolNameMap = null, model = null, connectionId = null, - body = null + body = null, + onStreamComplete = null } = options; let buffer = ""; let usage = null; - // State for translate mode const state = mode === STREAM_MODE.TRANSLATE ? { ...initState(sourceFormat), provider, toolNameMap } : null; - // Track content length for usage estimation (both modes) let totalContentLength = 0; + let accumulatedContent = ""; + let accumulatedThinking = ""; + let ttftAt = null; return new TransformStream({ transform(chunk, controller) { + if (!ttftAt) { + ttftAt = Date.now(); + } const text = sharedDecoder.decode(chunk, { stream: true }); buffer += text; reqLogger?.appendProviderChunk?.(text); @@ -79,9 +85,15 @@ export function createSSEStream(options = {}) { } const delta = parsed.choices?.[0]?.delta; - const content = delta?.content || delta?.reasoning_content; + const content = delta?.content; + const reasoning = delta?.reasoning_content; if (content && typeof content === "string") { totalContentLength += content.length; + accumulatedContent += content; + } + if (reasoning && typeof reasoning === "string") { + totalContentLength += reasoning.length; + accumulatedThinking += reasoning; } const extracted = extractUsage(parsed); @@ -134,30 +146,39 @@ export function createSSEStream(options = {}) { continue; } - // Track content length for estimation (from various formats) - // Include both regular content and reasoning/thinking content - - // Claude format + // Claude format - content if (parsed.delta?.text) { totalContentLength += parsed.delta.text.length; + accumulatedContent += parsed.delta.text; } + // Claude format - thinking if (parsed.delta?.thinking) { totalContentLength += parsed.delta.thinking.length; + accumulatedThinking += parsed.delta.thinking; } - // OpenAI format + // OpenAI format - content if (parsed.choices?.[0]?.delta?.content) { totalContentLength += parsed.choices[0].delta.content.length; + accumulatedContent += parsed.choices[0].delta.content; } + // OpenAI format - reasoning if (parsed.choices?.[0]?.delta?.reasoning_content) { totalContentLength += parsed.choices[0].delta.reasoning_content.length; + accumulatedThinking += parsed.choices[0].delta.reasoning_content; } - // Gemini format - may have multiple parts + // Gemini format if (parsed.candidates?.[0]?.content?.parts) { for (const part of parsed.candidates[0].content.parts) { if (part.text && typeof part.text === "string") { totalContentLength += part.text.length; + // Check if this is thinking content + if (part.thought === true) { + accumulatedThinking += part.text; + } else { + accumulatedContent += part.text; + } } } } @@ -220,7 +241,6 @@ export function createSSEStream(options = {}) { controller.enqueue(sharedEncoder.encode(output)); } - // Estimate usage if provider didn't return valid usage (PASSTHROUGH is always OpenAI format) if (!hasValidUsage(usage) && totalContentLength > 0) { usage = estimateUsage(body, totalContentLength, FORMATS.OPENAI); } @@ -230,16 +250,21 @@ export function createSSEStream(options = {}) { } else { appendRequestLog({ model, provider, connectionId, tokens: null, status: "200 OK" }).catch(() => { }); } + + if (onStreamComplete) { + onStreamComplete({ + content: accumulatedContent, + thinking: accumulatedThinking + }, usage, ttftAt); + } return; } - // Translate mode: process remaining buffer if (buffer.trim()) { const parsed = parseSSELine(buffer.trim()); if (parsed && !parsed.done) { const translated = translateResponse(targetFormat, sourceFormat, parsed, state); - // Log OpenAI intermediate chunks if (translated?._openaiIntermediate) { for (const item of translated._openaiIntermediate) { const openaiOutput = formatSSE(item, FORMATS.OPENAI); @@ -257,10 +282,8 @@ export function createSSEStream(options = {}) { } } - // Flush remaining events (only once at stream end) const flushed = translateResponse(targetFormat, sourceFormat, null, state); - // Log OpenAI intermediate chunks for flushed events if (flushed?._openaiIntermediate) { for (const item of flushed._openaiIntermediate) { const openaiOutput = formatSSE(item, FORMATS.OPENAI); @@ -276,12 +299,10 @@ export function createSSEStream(options = {}) { } } - // Send [DONE] and log usage const doneOutput = "data: [DONE]\n\n"; reqLogger?.appendConvertedChunk?.(doneOutput); controller.enqueue(sharedEncoder.encode(doneOutput)); - // Estimate usage if provider didn't return valid usage (for translate mode) if (!hasValidUsage(state?.usage) && totalContentLength > 0) { state.usage = estimateUsage(body, totalContentLength, sourceFormat); } @@ -291,6 +312,13 @@ export function createSSEStream(options = {}) { } else { appendRequestLog({ model, provider, connectionId, tokens: null, status: "200 OK" }).catch(() => { }); } + + if (onStreamComplete) { + onStreamComplete({ + content: accumulatedContent, + thinking: accumulatedThinking + }, state?.usage, ttftAt); + } } catch (error) { console.log("Error in flush:", error); } @@ -298,8 +326,7 @@ export function createSSEStream(options = {}) { }); } -// Convenience functions for backward compatibility -export function createSSETransformStreamWithLogger(targetFormat, sourceFormat, provider = null, reqLogger = null, toolNameMap = null, model = null, connectionId = null, body = null) { +export function createSSETransformStreamWithLogger(targetFormat, sourceFormat, provider = null, reqLogger = null, toolNameMap = null, model = null, connectionId = null, body = null, onStreamComplete = null) { return createSSEStream({ mode: STREAM_MODE.TRANSLATE, targetFormat, @@ -309,17 +336,19 @@ export function createSSETransformStreamWithLogger(targetFormat, sourceFormat, p toolNameMap, model, connectionId, - body + body, + onStreamComplete }); } -export function createPassthroughStreamWithLogger(provider = null, reqLogger = null, model = null, connectionId = null, body = null) { +export function createPassthroughStreamWithLogger(provider = null, reqLogger = null, model = null, connectionId = null, body = null, onStreamComplete = null) { return createSSEStream({ mode: STREAM_MODE.PASSTHROUGH, provider, reqLogger, model, connectionId, - body + body, + onStreamComplete }); } diff --git a/open-sse/utils/usageTracking.js b/open-sse/utils/usageTracking.js index b6646e2..36054b3 100644 --- a/open-sse/utils/usageTracking.js +++ b/open-sse/utils/usageTracking.js @@ -312,11 +312,11 @@ export function logUsage(provider, usage, model = null, connectionId = null) { // Save to usage DB const tokens = { - input: inTokens, - output: outTokens, - cacheRead: cacheRead || 0, - cacheCreation: cacheCreation || 0, - reasoning: reasoning || 0 + prompt_tokens: inTokens, + completion_tokens: outTokens, + cache_read_input_tokens: cacheRead || 0, + cache_creation_input_tokens: cacheCreation || 0, + reasoning_tokens: reasoning || 0 }; saveRequestUsage({ model, provider, connectionId, tokens }).catch(() => { }); appendRequestLog({ model, provider, connectionId, tokens, status: "200 OK" }).catch(() => { }); diff --git a/src/app/(dashboard)/dashboard/profile/page.js b/src/app/(dashboard)/dashboard/profile/page.js index fa6387c..32bae26 100644 --- a/src/app/(dashboard)/dashboard/profile/page.js +++ b/src/app/(dashboard)/dashboard/profile/page.js @@ -110,6 +110,24 @@ export default function ProfilePage() { } }; + const updateObservabilitySetting = async (key, value) => { + const numValue = parseInt(value); + if (isNaN(numValue) || numValue < 1) return; + + try { + const res = await fetch("/api/settings", { + method: "PATCH", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ [key]: numValue }), + }); + if (res.ok) { + setSettings(prev => ({ ...prev, [key]: numValue })); + } + } catch (err) { + console.error(`Failed to update ${key}:`, err); + } + }; + return (
@@ -293,6 +311,7 @@ export default function ProfilePage() { {["light", "dark", "system"].map((option) => (
+ {/* Observability Settings */} + +
+
+ monitoring +
+

Observability

+
+
+
+
+

Max Records

+

+ Maximum request detail records to keep (older records are auto-deleted) +

+
+ updateObservabilitySetting("observabilityMaxRecords", parseInt(e.target.value))} + disabled={loading} + className="w-28 text-center" + /> +
+ +
+
+

Batch Size

+

+ Number of items to accumulate before writing to database (higher = better performance) +

+
+ updateObservabilitySetting("observabilityBatchSize", parseInt(e.target.value))} + disabled={loading} + className="w-28 text-center" + /> +
+ +
+
+

Flush Interval (ms)

+

+ Maximum time to wait before flushing buffer (prevents data loss during low traffic) +

+
+ updateObservabilitySetting("observabilityFlushIntervalMs", parseInt(e.target.value))} + disabled={loading} + className="w-28 text-center" + /> +
+ +
+
+

Max JSON Size (KB)

+

+ Maximum size for each JSON field (request/response) before truncation +

+
+ updateObservabilitySetting("observabilityMaxJsonSize", parseInt(e.target.value))} + disabled={loading} + className="w-28 text-center" + /> +
+ +

+ Current: Keeps {settings.observabilityMaxRecords || 1000} records, batches every {settings.observabilityBatchSize || 20} requests, max {settings.observabilityMaxJsonSize || 1024}KB per field +

+
+
+ {/* App Info */}

{APP_CONFIG.name} v{APP_CONFIG.version}

diff --git a/src/app/(dashboard)/dashboard/usage/components/RequestDetailsTab.js b/src/app/(dashboard)/dashboard/usage/components/RequestDetailsTab.js new file mode 100644 index 0000000..4f52736 --- /dev/null +++ b/src/app/(dashboard)/dashboard/usage/components/RequestDetailsTab.js @@ -0,0 +1,425 @@ +"use client"; + +import { useState, useEffect, useCallback } from "react"; +import Card from "@/shared/components/Card"; +import Button from "@/shared/components/Button"; +import Drawer from "@/shared/components/Drawer"; +import Pagination from "@/shared/components/Pagination"; +import { cn } from "@/shared/utils/cn"; +import { AI_PROVIDERS, getProviderByAlias } from "@/shared/constants/providers"; + +let providerNameCache = null; +let providerNodesCache = null; + +async function fetchProviderNames() { + if (providerNameCache && providerNodesCache) { + return { providerNameCache, providerNodesCache }; + } + + const nodesRes = await fetch("/api/provider-nodes"); + const nodesData = await nodesRes.json(); + const nodes = nodesData.nodes || []; + providerNodesCache = {}; + + for (const node of nodes) { + providerNodesCache[node.id] = node.name; + } + + providerNameCache = { + ...AI_PROVIDERS, + ...providerNodesCache + }; + + return { providerNameCache, providerNodesCache }; +} + +function getProviderName(providerId, cache) { + if (!providerId) return providerId; + if (!cache) return providerId; + + const cached = cache[providerId]; + + if (typeof cached === 'string') { + return cached; + } + + if (cached?.name) { + return cached.name; + } + + const providerConfig = getProviderByAlias(providerId) || AI_PROVIDERS[providerId]; + return providerConfig?.name || providerId; +} + +function CollapsibleSection({ title, children, defaultOpen = false, icon = null }) { + const [isOpen, setIsOpen] = useState(defaultOpen); + + return ( +
+ + + {isOpen && ( +
+ {children} +
+ )} +
+ ); +} + +export default function RequestDetailsTab() { + const [details, setDetails] = useState([]); + const [pagination, setPagination] = useState({ + page: 1, + pageSize: 20, + totalItems: 0, + totalPages: 0 + }); + const [loading, setLoading] = useState(false); + const [selectedDetail, setSelectedDetail] = useState(null); + const [isDrawerOpen, setIsDrawerOpen] = useState(false); + const [providers, setProviders] = useState([]); + const [providerNameCache, setProviderNameCache] = useState(null); + const [filters, setFilters] = useState({ + provider: "", + startDate: "", + endDate: "" + }); + + const fetchProviders = useCallback(async () => { + try { + const res = await fetch("/api/usage/providers"); + const data = await res.json(); + setProviders(data.providers || []); + + const cache = await fetchProviderNames(); + setProviderNameCache(cache.providerNameCache); + } catch (error) { + console.error("Failed to fetch providers:", error); + } + }, []); + + const fetchDetails = useCallback(async () => { + setLoading(true); + try { + const params = new URLSearchParams({ + page: pagination.page.toString(), + pageSize: pagination.pageSize.toString() + }); + if (filters.provider) params.append("provider", filters.provider); + if (filters.startDate) params.append("startDate", filters.startDate); + if (filters.endDate) params.append("endDate", filters.endDate); + + const res = await fetch(`/api/usage/request-details?${params}`); + const data = await res.json(); + + setDetails(data.details || []); + setPagination(prev => ({ ...prev, ...data.pagination })); + } catch (error) { + console.error("Failed to fetch request details:", error); + } finally { + setLoading(false); + } + }, [pagination.page, pagination.pageSize, filters]); + + useEffect(() => { + fetchProviders(); + }, [fetchProviders]); + + useEffect(() => { + fetchDetails(); + }, [fetchDetails]); + + const handleViewDetail = (detail) => { + setSelectedDetail(detail); + setIsDrawerOpen(true); + }; + + const handlePageChange = (newPage) => { + setPagination(prev => ({ ...prev, page: newPage })); + }; + + const handlePageSizeChange = (newPageSize) => { + setPagination(prev => ({ ...prev, pageSize: newPageSize, page: 1 })); + }; + + const handleClearFilters = () => { + setFilters({ provider: "", startDate: "", endDate: "" }); + }; + + return ( +
+ +
+
+ + +
+ +
+ + setFilters({ ...filters, startDate: e.target.value })} + className={cn( + "h-9 px-3 rounded-lg border border-black/10 dark:border-white/10 bg-surface", + "text-sm text-text-main focus:outline-none focus:ring-2 focus:ring-primary/20" + )} + /> +
+ +
+ + setFilters({ ...filters, endDate: e.target.value })} + className={cn( + "h-9 px-3 rounded-lg border border-black/10 dark:border-white/10 bg-surface", + "text-sm text-text-main focus:outline-none focus:ring-2 focus:ring-primary/20" + )} + /> +
+ +
+ + +
+
+
+ + +
+ + + + + + + + + + + + + + {loading ? ( + + + + ) : details.length === 0 ? ( + + + + ) : ( + details.map((detail, index) => ( + + + + + + + + + + )) + )} + +
TimestampModelProviderInput TokensOutput TokensLatencyAction
+
+ progress_activity + Loading... +
+
+ No request details found +
+ {new Date(detail.timestamp).toLocaleString()} + + {detail.model} + + + {getProviderName(detail.provider, providerNameCache)} + + + {detail.tokens?.prompt_tokens?.toLocaleString() || 0} + + {detail.tokens?.completion_tokens?.toLocaleString() || 0} + +
+
TTFT: {detail.latency?.ttft || 0}ms
+
Total: {detail.latency?.total || 0}ms
+
+
+ +
+
+ + {!loading && details.length > 0 && ( +
+ +
+ )} +
+ + setIsDrawerOpen(false)} + title="Request Details" + width="lg" + > + {selectedDetail && ( +
+
+
+ ID:{" "} + {selectedDetail.id} +
+
+ Timestamp:{" "} + {new Date(selectedDetail.timestamp).toLocaleString()} +
+
+ Provider:{" "} + {getProviderName(selectedDetail.provider, providerNameCache)} +
+
+ Model:{" "} + {selectedDetail.model} +
+
+ Status:{" "} + + {selectedDetail.status} + +
+
+ Latency:{" "} + + TTFT {selectedDetail.latency?.ttft || 0}ms / Total {selectedDetail.latency?.total || 0}ms + +
+
+ Input Tokens:{" "} + + {selectedDetail.tokens?.prompt_tokens?.toLocaleString() || 0} + +
+
+ Output Tokens:{" "} + + {selectedDetail.tokens?.completion_tokens?.toLocaleString() || 0} + +
+
+ +
+ +
+                  {JSON.stringify(selectedDetail.request, null, 2)}
+                
+
+ + {selectedDetail.providerRequest && ( + +
+                    {JSON.stringify(selectedDetail.providerRequest, null, 2)}
+                  
+
+ )} + + {selectedDetail.providerResponse && ( + +
+                    {typeof selectedDetail.providerResponse === 'object'
+                      ? JSON.stringify(selectedDetail.providerResponse, null, 2)
+                      : selectedDetail.providerResponse
+                    }
+                  
+
+ )} + + + {selectedDetail.response?.thinking && ( +
+

+ psychology + Thinking Process +

+
+                      {selectedDetail.response.thinking}
+                    
+
+ )} + +

+ Content +

+
+                  {selectedDetail.response?.content || "[No content]"}
+                
+
+
+
+ )} +
+
+ ); +} diff --git a/src/app/(dashboard)/dashboard/usage/page.js b/src/app/(dashboard)/dashboard/usage/page.js index 90e372b..b674e97 100644 --- a/src/app/(dashboard)/dashboard/usage/page.js +++ b/src/app/(dashboard)/dashboard/usage/page.js @@ -3,6 +3,7 @@ import { useState, Suspense } from "react"; import { UsageStats, RequestLogger, CardSkeleton, SegmentedControl } from "@/shared/components"; import ProviderLimits from "./components/ProviderLimits"; +import RequestDetailsTab from "./components/RequestDetailsTab"; export default function UsagePage() { const [activeTab, setActiveTab] = useState("overview"); @@ -14,6 +15,7 @@ export default function UsagePage() { { value: "overview", label: "Overview" }, { value: "logs", label: "Logger" }, { value: "limits", label: "Limits" }, + { value: "details", label: "Details" }, ]} value={activeTab} onChange={setActiveTab} @@ -31,6 +33,7 @@ export default function UsagePage() { )} + {activeTab === "details" && }
); } diff --git a/src/app/api/usage/providers/route.js b/src/app/api/usage/providers/route.js new file mode 100644 index 0000000..baa1cff --- /dev/null +++ b/src/app/api/usage/providers/route.js @@ -0,0 +1,62 @@ +import { NextResponse } from "next/server"; +import { getRequestDetailsDb } from "@/lib/requestDetailsDb"; +import { getProviderNodes } from "@/lib/localDb"; +import { AI_PROVIDERS, getProviderByAlias } from "@/shared/constants/providers"; + +/** + * GET /api/usage/providers + * Returns list of unique providers from request details + */ +export async function GET() { + try { + const db = await getRequestDetailsDb(); + + const stmt = db.prepare(` + SELECT DISTINCT provider + FROM request_details + WHERE provider IS NOT NULL AND provider != '' + ORDER BY provider ASC + `); + + const rows = stmt.all(); + + // Fetch all provider nodes to get names for custom providers + const providerNodes = await getProviderNodes(); + const nodeMap = {}; + for (const node of providerNodes) { + nodeMap[node.id] = node.name; + } + + const providers = rows.map(row => { + const providerId = row.provider; + + // Try to find name from various sources + let name = providerId; + + // 1. Check if it's a custom provider node + if (nodeMap[providerId]) { + name = nodeMap[providerId]; + } + // 2. Check predefined providers + else { + const providerConfig = getProviderByAlias(providerId) || AI_PROVIDERS[providerId]; + if (providerConfig?.name) { + name = providerConfig.name; + } + } + + return { + id: providerId, + name + }; + }); + + return NextResponse.json({ providers }); + } catch (error) { + console.error("[API] Failed to get providers:", error); + return NextResponse.json( + { error: "Failed to fetch providers" }, + { status: 500 } + ); + } +} diff --git a/src/app/api/usage/request-details/route.js b/src/app/api/usage/request-details/route.js new file mode 100644 index 0000000..73a3ceb --- /dev/null +++ b/src/app/api/usage/request-details/route.js @@ -0,0 +1,57 @@ +import { NextResponse } from "next/server"; +import { getRequestDetails } from "@/lib/usageDb"; + +/** + * GET /api/usage/request-details + * Query parameters: page, pageSize (1-100), provider, model, connectionId, status, startDate, endDate + */ +export async function GET(request) { + try { + const { searchParams } = new URL(request.url); + + const page = parseInt(searchParams.get("page")) || 1; + const pageSize = parseInt(searchParams.get("pageSize")) || 20; + const provider = searchParams.get("provider"); + const model = searchParams.get("model"); + const connectionId = searchParams.get("connectionId"); + const status = searchParams.get("status"); + const startDate = searchParams.get("startDate"); + const endDate = searchParams.get("endDate"); + + if (page < 1) { + return NextResponse.json( + { error: "Page must be >= 1" }, + { status: 400 } + ); + } + + if (pageSize < 1 || pageSize > 100) { + return NextResponse.json( + { error: "PageSize must be between 1 and 100" }, + { status: 400 } + ); + } + + const filter = { + page, + pageSize + }; + + if (provider) filter.provider = provider; + if (model) filter.model = model; + if (connectionId) filter.connectionId = connectionId; + if (status) filter.status = status; + if (startDate) filter.startDate = startDate; + if (endDate) filter.endDate = endDate; + + const result = await getRequestDetails(filter); + + return NextResponse.json(result); + } catch (error) { + console.error("[API] Failed to get request details:", error); + return NextResponse.json( + { error: "Failed to fetch request details" }, + { status: 500 } + ); + } +} diff --git a/src/lib/localDb.js b/src/lib/localDb.js index 615d608..f15df97 100644 --- a/src/lib/localDb.js +++ b/src/lib/localDb.js @@ -50,7 +50,11 @@ const defaultData = { settings: { cloudEnabled: false, stickyRoundRobinLimit: 3, - requireLogin: true + requireLogin: true, + observabilityMaxRecords: 1000, + observabilityBatchSize: 20, + observabilityFlushIntervalMs: 5000, + observabilityMaxJsonSize: 1024 }, pricing: {} // NEW: pricing configuration }; @@ -67,6 +71,10 @@ function cloneDefaultData() { cloudEnabled: false, stickyRoundRobinLimit: 3, requireLogin: true, + observabilityMaxRecords: 1000, + observabilityBatchSize: 20, + observabilityFlushIntervalMs: 5000, + observabilityMaxJsonSize: 1024 }, pricing: {}, }; diff --git a/src/lib/requestDetailsDb.js b/src/lib/requestDetailsDb.js new file mode 100644 index 0000000..85b86c5 --- /dev/null +++ b/src/lib/requestDetailsDb.js @@ -0,0 +1,499 @@ +import Database from "better-sqlite3"; +import path from "path"; +import os from "os"; +import fs from "fs"; + +const isCloud = typeof caches !== 'undefined' || typeof caches === 'object'; + +// ============================================================================ +// CONFIGURATION: Batch Processing Settings +// ============================================================================ + +/** + * Get observability configuration from settings. + * Falls back to environment variables, then defaults. + */ +async function getObservabilityConfig() { + try { + const { getSettings } = await import("@/lib/localDb"); + const settings = await getSettings(); + + return { + maxRecords: settings.observabilityMaxRecords || parseInt(process.env.OBSERVABILITY_MAX_RECORDS || '1000', 10), + batchSize: settings.observabilityBatchSize || parseInt(process.env.OBSERVABILITY_BATCH_SIZE || '20', 10), + flushIntervalMs: settings.observabilityFlushIntervalMs || parseInt(process.env.OBSERVABILITY_FLUSH_INTERVAL_MS || '5000', 10), + maxJsonSize: (settings.observabilityMaxJsonSize || parseInt(process.env.OBSERVABILITY_MAX_JSON_SIZE || '1024', 10)) * 1024 + }; + } catch (error) { + console.error("[requestDetailsDb] Failed to load observability config:", error); + return { + maxRecords: 1000, + batchSize: 20, + flushIntervalMs: 5000, + maxJsonSize: 1024 * 1024 + }; + } +} + +// Cache config to avoid repeated database reads +let cachedConfig = null; + +let dbInstance = null; + +// Get app name +function getAppName() { + return "9router"; +} + +// Get user data directory based on platform +function getUserDataDir() { + if (isCloud) return "/tmp"; + + try { + const platform = process.platform; + const homeDir = os.homedir(); + const appName = getAppName(); + + if (platform === "win32") { + return path.join(process.env.APPDATA || path.join(homeDir, "AppData", "Roaming"), appName); + } else { + return path.join(homeDir, `.${appName}`); + } + } catch (error) { + console.error("[requestDetailsDb] Failed to get user data directory:", error.message); + return path.join(process.cwd(), ".9router"); + } +} + +// Database file path +const DATA_DIR = getUserDataDir(); +const DB_FILE = isCloud ? null : path.join(DATA_DIR, "request-details.sqlite"); + +// Ensure data directory exists +if (!isCloud && fs && typeof fs.existsSync === "function") { + try { + if (!fs.existsSync(DATA_DIR)) { + fs.mkdirSync(DATA_DIR, { recursive: true }); + } + } catch (error) { + console.error("[requestDetailsDb] Failed to create data directory:", error.message); + } +} + +// ============================================================================ +// BATCH WRITE QUEUE +// ============================================================================ + +/** + * In-memory buffer for batch writes. + * Accumulates request details before flushing to database in a transaction. + * @type {Array} + */ +let writeBuffer = []; + +/** + * Timer reference for auto-flush mechanism. + * Ensures data is written even during low traffic periods. + * @type {NodeJS.Timeout|null} + */ +let flushTimer = null; + +/** + * Flag indicating if a flush operation is currently in progress. + * Prevents concurrent flushes. + * @type {boolean} + */ +let isFlushing = false; + +/** + * Get SQLite database instance (singleton) + */ +export async function getRequestDetailsDb() { + if (isCloud) { + // In-memory mock for Workers + if (!dbInstance) { + dbInstance = { + prepare: () => ({ + run: () => {}, + get: () => null, + all: () => [] + }), + exec: () => {}, + pragma: () => {} + }; + } + return dbInstance; + } + + if (!dbInstance) { + const db = new Database(DB_FILE); + + // Configure for better concurrency + db.pragma('journal_mode = WAL'); // Write-Ahead Logging for concurrent access + db.pragma('synchronous = NORMAL'); // Faster than FULL, still safe + db.pragma('cache_size = -64000'); // 64MB cache + db.pragma('temp_store = MEMORY'); // Use memory for temp tables + + // Create table with indexes + db.exec(` + CREATE TABLE IF NOT EXISTS request_details ( + id TEXT PRIMARY KEY, + provider TEXT, + model TEXT, + connection_id TEXT, + timestamp INTEGER NOT NULL, + status TEXT, + latency TEXT, + tokens TEXT, + request TEXT, + provider_request TEXT, + provider_response TEXT, + response TEXT + ); + + -- Indexes for common queries + CREATE INDEX IF NOT EXISTS idx_timestamp + ON request_details(timestamp DESC); + CREATE INDEX IF NOT EXISTS idx_provider + ON request_details(provider); + CREATE INDEX IF NOT EXISTS idx_model + ON request_details(model); + CREATE INDEX IF NOT EXISTS idx_connection + ON request_details(connection_id); + CREATE INDEX IF NOT EXISTS idx_status + ON request_details(status); + `); + + dbInstance = db; + + // Register shutdown handler on first database initialization + ensureShutdownHandler(); + } + + return dbInstance; +} + +/** + * Generate unique ID for request detail + */ +function generateDetailId(model) { + const timestamp = new Date().toISOString(); + const random = Math.random().toString(36).substring(2, 8); + const modelPart = model ? model.replace(/[^a-zA-Z0-9-]/g, '-') : 'unknown'; + return `${timestamp}-${random}-${modelPart}`; +} + +/** + * Flush all buffered items to database in a single transaction. + * This function is called automatically when: + * 1. Buffer size reaches OBSERVABILITY_BATCH_SIZE + * 2. OBSERVABILITY_FLUSH_INTERVAL_MS elapses + * 3. Process is shutting down (graceful shutdown) + * + * @private + */ +async function flushToDatabase() { + if (isCloud || isFlushing || writeBuffer.length === 0) { + return; + } + + isFlushing = true; + + try { + // Take a snapshot of the buffer and clear it immediately + const itemsToSave = [...writeBuffer]; + writeBuffer = []; + + const db = await getRequestDetailsDb(); + const config = await getObservabilityConfig(); + + // Prepare statements outside transaction for better performance + const insertStmt = db.prepare(` + INSERT OR REPLACE INTO request_details + (id, provider, model, connection_id, timestamp, status, latency, tokens, + request, provider_request, provider_response, response) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + `); + + const deleteStmt = db.prepare(` + DELETE FROM request_details + WHERE id NOT IN ( + SELECT id FROM request_details + ORDER BY timestamp DESC + LIMIT ? + ) + `); + + // Execute all writes in a single transaction for atomicity + const transaction = db.transaction((items) => { + const maxJsonSize = config.maxJsonSize; + + for (const item of items) { + if (!item.id) { + item.id = generateDetailId(item.model); + } + + if (!item.timestamp) { + item.timestamp = new Date().toISOString(); + } + + // Sanitize headers if present + if (item.request && item.request.headers) { + item.request.headers = sanitizeHeaders(item.request.headers); + } + + insertStmt.run( + item.id, + item.provider || null, + item.model || null, + item.connectionId || null, + new Date(item.timestamp).getTime(), + item.status || null, + JSON.stringify(item.latency || {}), + JSON.stringify(item.tokens || {}), + safeJsonStringify(item.request || {}, maxJsonSize), + safeJsonStringify(item.providerRequest || {}, maxJsonSize), + safeJsonStringify(item.providerResponse || {}, maxJsonSize), + safeJsonStringify(item.response || {}, maxJsonSize) + ); + } + + // Cleanup old records once per batch (not per item) + deleteStmt.run(config.maxRecords); + }); + + transaction(itemsToSave); + } catch (error) { + console.error("[requestDetailsDb] Batch write failed:", error); + } finally { + isFlushing = false; + } +} + +/** + * Safely stringify an object with a size limit. + * Truncates the result if it exceeds the limit. + * @param {object} obj - Object to stringify + * @param {number} maxSize - Maximum string size in bytes + * @returns {string} + */ +function safeJsonStringify(obj, maxSize) { + try { + const str = JSON.stringify(obj); + if (str.length > maxSize) { + return str.substring(0, maxSize) + "... (truncated due to size limit)"; + } + return str; + } catch (error) { + return JSON.stringify({ error: "Failed to stringify object", message: error.message }); + } +} + +/** + * Sanitize sensitive headers from request + */ +function sanitizeHeaders(headers) { + if (!headers || typeof headers !== 'object') return {}; + + const sensitiveKeys = ['authorization', 'x-api-key', 'cookie', 'token', 'api-key']; + const sanitized = { ...headers }; + + for (const key of Object.keys(sanitized)) { + if (sensitiveKeys.some(sensitive => key.toLowerCase().includes(sensitive))) { + delete sanitized[key]; + } + } + + return sanitized; +} + +/** + * Save request detail to SQLite (batched for performance). + * Details are accumulated in memory and flushed to database in batches. + * + * @param {object} detail - Request detail object + * @see {@link flushToDatabase} for batch write implementation + */ +export async function saveRequestDetail(detail) { + if (isCloud) return; + + if (!cachedConfig) { + cachedConfig = await getObservabilityConfig(); + } + + writeBuffer.push(detail); + + if (writeBuffer.length >= cachedConfig.batchSize) { + await flushToDatabase(); + + if (flushTimer) { + clearTimeout(flushTimer); + flushTimer = null; + } + } else if (!flushTimer) { + flushTimer = setTimeout(() => { + flushToDatabase().catch(() => {}); + flushTimer = null; + }, cachedConfig.flushIntervalMs); + } +} + +// ============================================================================ +// GRACEFUL SHUTDOWN HANDLER +// ============================================================================ + +let shutdownHandlerRegistered = false; + +/** + * Register process shutdown handlers to flush remaining data before exit. + * Should be called once when the module initializes. + */ +function ensureShutdownHandler() { + if (shutdownHandlerRegistered || isCloud) { + return; + } + + const handler = async () => { + // Clear timer to prevent any pending flush + if (flushTimer) { + clearTimeout(flushTimer); + flushTimer = null; + } + + // Flush any remaining data in buffer + if (writeBuffer.length > 0) { + console.log(`[requestDetailsDb] Flushing ${writeBuffer.length} items before shutdown...`); + await flushToDatabase(); + } + }; + + // Register handlers for various termination signals + process.on('beforeExit', handler); + process.on('SIGINT', handler); + process.on('SIGTERM', handler); + process.on('exit', handler); + + shutdownHandlerRegistered = true; +} + +/** + * Get request details with filtering and pagination + * @param {object} filter - Filter options + * @returns {Promise} Details with pagination info + */ +export async function getRequestDetails(filter = {}) { + const db = await getRequestDetailsDb(); + + if (isCloud) { + return { details: [], pagination: { page: 1, pageSize: filter.pageSize || 50, totalItems: 0, totalPages: 0, hasNext: false, hasPrev: false } }; + } + + let query = 'SELECT * FROM request_details WHERE 1=1'; + const params = []; + + if (filter.provider) { + query += ' AND provider = ?'; + params.push(filter.provider); + } + + if (filter.model) { + query += ' AND model = ?'; + params.push(filter.model); + } + + if (filter.connectionId) { + query += ' AND connection_id = ?'; + params.push(filter.connectionId); + } + + if (filter.status) { + query += ' AND status = ?'; + params.push(filter.status); + } + + if (filter.startDate) { + query += ' AND timestamp >= ?'; + params.push(new Date(filter.startDate).getTime()); + } + + if (filter.endDate) { + query += ' AND timestamp <= ?'; + params.push(new Date(filter.endDate).getTime()); + } + + // Get total count first + const countQuery = query.replace('SELECT *', 'SELECT COUNT(*)'); + const countStmt = db.prepare(countQuery); + const totalResult = countStmt.get(...params); + const total = totalResult['COUNT(*)']; + + // Add pagination + query += ' ORDER BY timestamp DESC'; + const page = filter.page || 1; + const pageSize = filter.pageSize || 50; + query += ' LIMIT ? OFFSET ?'; + params.push(pageSize, (page - 1) * pageSize); + + // Execute query + const stmt = db.prepare(query); + const rows = stmt.all(...params); + + // Convert back to original format + const details = rows.map(row => ({ + id: row.id, + provider: row.provider, + model: row.model, + connectionId: row.connection_id, + timestamp: new Date(row.timestamp).toISOString(), + status: row.status, + latency: JSON.parse(row.latency || '{}'), + tokens: JSON.parse(row.tokens || '{}'), + request: JSON.parse(row.request || '{}'), + providerRequest: JSON.parse(row.provider_request || '{}'), + providerResponse: JSON.parse(row.provider_response || '{}'), + response: JSON.parse(row.response || '{}') + })); + + return { + details, + pagination: { + page, + pageSize, + totalItems: total, + totalPages: Math.ceil(total / pageSize), + hasNext: page < Math.ceil(total / pageSize), + hasPrev: page > 1 + } + }; +} + +/** + * Get single request detail by ID + * @param {string} id - Request detail ID + * @returns {Promise} Request detail or null + */ +export async function getRequestDetailById(id) { + const db = await getRequestDetailsDb(); + + if (isCloud) return null; + + const stmt = db.prepare('SELECT * FROM request_details WHERE id = ?'); + const row = stmt.get(id); + + if (!row) return null; + + return { + id: row.id, + provider: row.provider, + model: row.model, + connectionId: row.connection_id, + timestamp: new Date(row.timestamp).toISOString(), + status: row.status, + latency: JSON.parse(row.latency || '{}'), + tokens: JSON.parse(row.tokens || '{}'), + request: JSON.parse(row.request || '{}'), + providerRequest: JSON.parse(row.provider_request || '{}'), + providerResponse: JSON.parse(row.provider_response || '{}'), + response: JSON.parse(row.response || '{}') + }; +} diff --git a/src/lib/usageDb.js b/src/lib/usageDb.js index b658001..99546ce 100644 --- a/src/lib/usageDb.js +++ b/src/lib/usageDb.js @@ -511,3 +511,6 @@ export async function getUsageStats() { return stats; } + +// Re-export request details functions from new SQLite-based module +export { saveRequestDetail, getRequestDetails, getRequestDetailById } from "./requestDetailsDb.js"; diff --git a/src/shared/components/Drawer.js b/src/shared/components/Drawer.js new file mode 100644 index 0000000..b1f095d --- /dev/null +++ b/src/shared/components/Drawer.js @@ -0,0 +1,89 @@ +"use client"; + +import { useEffect } from "react"; +import { cn } from "@/shared/utils/cn"; + +export default function Drawer({ + isOpen, + onClose, + title, + children, + width = "md", + className +}) { + const widths = { + sm: "w-[400px]", + md: "w-[500px]", + lg: "w-[600px]", + xl: "w-[800px]", + full: "w-full", + }; + + // Lock body scroll when drawer is open + useEffect(() => { + if (isOpen) { + document.body.style.overflow = "hidden"; + } else { + document.body.style.overflow = ""; + } + return () => { + document.body.style.overflow = ""; + }; + }, [isOpen]); + + // Handle escape key + useEffect(() => { + const handleEscape = (e) => { + if (e.key === "Escape" && isOpen) { + onClose(); + } + }; + document.addEventListener("keydown", handleEscape); + return () => document.removeEventListener("keydown", handleEscape); + }, [isOpen, onClose]); + + if (!isOpen) return null; + + return ( +
+ {/* Overlay */} + + ); +} diff --git a/src/shared/components/Pagination.js b/src/shared/components/Pagination.js new file mode 100644 index 0000000..5c4f908 --- /dev/null +++ b/src/shared/components/Pagination.js @@ -0,0 +1,146 @@ +"use client"; + +import { cn } from "@/shared/utils/cn"; +import Button from "./Button"; + +export default function Pagination({ + currentPage, + pageSize, + totalItems, + onPageChange, + onPageSizeChange, + className, +}) { + const totalPages = Math.ceil(totalItems / pageSize); + const startItem = totalItems > 0 ? (currentPage - 1) * pageSize + 1 : 0; + const endItem = Math.min(currentPage * pageSize, totalItems); + + const getPageNumbers = () => { + const pages = []; + const showMax = 5; + + let start = Math.max(1, currentPage - 2); + let end = Math.min(totalPages, start + showMax - 1); + + if (end - start + 1 < showMax) { + start = Math.max(1, end - showMax + 1); + } + + for (let i = start; i <= end; i++) { + pages.push(i); + } + return pages; + }; + + const pageNumbers = getPageNumbers(); + + return ( +
+ {/* Info text */} + {totalItems > 0 && ( +
+ Showing {startItem} to{" "} + {endItem} of{" "} + {totalItems} results +
+ )} + +
+ {/* Page size selector */} + {onPageSizeChange && ( +
+ Rows: + +
+ )} + + {totalPages > 1 && ( +
+ + + {pageNumbers[0] > 1 && ( + <> + + {pageNumbers[0] > 2 && ( + ... + )} + + )} + + {pageNumbers.map((page) => ( + + ))} + + {pageNumbers[pageNumbers.length - 1] < totalPages && ( + <> + {pageNumbers[pageNumbers.length - 1] < totalPages - 1 && ( + ... + )} + + + )} + + +
+ )} +
+
+ ); +}