9router/open-sse/utils/stream.js
Blade 85b7a0b136
Feature/ai observability dashboard (#79)
* feat: add AI request details feature with latency tracking

Add comprehensive request history and debugging capability to the Usage dashboard:

**Storage Layer** (usageDb.js):
- Add saveRequestDetail() for storing full request/response details
- Implement FIFO queue with 1000-record limit in request-details.json
- Auto-sanitize sensitive headers (authorization, api-key, cookie, token)
- Add getRequestDetails() with pagination and filtering support
- Add getRequestDetailById() for single record lookup

**Pipeline Integration** (chatCore.js):
- Track request start time and calculate total latency
- Record TTFT (Time To First Token) and total latency for all requests
- Capture full request details (messages, model, parameters)
- Save response content for non-streaming, mark streaming responses
- Handle error cases with detailed error information
- Async non-blocking saves to avoid impacting request performance

**API Layer** (/api/usage/request-details):
- GET endpoint with pagination (page, pageSize: 1-100)
- Filter by provider, model, connectionId, status, date range
- Returns { details: [...], pagination: {...} } format

**UI Components**:
- Drawer.js: Right slide-out panel with backdrop blur and ESC close
- Pagination.js: Full pagination with page size selector (10/20/50)
- RequestDetailsTab.js: Complete table view with filters and detail drawer

**Dashboard Integration**:
- Add "Details" tab to Usage page (4th tab after Overview/Logger/Limits)
- Table columns: Timestamp, Model, Provider, Input Tokens, Output Tokens, Latency (TTFT/Total), Action
- Provider filter dropdown (9 providers supported)
- Date range filters (start/end datetime)
- Click "Detail" button to view full request/response JSON in slide-out drawer

**Features**:
- Real-time latency monitoring (TTFT & Total)
- Complete request/response inspection for debugging
- Filterable and searchable request history
- Responsive design with mobile-friendly filters
- Data security with automatic header sanitization
- Performance: async saves don't block request pipeline

**Files Created/Modified**:
- src/lib/usageDb.js (modified)
- open-sse/handlers/chatCore.js (modified)
- src/app/api/usage/request-details/route.js (new)
- src/shared/components/Drawer.js (new)
- src/shared/components/Pagination.js (new)
- src/app/(dashboard)/dashboard/usage/components/RequestDetailsTab.js (new)
- src/app/(dashboard)/dashboard/usage/page.js (modified)

Closes: AI Observability Dashboard feature

* feat: enhance request details with full config and streaming content capture

Improve Request Details feature to capture comprehensive request parameters
and actual streaming response content:

**Request Configuration Enhancement** (chatCore.js):
- Add extractRequestConfig() helper function to capture all request parameters
- Include temperature controls: temperature, top_p, top_k
- Include token limits: max_tokens, max_completion_tokens
- Include thinking/reasoning modes: thinking, reasoning, enable_thinking
- Include OpenAI parameters: presence_penalty, frequency_penalty, seed, stop,
  tools, tool_choice, response_format, n, logprobs, top_logprobs, logit_bias,
  user, parallel_tool_calls, prediction, store, metadata
- Apply to all request types: non-streaming, streaming, and error cases

**Streaming Content Capture** (chatCore.js & stream.js):
- Add onStreamComplete callback mechanism to stream processors
- Accumulate content from all formats: OpenAI, Claude, Gemini
- Track content from delta.content, delta.reasoning_content, delta.text,
  delta.thinking, and Gemini content.parts
- Save initial record with "[Streaming in progress...]" marker
- Update record with actual content when stream completes
- Include usage tokens when available from stream

**Files Modified**:
- open-sse/handlers/chatCore.js - extractRequestConfig() + streaming capture
- open-sse/utils/stream.js - onStreamComplete callback + content accumulation

**Benefits**:
- View complete request configuration in Request Details (thinking mode, etc.)
- See actual streaming response content instead of placeholder
- Better debugging and observability for AI requests

Refs: #request-details-enhancement

* feat: separate thinking/reasoning content from response content

Improve Request Details to display thinking process separately from final response:

**Backend Changes**:
- stream.js: Capture content and thinking separately in streaming mode
  - Add accumulatedThinking variable alongside accumulatedContent
  - Route delta.content to content, delta.reasoning_content to thinking
  - Support OpenAI (reasoning_content), Claude (thinking), Gemini (part.thought)
  - Update onStreamComplete callback to return { content, thinking } object

- chatCore.js: Update response structure to include thinking field
  - Non-streaming: Extract thinking from reasoning_content field
  - Streaming: Receive { content, thinking } from stream callback
  - Error responses: Include thinking: null
  - Initial streaming save: Include thinking: null

**Frontend Changes**:
- RequestDetailsTab.js: Display thinking and content in separate sections
  - Add amber/yellow themed "Thinking Process" section with psychology icon
  - Show "Final Response" label when thinking is present
  - Use distinct visual styling for thinking (amber bg) vs content (gray bg)
  - Only show thinking section when thinking content exists

**Benefits**:
- Users can clearly see model's reasoning process vs final answer
- Better debugging for models with thinking capabilities (Claude, o1, etc.)
- Visual distinction makes it easy to identify thinking vs response

Refs: #thinking-content-separation

* fix: map Claude thinking to reasoning_content field

Fix Claude thinking content to be properly captured as reasoning_content
instead of regular content, enabling separate display in Request Details:

**Changes**:
- claude-to-openai.js: Use reasoning_content field for thinking blocks
  - thinking start: send { reasoning_content: "" } instead of { content: "```\n```" }
  - thinking delta: map to reasoning_content instead of content
  - thinking stop: send { reasoning_content: "" } instead of { content: "```\n```" }

**Why This Matters**:
- Previously Claude thinking was sent as `content` field, mixed with actual response
- Now thinking uses `reasoning_content` field, matching OpenAI's o1 format
- stream.js can now properly route thinking to accumulatedThinking variable
- Request Details UI will show Claude thinking in separate "Thinking Process" section

**Supported Thinking Formats**:
- OpenAI: delta.reasoning_content → thinking
- Claude: delta.thinking → reasoning_content (now fixed)
- Gemini: part.thought === true → thinking

Refs: #claude-thinking-fix

* feat(observability): capture and display full 4-layer request chain

Capture complete request/response chain in AI Request Details:
- Add providerRequest field (translated request sent to provider)
- Add providerResponse field (raw provider response, streaming indicator)
- Update chatCore.js at all 5 saveRequestDetail() call sites
- Reorganize UI into 4 collapsible sections with Material icons
- Preserve backward compatibility for old records
- Add distinct styling for streaming indicator

* fix(observability): resolve React duplicate key warning in request details table

- Use composite key (detail.id + index) to ensure unique keys
- Prevents React warnings when database contains duplicate IDs from old ID generation

* fix(observability): display actual content in streaming request details

Change providerResponse field for streaming requests from placeholder
"[Streaming - raw response not captured]" to actual final content.

This improves debugging experience by showing the real AI response
in the "Provider Response (Raw)" section instead of a confusing
placeholder message.

Files changed:
- open-sse/handlers/chatCore.js: Save contentObj.content to providerResponse
- src/app/.../RequestDetailsTab.js: Remove special handling for placeholder

* refactor(observability): migrate request details to SQLite for improved concurrency

- Replace LowDB JSON storage with better-sqlite3
- Enable WAL mode for true concurrent read/write support
- Add 5 indexes to accelerate queries (timestamp, provider, model, connection_id, status)
- Perform pagination at the database level to reduce memory footprint
- Maintain 1000 record limit with automatic cleanup of old data
- Ensure API compatibility via re-exports, requiring no caller changes

Performance improvements:
- Concurrent Writes: Lock-free WAL mode prevents data contention
- Query Efficiency: Index-based searches replace full dataset loading
- Data Integrity: Atomic operations prevent file corruption

* fix(observability): resolve pagination statistics display issues

- Fix issue where totalItems=0 showed 'Showing 1 to 0 of 0 results'
- Hide pagination controls when totalItems=0 or totalPages<=1
- Standardize API response fields: pagination.total -> pagination.totalItems

Before: Incorrect stats shown for empty data, and pager visible even for single-page results
After: Stats hidden for empty data, pager hidden when navigation is unnecessary

* feat(observability): display friendly provider names in request details

- Add /api/usage/providers endpoint to dynamically fetch provider list with names
- Replace hardcoded provider options with dynamic loading from database
- Display friendly provider names instead of IDs in both table and detail drawer
- Support custom provider nodes (e.g., OpenAI-compatible) with user-defined names
- Add provider name caching to optimize performance

* fix(observability): use INSERT OR REPLACE for request details to handle streaming updates

* fix(observability): resolve zero-token display issue by ensuring streaming usage capture and fixing key mismatch

* fix(observability): separate TTFT and total latency calculation for streaming requests

* feat(observability): implement SQLite write queue and JSON size limits

- Added in-memory buffer and batch writing for SQLite to prevent lock contention
- Implemented  with configurable 1MB limit to prevent DB bloat
- Added dashboard UI for observability performance and data management settings
- Integrated graceful shutdown handlers to prevent data loss

* fix(observability): resolve ReferenceError by declaring dbInstance
2026-02-09 10:30:42 +07:00

354 lines
13 KiB
JavaScript

import { translateResponse, initState } from "../translator/index.js";
import { FORMATS } from "../translator/formats.js";
import { trackPendingRequest, appendRequestLog } from "@/lib/usageDb.js";
import { extractUsage, hasValidUsage, estimateUsage, logUsage, addBufferToUsage, filterUsageForFormat, COLORS } from "./usageTracking.js";
import { parseSSELine, hasValuableContent, fixInvalidId, formatSSE } from "./streamHelpers.js";
export { COLORS, formatSSE };
const sharedDecoder = new TextDecoder();
const sharedEncoder = new TextEncoder();
/**
* Stream modes
*/
const STREAM_MODE = {
TRANSLATE: "translate", // Full translation between formats
PASSTHROUGH: "passthrough" // No translation, normalize output, extract usage
};
/**
* Create unified SSE transform stream
* @param {object} options
* @param {string} options.mode - Stream mode: translate, passthrough
* @param {string} options.targetFormat - Provider format (for translate mode)
* @param {string} options.sourceFormat - Client format (for translate mode)
* @param {string} options.provider - Provider name
* @param {object} options.reqLogger - Request logger instance
* @param {string} options.model - Model name
* @param {string} options.connectionId - Connection ID for usage tracking
* @param {object} options.body - Request body (for input token estimation)
* @param {function} options.onStreamComplete - Callback when stream completes (content, usage)
*/
export function createSSEStream(options = {}) {
const {
mode = STREAM_MODE.TRANSLATE,
targetFormat,
sourceFormat,
provider = null,
reqLogger = null,
toolNameMap = null,
model = null,
connectionId = null,
body = null,
onStreamComplete = null
} = options;
let buffer = "";
let usage = null;
const state = mode === STREAM_MODE.TRANSLATE ? { ...initState(sourceFormat), provider, toolNameMap } : null;
let totalContentLength = 0;
let accumulatedContent = "";
let accumulatedThinking = "";
let ttftAt = null;
return new TransformStream({
transform(chunk, controller) {
if (!ttftAt) {
ttftAt = Date.now();
}
const text = sharedDecoder.decode(chunk, { stream: true });
buffer += text;
reqLogger?.appendProviderChunk?.(text);
const lines = buffer.split("\n");
buffer = lines.pop() || "";
for (const line of lines) {
const trimmed = line.trim();
// Passthrough mode: normalize and forward
if (mode === STREAM_MODE.PASSTHROUGH) {
let output;
let injectedUsage = false;
if (trimmed.startsWith("data:") && trimmed.slice(5).trim() !== "[DONE]") {
try {
const parsed = JSON.parse(trimmed.slice(5).trim());
const idFixed = fixInvalidId(parsed);
if (!hasValuableContent(parsed, FORMATS.OPENAI)) {
continue;
}
const delta = parsed.choices?.[0]?.delta;
const content = delta?.content;
const reasoning = delta?.reasoning_content;
if (content && typeof content === "string") {
totalContentLength += content.length;
accumulatedContent += content;
}
if (reasoning && typeof reasoning === "string") {
totalContentLength += reasoning.length;
accumulatedThinking += reasoning;
}
const extracted = extractUsage(parsed);
if (extracted) {
usage = extracted;
}
const isFinishChunk = parsed.choices?.[0]?.finish_reason;
if (isFinishChunk && !hasValidUsage(parsed.usage)) {
const estimated = estimateUsage(body, totalContentLength, FORMATS.OPENAI);
parsed.usage = filterUsageForFormat(estimated, FORMATS.OPENAI);
output = `data: ${JSON.stringify(parsed)}\n`;
usage = estimated;
injectedUsage = true;
} else if (isFinishChunk && usage) {
const buffered = addBufferToUsage(usage);
parsed.usage = filterUsageForFormat(buffered, FORMATS.OPENAI);
output = `data: ${JSON.stringify(parsed)}\n`;
injectedUsage = true;
} else if (idFixed) {
output = `data: ${JSON.stringify(parsed)}\n`;
injectedUsage = true;
}
} catch { }
}
if (!injectedUsage) {
if (line.startsWith("data:") && !line.startsWith("data: ")) {
output = "data: " + line.slice(5) + "\n";
} else {
output = line + "\n";
}
}
reqLogger?.appendConvertedChunk?.(output);
controller.enqueue(sharedEncoder.encode(output));
continue;
}
// Translate mode
if (!trimmed) continue;
const parsed = parseSSELine(trimmed);
if (!parsed) continue;
if (parsed && parsed.done) {
const output = "data: [DONE]\n\n";
reqLogger?.appendConvertedChunk?.(output);
controller.enqueue(sharedEncoder.encode(output));
continue;
}
// Claude format - content
if (parsed.delta?.text) {
totalContentLength += parsed.delta.text.length;
accumulatedContent += parsed.delta.text;
}
// Claude format - thinking
if (parsed.delta?.thinking) {
totalContentLength += parsed.delta.thinking.length;
accumulatedThinking += parsed.delta.thinking;
}
// OpenAI format - content
if (parsed.choices?.[0]?.delta?.content) {
totalContentLength += parsed.choices[0].delta.content.length;
accumulatedContent += parsed.choices[0].delta.content;
}
// OpenAI format - reasoning
if (parsed.choices?.[0]?.delta?.reasoning_content) {
totalContentLength += parsed.choices[0].delta.reasoning_content.length;
accumulatedThinking += parsed.choices[0].delta.reasoning_content;
}
// Gemini format
if (parsed.candidates?.[0]?.content?.parts) {
for (const part of parsed.candidates[0].content.parts) {
if (part.text && typeof part.text === "string") {
totalContentLength += part.text.length;
// Check if this is thinking content
if (part.thought === true) {
accumulatedThinking += part.text;
} else {
accumulatedContent += part.text;
}
}
}
}
// Extract usage
const extracted = extractUsage(parsed);
if (extracted) state.usage = extracted; // Keep original usage for logging
// Translate: targetFormat -> openai -> sourceFormat
const translated = translateResponse(targetFormat, sourceFormat, parsed, state);
// Log OpenAI intermediate chunks (if available)
if (translated?._openaiIntermediate) {
for (const item of translated._openaiIntermediate) {
const openaiOutput = formatSSE(item, FORMATS.OPENAI);
reqLogger?.appendOpenAIChunk?.(openaiOutput);
}
}
if (translated?.length > 0) {
for (const item of translated) {
// Filter empty chunks
if (!hasValuableContent(item, sourceFormat)) {
continue; // Skip this empty chunk
}
// Inject estimated usage if finish chunk has no valid usage
const isFinishChunk = item.type === "message_delta" || item.choices?.[0]?.finish_reason;
if (state.finishReason && isFinishChunk && !hasValidUsage(item.usage) && totalContentLength > 0) {
const estimated = estimateUsage(body, totalContentLength, sourceFormat);
item.usage = filterUsageForFormat(estimated, sourceFormat); // Filter + already has buffer
state.usage = estimated;
} else if (state.finishReason && isFinishChunk && state.usage) {
// Add buffer and filter usage for client (but keep original in state.usage for logging)
const buffered = addBufferToUsage(state.usage);
item.usage = filterUsageForFormat(buffered, sourceFormat);
}
const output = formatSSE(item, sourceFormat);
reqLogger?.appendConvertedChunk?.(output);
controller.enqueue(sharedEncoder.encode(output));
}
}
}
},
flush(controller) {
trackPendingRequest(model, provider, connectionId, false);
try {
const remaining = sharedDecoder.decode();
if (remaining) buffer += remaining;
if (mode === STREAM_MODE.PASSTHROUGH) {
if (buffer) {
let output = buffer;
if (buffer.startsWith("data:") && !buffer.startsWith("data: ")) {
output = "data: " + buffer.slice(5);
}
reqLogger?.appendConvertedChunk?.(output);
controller.enqueue(sharedEncoder.encode(output));
}
if (!hasValidUsage(usage) && totalContentLength > 0) {
usage = estimateUsage(body, totalContentLength, FORMATS.OPENAI);
}
if (hasValidUsage(usage)) {
logUsage(provider, usage, model, connectionId);
} else {
appendRequestLog({ model, provider, connectionId, tokens: null, status: "200 OK" }).catch(() => { });
}
if (onStreamComplete) {
onStreamComplete({
content: accumulatedContent,
thinking: accumulatedThinking
}, usage, ttftAt);
}
return;
}
if (buffer.trim()) {
const parsed = parseSSELine(buffer.trim());
if (parsed && !parsed.done) {
const translated = translateResponse(targetFormat, sourceFormat, parsed, state);
if (translated?._openaiIntermediate) {
for (const item of translated._openaiIntermediate) {
const openaiOutput = formatSSE(item, FORMATS.OPENAI);
reqLogger?.appendOpenAIChunk?.(openaiOutput);
}
}
if (translated?.length > 0) {
for (const item of translated) {
const output = formatSSE(item, sourceFormat);
reqLogger?.appendConvertedChunk?.(output);
controller.enqueue(sharedEncoder.encode(output));
}
}
}
}
const flushed = translateResponse(targetFormat, sourceFormat, null, state);
if (flushed?._openaiIntermediate) {
for (const item of flushed._openaiIntermediate) {
const openaiOutput = formatSSE(item, FORMATS.OPENAI);
reqLogger?.appendOpenAIChunk?.(openaiOutput);
}
}
if (flushed?.length > 0) {
for (const item of flushed) {
const output = formatSSE(item, sourceFormat);
reqLogger?.appendConvertedChunk?.(output);
controller.enqueue(sharedEncoder.encode(output));
}
}
const doneOutput = "data: [DONE]\n\n";
reqLogger?.appendConvertedChunk?.(doneOutput);
controller.enqueue(sharedEncoder.encode(doneOutput));
if (!hasValidUsage(state?.usage) && totalContentLength > 0) {
state.usage = estimateUsage(body, totalContentLength, sourceFormat);
}
if (hasValidUsage(state?.usage)) {
logUsage(state.provider || targetFormat, state.usage, model, connectionId);
} else {
appendRequestLog({ model, provider, connectionId, tokens: null, status: "200 OK" }).catch(() => { });
}
if (onStreamComplete) {
onStreamComplete({
content: accumulatedContent,
thinking: accumulatedThinking
}, state?.usage, ttftAt);
}
} catch (error) {
console.log("Error in flush:", error);
}
}
});
}
export function createSSETransformStreamWithLogger(targetFormat, sourceFormat, provider = null, reqLogger = null, toolNameMap = null, model = null, connectionId = null, body = null, onStreamComplete = null) {
return createSSEStream({
mode: STREAM_MODE.TRANSLATE,
targetFormat,
sourceFormat,
provider,
reqLogger,
toolNameMap,
model,
connectionId,
body,
onStreamComplete
});
}
export function createPassthroughStreamWithLogger(provider = null, reqLogger = null, model = null, connectionId = null, body = null, onStreamComplete = null) {
return createSSEStream({
mode: STREAM_MODE.PASSTHROUGH,
provider,
reqLogger,
model,
connectionId,
body,
onStreamComplete
});
}