From 5954b8f4eb9f5e4c20559fcbd9ea34beb06b16b1 Mon Sep 17 00:00:00 2001 From: decolua Date: Fri, 27 Feb 2026 11:15:12 +0700 Subject: [PATCH] - Refactor chatCore.js to streamline imports and remove unused functions. - Fix streaming /v1/responses --- open-sse/config/providerModels.js | 1 - open-sse/handlers/chatCore.js | 962 +----------------- .../handlers/chatCore/nonStreamingHandler.js | 194 ++++ open-sse/handlers/chatCore/requestDetail.js | 102 ++ .../handlers/chatCore/sseToJsonHandler.js | 161 +++ .../handlers/chatCore/streamingHandler.js | 97 ++ open-sse/handlers/responsesHandler.js | 5 +- package.json | 2 +- .../dashboard/providers/[id]/page.js | 69 +- .../api/providers/[id]/test-models/route.js | 111 ++ 10 files changed, 785 insertions(+), 919 deletions(-) create mode 100644 open-sse/handlers/chatCore/nonStreamingHandler.js create mode 100644 open-sse/handlers/chatCore/requestDetail.js create mode 100644 open-sse/handlers/chatCore/sseToJsonHandler.js create mode 100644 open-sse/handlers/chatCore/streamingHandler.js create mode 100644 src/app/api/providers/[id]/test-models/route.js diff --git a/open-sse/config/providerModels.js b/open-sse/config/providerModels.js index e9b843b..cb7cb30 100644 --- a/open-sse/config/providerModels.js +++ b/open-sse/config/providerModels.js @@ -113,7 +113,6 @@ export const PROVIDER_MODELS = { { id: "claude-4.6-opus-max", name: "Claude 4.6 Opus Max" }, { id: "claude-4.6-sonnet-medium-thinking", name: "Claude 4.6 Sonnet Medium Thinking" }, { id: "kimi-k2.5", name: "Kimi K2.5" }, - { id: "gemini-3.1-pro-preview", name: "Gemini 3.1 Pro Preview" }, { id: "gemini-3-flash-preview", name: "Gemini 3 Flash Preview" }, { id: "gpt-5.2", name: "GPT 5.2" }, { id: "gpt-5.3-codex", name: "GPT 5.3 Codex" }, diff --git a/open-sse/handlers/chatCore.js b/open-sse/handlers/chatCore.js index 6ccbbd1..83e10fe 100644 --- a/open-sse/handlers/chatCore.js +++ b/open-sse/handlers/chatCore.js @@ -1,484 +1,93 @@ import { detectFormat, getTargetFormat } from 
"../services/provider.js"; -import { translateRequest, needsTranslation } from "../translator/index.js"; +import { translateRequest } from "../translator/index.js"; import { FORMATS } from "../translator/formats.js"; -import { createSSETransformStreamWithLogger, createPassthroughStreamWithLogger, COLORS } from "../utils/stream.js"; -import { createStreamController, pipeWithDisconnect } from "../utils/streamHandler.js"; -import { addBufferToUsage, filterUsageForFormat } from "../utils/usageTracking.js"; +import { COLORS } from "../utils/stream.js"; +import { createStreamController } from "../utils/streamHandler.js"; import { refreshWithRetry } from "../services/tokenRefresh.js"; import { createRequestLogger } from "../utils/requestLogger.js"; import { getModelTargetFormat, PROVIDER_ID_TO_ALIAS } from "../config/providerModels.js"; import { createErrorResult, parseUpstreamError, formatProviderError } from "../utils/error.js"; import { HTTP_STATUS } from "../config/constants.js"; import { handleBypassRequest } from "../utils/bypassHandler.js"; -import { saveRequestUsage, trackPendingRequest, appendRequestLog, saveRequestDetail } from "@/lib/usageDb.js"; +import { trackPendingRequest, appendRequestLog, saveRequestDetail } from "@/lib/usageDb.js"; import { getExecutor } from "../executors/index.js"; -import { convertResponsesStreamToJson } from "../transformer/streamToJsonConverter.js"; - -/** - * Translate non-streaming response to OpenAI format - * Handles different provider response formats (Gemini, Claude, etc.) 
- */ -function translateNonStreamingResponse(responseBody, targetFormat, sourceFormat) { - // If already in source format (usually OpenAI), return as-is - if (targetFormat === sourceFormat || targetFormat === FORMATS.OPENAI) { - return responseBody; - } - - // Handle Gemini/Antigravity format - if (targetFormat === FORMATS.GEMINI || targetFormat === FORMATS.ANTIGRAVITY || targetFormat === FORMATS.GEMINI_CLI) { - const response = responseBody.response || responseBody; - if (!response?.candidates?.[0]) { - return responseBody; // Can't translate, return raw - } - - const candidate = response.candidates[0]; - const content = candidate.content; - const usage = response.usageMetadata || responseBody.usageMetadata; - - // Build message content - let textContent = ""; - const toolCalls = []; - let reasoningContent = ""; - - if (content?.parts) { - for (const part of content.parts) { - // Handle thinking/reasoning - if (part.thought === true && part.text) { - reasoningContent += part.text; - } - // Regular text - else if (part.text !== undefined) { - textContent += part.text; - } - // Function calls - if (part.functionCall) { - toolCalls.push({ - id: `call_${part.functionCall.name}_${Date.now()}_${toolCalls.length}`, - type: "function", - function: { - name: part.functionCall.name, - arguments: JSON.stringify(part.functionCall.args || {}) - } - }); - } - } - } - - // Build OpenAI format message - const message = { role: "assistant" }; - if (textContent) { - message.content = textContent; - } - if (reasoningContent) { - message.reasoning_content = reasoningContent; - } - if (toolCalls.length > 0) { - message.tool_calls = toolCalls; - } - // If no content at all, set content to empty string - if (!message.content && !message.tool_calls) { - message.content = ""; - } - - // Determine finish reason - let finishReason = (candidate.finishReason || "stop").toLowerCase(); - if (finishReason === "stop" && toolCalls.length > 0) { - finishReason = "tool_calls"; - } - - const result = 
{ - id: `chatcmpl-${response.responseId || Date.now()}`, - object: "chat.completion", - created: Math.floor(new Date(response.createTime || Date.now()).getTime() / 1000), - model: response.modelVersion || "gemini", - choices: [{ - index: 0, - message, - finish_reason: finishReason - }] - }; - - // Add usage if available (match streaming translator: add thoughtsTokenCount to prompt_tokens) - if (usage) { - result.usage = { - prompt_tokens: (usage.promptTokenCount || 0) + (usage.thoughtsTokenCount || 0), - completion_tokens: usage.candidatesTokenCount || 0, - total_tokens: usage.totalTokenCount || 0 - }; - if (usage.thoughtsTokenCount > 0) { - result.usage.completion_tokens_details = { - reasoning_tokens: usage.thoughtsTokenCount - }; - } - } - - return result; - } - - // Handle Claude format - if (targetFormat === FORMATS.CLAUDE) { - if (!responseBody.content) { - return responseBody; // Can't translate, return raw - } - - let textContent = ""; - let thinkingContent = ""; - const toolCalls = []; - - for (const block of responseBody.content) { - if (block.type === "text") { - textContent += block.text; - } else if (block.type === "thinking") { - thinkingContent += block.thinking || ""; - } else if (block.type === "tool_use") { - toolCalls.push({ - id: block.id, - type: "function", - function: { - name: block.name, - arguments: JSON.stringify(block.input || {}) - } - }); - } - } - - const message = { role: "assistant" }; - if (textContent) { - message.content = textContent; - } - if (thinkingContent) { - message.reasoning_content = thinkingContent; - } - if (toolCalls.length > 0) { - message.tool_calls = toolCalls; - } - if (!message.content && !message.tool_calls) { - message.content = ""; - } - - let finishReason = responseBody.stop_reason || "stop"; - if (finishReason === "end_turn") finishReason = "stop"; - if (finishReason === "tool_use") finishReason = "tool_calls"; - - const result = { - id: `chatcmpl-${responseBody.id || Date.now()}`, - object: 
"chat.completion", - created: Math.floor(Date.now() / 1000), - model: responseBody.model || "claude", - choices: [{ - index: 0, - message, - finish_reason: finishReason - }] - }; - - if (responseBody.usage) { - result.usage = { - prompt_tokens: responseBody.usage.input_tokens || 0, - completion_tokens: responseBody.usage.output_tokens || 0, - total_tokens: (responseBody.usage.input_tokens || 0) + (responseBody.usage.output_tokens || 0) - }; - } - - return result; - } - - // Unknown format, return as-is - return responseBody; -} - -/** - * Extract usage from non-streaming response body - * Handles different provider response formats - */ -function extractUsageFromResponse(responseBody, provider) { - if (!responseBody || typeof responseBody !== 'object') return null; - - // Claude format - check first to avoid conflict with OpenAI check - if (responseBody.usage && typeof responseBody.usage === 'object' && responseBody.usage.input_tokens !== undefined) { - return { - prompt_tokens: responseBody.usage.input_tokens || 0, - completion_tokens: responseBody.usage.output_tokens || 0, - cache_read_input_tokens: responseBody.usage.cache_read_input_tokens, - cache_creation_input_tokens: responseBody.usage.cache_creation_input_tokens - }; - } - - // OpenAI format - if (responseBody.usage && typeof responseBody.usage === 'object' && responseBody.usage.prompt_tokens !== undefined) { - return { - prompt_tokens: responseBody.usage.prompt_tokens || 0, - completion_tokens: responseBody.usage.completion_tokens || 0, - cached_tokens: responseBody.usage.prompt_tokens_details?.cached_tokens, - reasoning_tokens: responseBody.usage.completion_tokens_details?.reasoning_tokens - }; - } - - // Gemini format - if (responseBody.usageMetadata && typeof responseBody.usageMetadata === 'object') { - return { - prompt_tokens: responseBody.usageMetadata.promptTokenCount || 0, - completion_tokens: responseBody.usageMetadata.candidatesTokenCount || 0, - reasoning_tokens: 
responseBody.usageMetadata.thoughtsTokenCount - }; - } - - return null; -} - -/** - * Extract full request configuration from body - * Captures all relevant parameters for request details - */ -function extractRequestConfig(body, stream) { - const config = { - messages: body.messages || [], - model: body.model, - stream: stream - }; - - // Add all optional configuration parameters - const optionalParams = [ - 'temperature', 'top_p', 'top_k', - 'max_tokens', 'max_completion_tokens', - 'thinking', 'reasoning', 'enable_thinking', - 'presence_penalty', 'frequency_penalty', - 'seed', 'stop', 'tools', 'tool_choice', - 'response_format', 'prediction', 'store', 'metadata', - 'n', 'logprobs', 'top_logprobs', 'logit_bias', - 'user', 'parallel_tool_calls' - ]; - - for (const param of optionalParams) { - if (body[param] !== undefined) { - config[param] = body[param]; - } - } - - return config; -} - -/** - * Convert OpenAI-style SSE chunks into a single non-streaming JSON response. - * Used as a fallback when upstream returns text/event-stream for stream=false. - */ -function parseSSEToOpenAIResponse(rawSSE, fallbackModel) { - const lines = String(rawSSE || "").split("\n"); - const chunks = []; - - for (const line of lines) { - const trimmed = line.trim(); - if (!trimmed.startsWith("data:")) continue; - const payload = trimmed.slice(5).trim(); - if (!payload || payload === "[DONE]") continue; - try { - chunks.push(JSON.parse(payload)); - } catch { - // Ignore malformed SSE lines and continue best-effort parsing. 
- } - } - - if (chunks.length === 0) return null; - - const first = chunks[0]; - const contentParts = []; - const reasoningParts = []; - let finishReason = "stop"; - let usage = null; - - for (const chunk of chunks) { - const choice = chunk?.choices?.[0]; - const delta = choice?.delta || {}; - - if (typeof delta.content === "string" && delta.content.length > 0) { - contentParts.push(delta.content); - } - if (typeof delta.reasoning_content === "string" && delta.reasoning_content.length > 0) { - reasoningParts.push(delta.reasoning_content); - } - if (choice?.finish_reason) { - finishReason = choice.finish_reason; - } - if (chunk?.usage && typeof chunk.usage === "object") { - usage = chunk.usage; - } - } - - const message = { - role: "assistant", - content: contentParts.join("") - }; - if (reasoningParts.length > 0) { - message.reasoning_content = reasoningParts.join(""); - } - - const result = { - id: first.id || `chatcmpl-${Date.now()}`, - object: "chat.completion", - created: first.created || Math.floor(Date.now() / 1000), - model: first.model || fallbackModel || "unknown", - choices: [ - { - index: 0, - message, - finish_reason: finishReason - } - ] - }; - - if (usage) { - result.usage = usage; - } - - return result; -} +import { buildRequestDetail, extractRequestConfig } from "./chatCore/requestDetail.js"; +import { handleForcedSSEToJson } from "./chatCore/sseToJsonHandler.js"; +import { handleNonStreamingResponse } from "./chatCore/nonStreamingHandler.js"; +import { handleStreamingResponse, buildOnStreamComplete } from "./chatCore/streamingHandler.js"; /** * Core chat handler - shared between SSE and Worker - * Returns { success, response, status, error } for caller to handle fallback - * @param {object} options * @param {object} options.body - Request body * @param {object} options.modelInfo - { provider, model } * @param {object} options.credentials - Provider credentials - * @param {object} options.log - Logger instance (optional) - * @param {function} 
options.onCredentialsRefreshed - Callback when credentials are refreshed - * @param {function} options.onRequestSuccess - Callback when request succeeds (to clear error status) - * @param {function} options.onDisconnect - Callback when client disconnects - * @param {string} options.connectionId - Connection ID for usage tracking - * @param {string} options.apiKey - API key for usage tracking + * @param {string} options.sourceFormatOverride - Override detected source format (e.g. "openai-responses") */ -export async function handleChatCore({ body, modelInfo, credentials, log, onCredentialsRefreshed, onRequestSuccess, onDisconnect, clientRawRequest, connectionId, userAgent, apiKey }) { +export async function handleChatCore({ body, modelInfo, credentials, log, onCredentialsRefreshed, onRequestSuccess, onDisconnect, clientRawRequest, connectionId, userAgent, apiKey, sourceFormatOverride }) { const { provider, model } = modelInfo; const requestStartTime = Date.now(); - const sourceFormat = detectFormat(body); + const sourceFormat = sourceFormatOverride || detectFormat(body); - // Check for bypass patterns (warmup, skip) - return fake response + // Check for bypass patterns (warmup, skip) const bypassResponse = handleBypassRequest(body, model, userAgent); - if (bypassResponse) { - return bypassResponse; - } - - // Detect source format and get target format - // Model-specific targetFormat takes priority over provider default + if (bypassResponse) return bypassResponse; const alias = PROVIDER_ID_TO_ALIAS[provider] || provider; const modelTargetFormat = getModelTargetFormat(alias, model); const targetFormat = modelTargetFormat || getTargetFormat(provider); - // Track if client actually wants streaming (before we force it for providers) const clientRequestedStreaming = body.stream === true || sourceFormat === FORMATS.ANTIGRAVITY || sourceFormat === FORMATS.GEMINI || sourceFormat === FORMATS.GEMINI_CLI; - const providerRequiresStreaming = provider === 'openai' || provider 
=== 'codex'; - - // Force streaming for OpenAI/Codex models (they don't support non-streaming mode properly) + const providerRequiresStreaming = provider === "openai" || provider === "codex"; const stream = providerRequiresStreaming ? true : (body.stream !== false); - // Create request logger for this session: sourceFormat_targetFormat_model const reqLogger = await createRequestLogger(sourceFormat, targetFormat, model); - - // 0. Log client raw request (before any conversion) - if (clientRawRequest) { - reqLogger.logClientRawRequest( - clientRawRequest.endpoint, - clientRawRequest.body, - clientRawRequest.headers - ); - } - - // 1. Log raw request from client + if (clientRawRequest) reqLogger.logClientRawRequest(clientRawRequest.endpoint, clientRawRequest.body, clientRawRequest.headers); reqLogger.logRawRequest(body); - log?.debug?.("FORMAT", `${sourceFormat} → ${targetFormat} | stream=${stream}`); - // Translate request (pass reqLogger for intermediate logging) - let translatedBody = body; - translatedBody = translateRequest(sourceFormat, targetFormat, model, body, stream, credentials, provider, reqLogger); - - // Extract toolNameMap for response translation (Claude OAuth) + let translatedBody = translateRequest(sourceFormat, targetFormat, model, body, stream, credentials, provider, reqLogger); const toolNameMap = translatedBody._toolNameMap; delete translatedBody._toolNameMap; - - // Update model in body translatedBody.model = model; - // Get executor for this provider const executor = getExecutor(provider); - - // Track pending request trackPendingRequest(model, provider, connectionId, true); + appendRequestLog({ model, provider, connectionId, status: "PENDING" }).catch(() => {}); - // Log start - appendRequestLog({ model, provider, connectionId, status: "PENDING" }).catch(() => { }); - - const msgCount = translatedBody.messages?.length - || translatedBody.input?.length - || translatedBody.contents?.length - || translatedBody.request?.contents?.length - || 0; + 
const msgCount = translatedBody.messages?.length || translatedBody.input?.length || translatedBody.contents?.length || translatedBody.request?.contents?.length || 0; log?.debug?.("REQUEST", `${provider.toUpperCase()} | ${model} | ${msgCount} msgs`); - // Create stream controller for disconnect detection - const streamController = createStreamController({ + const streamController = createStreamController({ onDisconnect: (reason) => { - // Track request finished (disconnected) trackPendingRequest(model, provider, connectionId, false); if (onDisconnect) onDisconnect(reason); }, - onError: (error) => { - // Track request finished (error/zombie) - trackPendingRequest(model, provider, connectionId, false); - }, - log, - provider, - model + onError: () => trackPendingRequest(model, provider, connectionId, false), + log, provider, model }); - // Execute request using executor (handles URL building, headers, fallback, transform) - let providerResponse; - let providerUrl; - let providerHeaders; - let finalBody; - + // Execute request + let providerResponse, providerUrl, providerHeaders, finalBody; try { - const result = await executor.execute({ - model, - body: translatedBody, - stream, - credentials, - signal: streamController.signal, - log - }); - + const result = await executor.execute({ model, body: translatedBody, stream, credentials, signal: streamController.signal, log }); providerResponse = result.response; providerUrl = result.url; providerHeaders = result.headers; finalBody = result.transformedBody; - - // Log target request (final request to provider) reqLogger.logTargetRequest(providerUrl, providerHeaders, finalBody); - } catch (error) { trackPendingRequest(model, provider, connectionId, false, true); - appendRequestLog({ model, provider, connectionId, status: `FAILED ${error.name === "AbortError" ? 
499 : HTTP_STATUS.BAD_GATEWAY}` }).catch(() => { }); - - const errorDetail = { - provider: provider || "unknown", - model: model || "unknown", - connectionId: connectionId || undefined, - timestamp: new Date().toISOString(), + appendRequestLog({ model, provider, connectionId, status: `FAILED ${error.name === "AbortError" ? 499 : HTTP_STATUS.BAD_GATEWAY}` }).catch(() => {}); + saveRequestDetail(buildRequestDetail({ + provider, model, connectionId, latency: { ttft: 0, total: Date.now() - requestStartTime }, tokens: { prompt_tokens: 0, completion_tokens: 0 }, request: extractRequestConfig(body, stream), providerRequest: translatedBody || null, - providerResponse: null, - response: { - error: error.message || String(error), - status: error.name === "AbortError" ? 499 : 502, - thinking: null - }, + response: { error: error.message || String(error), status: error.name === "AbortError" ? 499 : 502, thinking: null }, status: "error" - }; - saveRequestDetail(errorDetail).catch(() => {}); + })).catch(() => {}); if (error.name === "AbortError") { streamController.handleError(error); @@ -489,530 +98,67 @@ export async function handleChatCore({ body, modelInfo, credentials, log, onCred return createErrorResult(HTTP_STATUS.BAD_GATEWAY, errMsg); } - // Handle 401/403 - try token refresh using executor + // Handle 401/403 - try token refresh if (providerResponse.status === HTTP_STATUS.UNAUTHORIZED || providerResponse.status === HTTP_STATUS.FORBIDDEN) { - const newCredentials = await refreshWithRetry( - () => executor.refreshCredentials(credentials, log), - 3, - log - ); - + const newCredentials = await refreshWithRetry(() => executor.refreshCredentials(credentials, log), 3, log); if (newCredentials?.accessToken || newCredentials?.copilotToken) { log?.info?.("TOKEN", `${provider.toUpperCase()} | refreshed`); - - // Update credentials Object.assign(credentials, newCredentials); - - // Notify caller about refreshed credentials - if (onCredentialsRefreshed && newCredentials) { - await 
onCredentialsRefreshed(newCredentials); - } - - // Retry with new credentials + if (onCredentialsRefreshed) await onCredentialsRefreshed(newCredentials); try { - const retryResult = await executor.execute({ - model, - body: translatedBody, - stream, - credentials, - signal: streamController.signal, - log - }); - - if (retryResult.response.ok) { - providerResponse = retryResult.response; - providerUrl = retryResult.url; - } - } catch (retryError) { - log?.warn?.("TOKEN", `${provider.toUpperCase()} | retry after refresh failed`); - } + const retryResult = await executor.execute({ model, body: translatedBody, stream, credentials, signal: streamController.signal, log }); + if (retryResult.response.ok) { providerResponse = retryResult.response; providerUrl = retryResult.url; } + } catch { log?.warn?.("TOKEN", `${provider.toUpperCase()} | retry after refresh failed`); } } else { log?.warn?.("TOKEN", `${provider.toUpperCase()} | refresh failed`); } } - // Check provider response - return error info for fallback handling + // Provider returned error if (!providerResponse.ok) { trackPendingRequest(model, provider, connectionId, false, true); const { statusCode, message, retryAfterMs } = await parseUpstreamError(providerResponse, provider); - appendRequestLog({ model, provider, connectionId, status: `FAILED ${statusCode}` }).catch(() => { }); - - const errorDetail = { - provider: provider || "unknown", - model: model || "unknown", - connectionId: connectionId || undefined, - timestamp: new Date().toISOString(), + appendRequestLog({ model, provider, connectionId, status: `FAILED ${statusCode}` }).catch(() => {}); + saveRequestDetail(buildRequestDetail({ + provider, model, connectionId, latency: { ttft: 0, total: Date.now() - requestStartTime }, tokens: { prompt_tokens: 0, completion_tokens: 0 }, request: extractRequestConfig(body, stream), providerRequest: finalBody || translatedBody || null, - providerResponse: null, - response: { - error: message, - status: statusCode, - 
thinking: null - }, + response: { error: message, status: statusCode, thinking: null }, status: "error" - }; - saveRequestDetail(errorDetail).catch(() => {}); + })).catch(() => {}); const errMsg = formatProviderError(new Error(message), provider, model, statusCode); console.log(`${COLORS.red}[ERROR] ${errMsg}${COLORS.reset}`); - - // Log Antigravity retry time if available if (retryAfterMs && provider === "antigravity") { - const retrySeconds = Math.ceil(retryAfterMs / 1000); - log?.debug?.("RETRY", `Antigravity quota reset in ${retrySeconds}s (${retryAfterMs}ms)`); + log?.debug?.("RETRY", `Antigravity quota reset in ${Math.ceil(retryAfterMs / 1000)}s`); } - - // Log error with full request body for debugging reqLogger.logError(new Error(message), finalBody || translatedBody); - return createErrorResult(statusCode, errMsg, retryAfterMs); } - // Provider forced streaming but client wants JSON - convert SSE to JSON + const sharedCtx = { provider, model, body, stream, translatedBody, finalBody, requestStartTime, connectionId, apiKey, clientRawRequest, onRequestSuccess }; + const appendLog = (extra) => appendRequestLog({ model, provider, connectionId, ...extra }).catch(() => {}); + const trackDone = () => trackPendingRequest(model, provider, connectionId, false); + + // Provider forced streaming but client wants JSON if (!clientRequestedStreaming && providerRequiresStreaming) { - trackPendingRequest(model, provider, connectionId, false); - const contentType = providerResponse.headers.get("content-type") || ""; - - // Treat as SSE if content-type says so OR if it's empty/missing - // (Codex API doesn't always set Content-Type on streaming responses) - const isSSEResponse = contentType.includes("text/event-stream") || (contentType === "" && provider === "codex"); - if (isSSEResponse) { - // Codex always returns Responses API SSE format regardless of client source format - const isCodexResponsesApi = provider === "codex" || sourceFormat === "openai-responses"; - - if 
(isCodexResponsesApi) { - // Responses API SSE → parse → translate to client format - try { - const jsonResponse = await convertResponsesStreamToJson(providerResponse.body); - - if (onRequestSuccess) await onRequestSuccess(); - - const usage = jsonResponse.usage || {}; - appendRequestLog({ model, provider, connectionId, tokens: usage, status: "200 OK" }).catch(() => { }); - - if (usage && typeof usage === 'object') { - const msg = `[${new Date().toLocaleTimeString("en-US", { hour12: false, hour: "2-digit", minute: "2-digit" })}] 📊 [USAGE] ${provider.toUpperCase()} | in=${usage?.input_tokens || 0} | out=${usage?.output_tokens || 0}${connectionId ? ` | account=${connectionId.slice(0, 8)}...` : ""}`; - console.log(`${COLORS.green}${msg}${COLORS.reset}`); - - saveRequestUsage({ - provider: provider || "unknown", - model: model || "unknown", - tokens: { prompt_tokens: usage.input_tokens || 0, completion_tokens: usage.output_tokens || 0 }, - timestamp: new Date().toISOString(), - connectionId: connectionId || undefined, - apiKey: apiKey || undefined - }).catch(() => { }); - } - - const msgItem = jsonResponse.output?.find(item => item.type === "message"); - const textContent = msgItem?.content?.find(c => c.type === "output_text")?.text || msgItem?.content?.[0]?.text || null; - console.log(`[DBG] codex status=${jsonResponse.status} output.len=${jsonResponse.output?.length} msgItem.type=${msgItem?.type} textLen=${textContent?.length||0}`); - const totalLatency = Date.now() - requestStartTime; - saveRequestDetail({ - provider: provider || "unknown", - model: model || "unknown", - connectionId: connectionId || undefined, - timestamp: new Date().toISOString(), - latency: { ttft: totalLatency, total: totalLatency }, - tokens: { prompt_tokens: usage.input_tokens || 0, completion_tokens: usage.output_tokens || 0 }, - request: extractRequestConfig(body, stream), - providerRequest: finalBody || translatedBody || null, - providerResponse: null, - response: { content: textContent, 
thinking: null, finish_reason: jsonResponse.status || "unknown" }, - status: "success", - endpoint: clientRawRequest?.endpoint || null - }).catch(() => { }); - - // If client is openai-responses → return as-is - if (sourceFormat === "openai-responses") { - return { - success: true, - response: new Response(JSON.stringify(jsonResponse), { - headers: { "Content-Type": "application/json", "Access-Control-Allow-Origin": "*" } - }) - }; - } - - // Translate Responses API JSON → OpenAI chat completion JSON - const openaiMsg = { - id: jsonResponse.id || `chatcmpl-${Date.now()}`, - object: "chat.completion", - created: jsonResponse.created_at || Math.floor(Date.now() / 1000), - model: jsonResponse.model || model, - choices: [{ - index: 0, - message: { role: "assistant", content: textContent || "" }, - finish_reason: jsonResponse.status === "completed" ? "stop" : (jsonResponse.status || "stop") - }], - usage: { - prompt_tokens: usage.input_tokens || 0, - completion_tokens: usage.output_tokens || 0, - total_tokens: (usage.input_tokens || 0) + (usage.output_tokens || 0) - } - }; - - // Build client-format response based on sourceFormat - let finalResp; - if (sourceFormat === FORMATS.ANTIGRAVITY || sourceFormat === FORMATS.GEMINI || sourceFormat === FORMATS.GEMINI_CLI) { - // Antigravity/Gemini non-streaming format - finalResp = { - response: { - candidates: [{ - content: { role: "model", parts: [{ text: textContent || "" }] }, - finishReason: "STOP", - index: 0 - }], - usageMetadata: { - promptTokenCount: usage.input_tokens || 0, - candidatesTokenCount: usage.output_tokens || 0, - totalTokenCount: (usage.input_tokens || 0) + (usage.output_tokens || 0) - }, - modelVersion: model, - responseId: jsonResponse.id || `resp_${Date.now()}` - } - }; - } else { - finalResp = openaiMsg; - } - - return { - success: true, - response: new Response(JSON.stringify(finalResp), { - headers: { "Content-Type": "application/json", "Access-Control-Allow-Origin": "*" } - }) - }; - } catch (error) { 
- console.error("[ChatCore] Responses API SSE→JSON conversion failed:", error); - return createErrorResult(HTTP_STATUS.BAD_GATEWAY, "Failed to convert streaming response to JSON"); - } - } else { - // Chat Completions SSE → Chat Completions JSON - try { - const sseText = await providerResponse.text(); - const parsed = parseSSEToOpenAIResponse(sseText, model); - if (parsed) { - if (onRequestSuccess) await onRequestSuccess(); - - const usage = parsed.usage || {}; - appendRequestLog({ model, provider, connectionId, tokens: usage, status: "200 OK" }).catch(() => { }); - - if (usage && typeof usage === "object") { - saveRequestUsage({ - provider: provider || "unknown", - model: model || "unknown", - tokens: usage, - timestamp: new Date().toISOString(), - connectionId: connectionId || undefined, - apiKey: apiKey || undefined, - endpoint: clientRawRequest?.endpoint || null - }).catch(() => { }); - } - - const totalLatency = Date.now() - requestStartTime; - saveRequestDetail({ - provider: provider || "unknown", - model: model || "unknown", - connectionId: connectionId || undefined, - timestamp: new Date().toISOString(), - latency: { ttft: totalLatency, total: totalLatency }, - tokens: usage, - request: extractRequestConfig(body, stream), - providerRequest: finalBody || translatedBody || null, - providerResponse: null, - response: { - content: parsed.choices?.[0]?.message?.content || null, - thinking: parsed.choices?.[0]?.message?.reasoning_content || null, - finish_reason: parsed.choices?.[0]?.finish_reason || "unknown" - }, - status: "success", - endpoint: clientRawRequest?.endpoint || null - }).catch(() => { }); - - return { - success: true, - response: new Response(JSON.stringify(parsed), { - headers: { - "Content-Type": "application/json", - "Access-Control-Allow-Origin": "*" - } - }) - }; - } - return createErrorResult(HTTP_STATUS.BAD_GATEWAY, "Invalid SSE response for non-streaming request"); - } catch (error) { - console.error("[ChatCore] Chat Completions SSE→JSON 
conversion failed:", error); - return createErrorResult(HTTP_STATUS.BAD_GATEWAY, "Failed to convert streaming response to JSON"); - } - } - } + const result = await handleForcedSSEToJson({ ...sharedCtx, providerResponse, sourceFormat, trackDone, appendLog }); + if (result) return result; } - // Non-streaming response + // True non-streaming response if (!stream) { - trackPendingRequest(model, provider, connectionId, false); - const contentType = providerResponse.headers.get("content-type") || ""; - let responseBody; - - if (contentType.includes("text/event-stream")) { - // Upstream returned SSE even though stream=false; convert best-effort to JSON. - const sseText = await providerResponse.text(); - const parsedFromSSE = parseSSEToOpenAIResponse(sseText, model); - if (!parsedFromSSE) { - appendRequestLog({ model, provider, connectionId, status: `FAILED ${HTTP_STATUS.BAD_GATEWAY}` }).catch(() => { }); - return createErrorResult(HTTP_STATUS.BAD_GATEWAY, "Invalid SSE response for non-streaming request"); - } - responseBody = parsedFromSSE; - } else { - try { - responseBody = await providerResponse.json(); - } catch (parseError) { - appendRequestLog({ model, provider, connectionId, status: `FAILED ${HTTP_STATUS.BAD_GATEWAY}` }).catch(() => { }); - console.error(`[ChatCore] Failed to parse JSON response from ${provider}:`, parseError.message); - return createErrorResult(HTTP_STATUS.BAD_GATEWAY, `Invalid JSON response from ${provider}`); - } - } - - // Log provider response (raw response from provider) - reqLogger.logProviderResponse( - providerResponse.status, - providerResponse.statusText, - providerResponse.headers, - responseBody - ); - - // Notify success - caller can clear error status if needed - if (onRequestSuccess) { - await onRequestSuccess(); - } - - // Log usage for non-streaming responses - const usage = extractUsageFromResponse(responseBody, provider); - appendRequestLog({ model, provider, connectionId, tokens: usage, status: "200 OK" }).catch(() => { }); - 
if (usage && typeof usage === 'object') { - const msg = `[${new Date().toLocaleTimeString("en-US", { hour12: false, hour: "2-digit", minute: "2-digit" })}] 📊 [USAGE] ${provider.toUpperCase()} | in=${usage?.prompt_tokens || 0} | out=${usage?.completion_tokens || 0}${connectionId ? ` | account=${connectionId.slice(0, 8)}...` : ""}`; - console.log(`${COLORS.green}${msg}${COLORS.reset}`); - - saveRequestUsage({ - provider: provider || "unknown", - model: model || "unknown", - tokens: usage, - timestamp: new Date().toISOString(), - connectionId: connectionId || undefined, - apiKey: apiKey || undefined, - endpoint: clientRawRequest?.endpoint || null - }).catch(err => { - console.error("Failed to save usage stats:", err.message); - }); - } - - // Translate response to client's expected format (usually OpenAI) - const translatedResponse = needsTranslation(targetFormat, sourceFormat) - ? translateNonStreamingResponse(responseBody, targetFormat, sourceFormat) - : responseBody; - - // Ensure OpenAI-required fields are present (needed for Letta and other strict clients) - if (!translatedResponse.object) translatedResponse.object = "chat.completion"; - if (!translatedResponse.created) translatedResponse.created = Math.floor(Date.now() / 1000); - - // Strip Azure-specific non-standard fields - if (translatedResponse.prompt_filter_results !== undefined) { - delete translatedResponse.prompt_filter_results; - } - if (translatedResponse?.choices) { - for (const choice of translatedResponse.choices) { - if (choice.content_filter_results !== undefined) { - delete choice.content_filter_results; - } - } - } - - // Add buffer and filter usage for client (to prevent CLI context errors) - if (translatedResponse?.usage) { - const buffered = addBufferToUsage(translatedResponse.usage); - translatedResponse.usage = filterUsageForFormat(buffered, sourceFormat); - } - - // Log converted response (final response to client) - reqLogger.logConvertedResponse(translatedResponse); - - const 
totalLatency = Date.now() - requestStartTime; - const requestDetail = { - provider: provider || "unknown", - model: model || "unknown", - connectionId: connectionId || undefined, - timestamp: new Date().toISOString(), - latency: { - ttft: totalLatency, - total: totalLatency - }, - tokens: usage || { prompt_tokens: 0, completion_tokens: 0 }, - request: extractRequestConfig(body, stream), - providerRequest: finalBody || translatedBody || null, - providerResponse: responseBody || null, - response: { - content: translatedResponse?.choices?.[0]?.message?.content || - translatedResponse?.content || - null, - thinking: translatedResponse?.choices?.[0]?.message?.reasoning_content || - translatedResponse?.reasoning_content || - null, - finish_reason: translatedResponse?.choices?.[0]?.finish_reason || "unknown" - }, - status: "success", - endpoint: clientRawRequest?.endpoint || null - }; - - // Async save (don't block response) - saveRequestDetail(requestDetail).catch(err => { - console.error("[RequestDetail] Failed to save:", err.message); - }); - - return { - success: true, - response: new Response(JSON.stringify(translatedResponse), { - headers: { - "Content-Type": "application/json", - "Access-Control-Allow-Origin": "*" - } - }) - }; + return handleNonStreamingResponse({ ...sharedCtx, providerResponse, sourceFormat, targetFormat, reqLogger, trackDone, appendLog }); } // Streaming response - - // Notify success - caller can clear error status if needed - if (onRequestSuccess) { - await onRequestSuccess(); - } - - const responseHeaders = { - "Content-Type": "text/event-stream", - "Cache-Control": "no-cache", - "Connection": "keep-alive", - "Access-Control-Allow-Origin": "*" - }; - - let streamContent = ""; - let streamUsage = null; - const streamDetailId = `${Date.now()}-${Math.random().toString(36).slice(2, 11)}`; - - const onStreamComplete = (contentObj, usage, ttftAt) => { - // contentObj is object { content, thinking } - streamUsage = usage; - - const updatedDetail = { 
- provider: provider || "unknown", - model: model || "unknown", - connectionId: connectionId || undefined, - timestamp: new Date().toISOString(), - latency: { - ttft: ttftAt ? ttftAt - requestStartTime : Date.now() - requestStartTime, - total: Date.now() - requestStartTime - }, - tokens: usage || { prompt_tokens: 0, completion_tokens: 0 }, - request: extractRequestConfig(body, stream), - providerRequest: finalBody || translatedBody || null, - providerResponse: contentObj.content || "[Empty streaming response]", - response: { - content: contentObj.content || "[Empty streaming response]", - thinking: contentObj.thinking || null, - type: "streaming" - }, - status: "success", - id: streamDetailId - }; - - saveRequestDetail(updatedDetail).catch(err => { - console.error("[RequestDetail] Failed to update streaming content:", err.message); - }); - - // Save usage stats for dashboard (skip if no real token data to avoid duplicates) - if (usage && typeof usage === 'object' && (usage.prompt_tokens > 0 || usage.completion_tokens > 0)) { - const msg = `[${new Date().toLocaleTimeString("en-US", { hour12: false, hour: "2-digit", minute: "2-digit" })}] 📊 [STREAM USAGE] ${provider.toUpperCase()} | in=${usage?.prompt_tokens || 0} | out=${usage?.completion_tokens || 0}${connectionId ? 
` | account=${connectionId.slice(0, 8)}...` : ""}`; - console.log(`${COLORS.green}${msg}${COLORS.reset}`); - - saveRequestUsage({ - provider: provider || "unknown", - model: model || "unknown", - tokens: usage, - timestamp: new Date().toISOString(), - connectionId: connectionId || undefined, - apiKey: apiKey || undefined, - endpoint: clientRawRequest?.endpoint || null - }).catch(err => { - console.error("Failed to save streaming usage stats:", err.message); - }); - } - }; - - let transformStream; - const isDroidCLI = userAgent?.toLowerCase().includes('droid') || userAgent?.toLowerCase().includes('codex-cli'); - const needsCodexTranslation = provider === 'codex' - && targetFormat === 'openai-responses' - && !isDroidCLI; - - if (needsCodexTranslation) { - // Translate Codex (openai-responses) SSE → client's source format - // Claude → claude, Antigravity/Gemini → antigravity, others → openai - let codexTarget; - if (sourceFormat === FORMATS.CLAUDE) codexTarget = FORMATS.CLAUDE; - else if (sourceFormat === FORMATS.ANTIGRAVITY || sourceFormat === FORMATS.GEMINI || sourceFormat === FORMATS.GEMINI_CLI) codexTarget = FORMATS.ANTIGRAVITY; - else codexTarget = FORMATS.OPENAI; - log?.debug?.("STREAM", `Codex translation mode: openai-responses → ${codexTarget}`); - transformStream = createSSETransformStreamWithLogger('openai-responses', codexTarget, provider, reqLogger, toolNameMap, model, connectionId, body, onStreamComplete, apiKey); - } else if (needsTranslation(targetFormat, sourceFormat)) { - log?.debug?.("STREAM", `Translation mode: ${targetFormat} → ${sourceFormat}`); - transformStream = createSSETransformStreamWithLogger(targetFormat, sourceFormat, provider, reqLogger, toolNameMap, model, connectionId, body, onStreamComplete, apiKey); - } else { - log?.debug?.("STREAM", `Standard passthrough mode`); - transformStream = createPassthroughStreamWithLogger(provider, reqLogger, model, connectionId, body, onStreamComplete, apiKey); - } - - const transformedBody = 
pipeWithDisconnect(providerResponse, transformStream, streamController); - - const totalLatency = Date.now() - requestStartTime; - const streamingDetail = { - provider: provider || "unknown", - model: model || "unknown", - connectionId: connectionId || undefined, - timestamp: new Date().toISOString(), - latency: { - ttft: 0, - total: Date.now() - requestStartTime - }, - tokens: { prompt_tokens: 0, completion_tokens: 0 }, - request: extractRequestConfig(body, stream), - providerRequest: finalBody || translatedBody || null, - providerResponse: "[Streaming - raw response not captured]", - response: { - content: "[Streaming in progress...]", - thinking: null, - type: "streaming" - }, - status: "success", - id: streamDetailId - }; - - saveRequestDetail(streamingDetail).catch(err => { - console.error("[RequestDetail] Failed to save streaming request:", err.message); - }); - - return { - success: true, - response: new Response(transformedBody, { - headers: responseHeaders - }) - }; + const { onStreamComplete } = buildOnStreamComplete({ ...sharedCtx }); + return handleStreamingResponse({ ...sharedCtx, providerResponse, sourceFormat, targetFormat, userAgent, reqLogger, toolNameMap, streamController, onStreamComplete }); } -/** - * Check if token is expired or about to expire - */ export function isTokenExpiringSoon(expiresAt, bufferMs = 5 * 60 * 1000) { if (!expiresAt) return false; - const expiresAtMs = new Date(expiresAt).getTime(); - return expiresAtMs - Date.now() < bufferMs; + return new Date(expiresAt).getTime() - Date.now() < bufferMs; } diff --git a/open-sse/handlers/chatCore/nonStreamingHandler.js b/open-sse/handlers/chatCore/nonStreamingHandler.js new file mode 100644 index 0000000..190def6 --- /dev/null +++ b/open-sse/handlers/chatCore/nonStreamingHandler.js @@ -0,0 +1,194 @@ +import { FORMATS } from "../../translator/formats.js"; +import { needsTranslation } from "../../translator/index.js"; +import { addBufferToUsage, filterUsageForFormat } from 
"../../utils/usageTracking.js"; +import { createErrorResult } from "../../utils/error.js"; +import { HTTP_STATUS } from "../../config/constants.js"; +import { parseSSEToOpenAIResponse } from "./sseToJsonHandler.js"; +import { buildRequestDetail, extractRequestConfig, extractUsageFromResponse, saveUsageStats } from "./requestDetail.js"; +import { appendRequestLog, saveRequestDetail } from "@/lib/usageDb.js"; + +/** + * Translate non-streaming response body from provider format → OpenAI format. + */ +export function translateNonStreamingResponse(responseBody, targetFormat, sourceFormat) { + if (targetFormat === sourceFormat || targetFormat === FORMATS.OPENAI) return responseBody; + + // Gemini / Antigravity + if (targetFormat === FORMATS.GEMINI || targetFormat === FORMATS.ANTIGRAVITY || targetFormat === FORMATS.GEMINI_CLI) { + const response = responseBody.response || responseBody; + if (!response?.candidates?.[0]) return responseBody; + + const candidate = response.candidates[0]; + const content = candidate.content; + const usage = response.usageMetadata || responseBody.usageMetadata; + let textContent = "", reasoningContent = ""; + const toolCalls = []; + + if (content?.parts) { + for (const part of content.parts) { + if (part.thought === true && part.text) reasoningContent += part.text; + else if (part.text !== undefined) textContent += part.text; + if (part.functionCall) { + toolCalls.push({ + id: `call_${part.functionCall.name}_${Date.now()}_${toolCalls.length}`, + type: "function", + function: { name: part.functionCall.name, arguments: JSON.stringify(part.functionCall.args || {}) } + }); + } + } + } + + const message = { role: "assistant" }; + if (textContent) message.content = textContent; + if (reasoningContent) message.reasoning_content = reasoningContent; + if (toolCalls.length > 0) message.tool_calls = toolCalls; + if (!message.content && !message.tool_calls) message.content = ""; + + let finishReason = (candidate.finishReason || "stop").toLowerCase(); + 
if (finishReason === "stop" && toolCalls.length > 0) finishReason = "tool_calls"; + + const result = { + id: `chatcmpl-${response.responseId || Date.now()}`, + object: "chat.completion", + created: Math.floor(new Date(response.createTime || Date.now()).getTime() / 1000), + model: response.modelVersion || "gemini", + choices: [{ index: 0, message, finish_reason: finishReason }] + }; + + if (usage) { + result.usage = { + prompt_tokens: (usage.promptTokenCount || 0) + (usage.thoughtsTokenCount || 0), + completion_tokens: usage.candidatesTokenCount || 0, + total_tokens: usage.totalTokenCount || 0 + }; + if (usage.thoughtsTokenCount > 0) { + result.usage.completion_tokens_details = { reasoning_tokens: usage.thoughtsTokenCount }; + } + } + return result; + } + + // Claude + if (targetFormat === FORMATS.CLAUDE) { + if (!responseBody.content) return responseBody; + + let textContent = "", thinkingContent = ""; + const toolCalls = []; + + for (const block of responseBody.content) { + if (block.type === "text") textContent += block.text; + else if (block.type === "thinking") thinkingContent += block.thinking || ""; + else if (block.type === "tool_use") { + toolCalls.push({ id: block.id, type: "function", function: { name: block.name, arguments: JSON.stringify(block.input || {}) } }); + } + } + + const message = { role: "assistant" }; + if (textContent) message.content = textContent; + if (thinkingContent) message.reasoning_content = thinkingContent; + if (toolCalls.length > 0) message.tool_calls = toolCalls; + if (!message.content && !message.tool_calls) message.content = ""; + + let finishReason = responseBody.stop_reason || "stop"; + if (finishReason === "end_turn") finishReason = "stop"; + if (finishReason === "tool_use") finishReason = "tool_calls"; + + const result = { + id: `chatcmpl-${responseBody.id || Date.now()}`, + object: "chat.completion", + created: Math.floor(Date.now() / 1000), + model: responseBody.model || "claude", + choices: [{ index: 0, message, 
finish_reason: finishReason }] + }; + + if (responseBody.usage) { + result.usage = { + prompt_tokens: responseBody.usage.input_tokens || 0, + completion_tokens: responseBody.usage.output_tokens || 0, + total_tokens: (responseBody.usage.input_tokens || 0) + (responseBody.usage.output_tokens || 0) + }; + } + return result; + } + + return responseBody; +} + +/** + * Handle non-streaming response from provider. + */ +export async function handleNonStreamingResponse({ providerResponse, provider, model, sourceFormat, targetFormat, body, stream, translatedBody, finalBody, requestStartTime, connectionId, apiKey, clientRawRequest, onRequestSuccess, reqLogger, trackDone, appendLog }) { + trackDone(); + const contentType = providerResponse.headers.get("content-type") || ""; + let responseBody; + + if (contentType.includes("text/event-stream")) { + const sseText = await providerResponse.text(); + const parsed = parseSSEToOpenAIResponse(sseText, model); + if (!parsed) { + appendLog({ status: `FAILED ${HTTP_STATUS.BAD_GATEWAY}` }); + return createErrorResult(HTTP_STATUS.BAD_GATEWAY, "Invalid SSE response for non-streaming request"); + } + responseBody = parsed; + } else { + try { + responseBody = await providerResponse.json(); + } catch (err) { + appendLog({ status: `FAILED ${HTTP_STATUS.BAD_GATEWAY}` }); + console.error(`[ChatCore] Failed to parse JSON from ${provider}:`, err.message); + return createErrorResult(HTTP_STATUS.BAD_GATEWAY, `Invalid JSON response from ${provider}`); + } + } + + reqLogger.logProviderResponse(providerResponse.status, providerResponse.statusText, providerResponse.headers, responseBody); + if (onRequestSuccess) await onRequestSuccess(); + + const usage = extractUsageFromResponse(responseBody); + appendLog({ tokens: usage, status: "200 OK" }); + saveUsageStats({ provider, model, tokens: usage, connectionId, apiKey, endpoint: clientRawRequest?.endpoint }); + + const translatedResponse = needsTranslation(targetFormat, sourceFormat) + ? 
translateNonStreamingResponse(responseBody, targetFormat, sourceFormat) + : responseBody; + + // Ensure OpenAI-required fields + if (!translatedResponse.object) translatedResponse.object = "chat.completion"; + if (!translatedResponse.created) translatedResponse.created = Math.floor(Date.now() / 1000); + + // Strip Azure-specific fields + delete translatedResponse.prompt_filter_results; + if (translatedResponse?.choices) { + for (const choice of translatedResponse.choices) delete choice.content_filter_results; + } + + if (translatedResponse?.usage) { + translatedResponse.usage = filterUsageForFormat(addBufferToUsage(translatedResponse.usage), sourceFormat); + } + + reqLogger.logConvertedResponse(translatedResponse); + + const totalLatency = Date.now() - requestStartTime; + saveRequestDetail(buildRequestDetail({ + provider, model, connectionId, + latency: { ttft: totalLatency, total: totalLatency }, + tokens: usage || { prompt_tokens: 0, completion_tokens: 0 }, + request: extractRequestConfig(body, stream), + providerRequest: finalBody || translatedBody || null, + providerResponse: responseBody || null, + response: { + content: translatedResponse?.choices?.[0]?.message?.content || translatedResponse?.content || null, + thinking: translatedResponse?.choices?.[0]?.message?.reasoning_content || translatedResponse?.reasoning_content || null, + finish_reason: translatedResponse?.choices?.[0]?.finish_reason || "unknown" + }, + status: "success" + }, { endpoint: clientRawRequest?.endpoint || null })).catch(err => { + console.error("[RequestDetail] Failed to save:", err.message); + }); + + return { + success: true, + response: new Response(JSON.stringify(translatedResponse), { + headers: { "Content-Type": "application/json", "Access-Control-Allow-Origin": "*" } + }) + }; +} diff --git a/open-sse/handlers/chatCore/requestDetail.js b/open-sse/handlers/chatCore/requestDetail.js new file mode 100644 index 0000000..12db16f --- /dev/null +++ 
b/open-sse/handlers/chatCore/requestDetail.js @@ -0,0 +1,102 @@ +import { saveRequestUsage, appendRequestLog, saveRequestDetail } from "@/lib/usageDb.js"; +import { COLORS } from "../../utils/stream.js"; + +const OPTIONAL_PARAMS = [ + "temperature", "top_p", "top_k", + "max_tokens", "max_completion_tokens", + "thinking", "reasoning", "enable_thinking", + "presence_penalty", "frequency_penalty", + "seed", "stop", "tools", "tool_choice", + "response_format", "prediction", "store", "metadata", + "n", "logprobs", "top_logprobs", "logit_bias", + "user", "parallel_tool_calls" +]; + +export function extractRequestConfig(body, stream) { + const config = { messages: body.messages || [], model: body.model, stream }; + for (const param of OPTIONAL_PARAMS) { + if (body[param] !== undefined) config[param] = body[param]; + } + return config; +} + +export function extractUsageFromResponse(responseBody) { + if (!responseBody || typeof responseBody !== "object") return null; + + // Claude format + if (responseBody.usage?.input_tokens !== undefined) { + return { + prompt_tokens: responseBody.usage.input_tokens || 0, + completion_tokens: responseBody.usage.output_tokens || 0, + cache_read_input_tokens: responseBody.usage.cache_read_input_tokens, + cache_creation_input_tokens: responseBody.usage.cache_creation_input_tokens + }; + } + + // OpenAI format + if (responseBody.usage?.prompt_tokens !== undefined) { + return { + prompt_tokens: responseBody.usage.prompt_tokens || 0, + completion_tokens: responseBody.usage.completion_tokens || 0, + cached_tokens: responseBody.usage.prompt_tokens_details?.cached_tokens, + reasoning_tokens: responseBody.usage.completion_tokens_details?.reasoning_tokens + }; + } + + // Gemini format + if (responseBody.usageMetadata) { + return { + prompt_tokens: responseBody.usageMetadata.promptTokenCount || 0, + completion_tokens: responseBody.usageMetadata.candidatesTokenCount || 0, + reasoning_tokens: responseBody.usageMetadata.thoughtsTokenCount + }; + } + + 
return null; +} + +export function buildRequestDetail(base, overrides = {}) { + return { + provider: base.provider || "unknown", + model: base.model || "unknown", + connectionId: base.connectionId || undefined, + timestamp: new Date().toISOString(), + latency: base.latency || { ttft: 0, total: 0 }, + tokens: base.tokens || { prompt_tokens: 0, completion_tokens: 0 }, + request: base.request, + providerRequest: base.providerRequest || null, + providerResponse: base.providerResponse || null, + response: base.response || {}, + status: base.status || "success", + ...overrides + }; +} + +export function saveUsageStats({ provider, model, tokens, connectionId, apiKey, endpoint, label = "USAGE" }) { + if (!tokens || typeof tokens !== "object") return; + + const inTokens = tokens.input_tokens ?? tokens.prompt_tokens ?? 0; + const outTokens = tokens.output_tokens ?? tokens.completion_tokens ?? 0; + + if (inTokens === 0 && outTokens === 0) return; + + const time = new Date().toLocaleTimeString("en-US", { hour12: false, hour: "2-digit", minute: "2-digit" }); + const accountSuffix = connectionId ? ` | account=${connectionId.slice(0, 8)}...` : ""; + console.log(`${COLORS.green}[${time}] 📊 [${label}] ${provider.toUpperCase()} | in=${inTokens} | out=${outTokens}${accountSuffix}${COLORS.reset}`); + + // Normalize to OpenAI token shape for storage + const normalized = { + prompt_tokens: tokens.prompt_tokens ?? tokens.input_tokens ?? 0, + completion_tokens: tokens.completion_tokens ?? tokens.output_tokens ?? 
0 + }; + + saveRequestUsage({ + provider: provider || "unknown", + model: model || "unknown", + tokens: normalized, + timestamp: new Date().toISOString(), + connectionId: connectionId || undefined, + apiKey: apiKey || undefined, + endpoint: endpoint || null + }).catch(() => {}); +} diff --git a/open-sse/handlers/chatCore/sseToJsonHandler.js b/open-sse/handlers/chatCore/sseToJsonHandler.js new file mode 100644 index 0000000..0c71be3 --- /dev/null +++ b/open-sse/handlers/chatCore/sseToJsonHandler.js @@ -0,0 +1,161 @@ +import { convertResponsesStreamToJson } from "../../transformer/streamToJsonConverter.js"; +import { createErrorResult } from "../../utils/error.js"; +import { HTTP_STATUS } from "../../config/constants.js"; +import { FORMATS } from "../../translator/formats.js"; +import { buildRequestDetail, extractRequestConfig, saveUsageStats } from "./requestDetail.js"; +import { saveRequestDetail, appendRequestLog } from "@/lib/usageDb.js"; + +/** + * Parse OpenAI-style SSE text into a single chat completion JSON. + * Used when provider forces streaming but client wants non-streaming. 
+ */ +export function parseSSEToOpenAIResponse(rawSSE, fallbackModel) { + const chunks = []; + + for (const line of String(rawSSE || "").split("\n")) { + const trimmed = line.trim(); + if (!trimmed.startsWith("data:")) continue; + const payload = trimmed.slice(5).trim(); + if (!payload || payload === "[DONE]") continue; + try { chunks.push(JSON.parse(payload)); } catch { /* ignore malformed lines */ } + } + + if (chunks.length === 0) return null; + + const first = chunks[0]; + const contentParts = []; + const reasoningParts = []; + let finishReason = "stop"; + let usage = null; + + for (const chunk of chunks) { + const choice = chunk?.choices?.[0]; + const delta = choice?.delta || {}; + if (typeof delta.content === "string" && delta.content.length > 0) contentParts.push(delta.content); + if (typeof delta.reasoning_content === "string" && delta.reasoning_content.length > 0) reasoningParts.push(delta.reasoning_content); + if (choice?.finish_reason) finishReason = choice.finish_reason; + if (chunk?.usage && typeof chunk.usage === "object") usage = chunk.usage; + } + + const message = { role: "assistant", content: contentParts.join("") }; + if (reasoningParts.length > 0) message.reasoning_content = reasoningParts.join(""); + + const result = { + id: first.id || `chatcmpl-${Date.now()}`, + object: "chat.completion", + created: first.created || Math.floor(Date.now() / 1000), + model: first.model || fallbackModel || "unknown", + choices: [{ index: 0, message, finish_reason: finishReason }] + }; + if (usage) result.usage = usage; + return result; +} + +/** + * Handle case: provider forced streaming but client wants JSON. + * Supports both Codex/Responses API SSE and standard Chat Completions SSE. 
+ */ +export async function handleForcedSSEToJson({ providerResponse, sourceFormat, provider, model, body, stream, translatedBody, finalBody, requestStartTime, connectionId, apiKey, clientRawRequest, onRequestSuccess, trackDone, appendLog }) { + const contentType = providerResponse.headers.get("content-type") || ""; + const isSSE = contentType.includes("text/event-stream") || (contentType === "" && provider === "codex"); + if (!isSSE) return null; // not handled here + + trackDone(); + + const ctx = { + provider, model, connectionId, + request: extractRequestConfig(body, stream), + providerRequest: finalBody || translatedBody || null + }; + + // Codex/Responses API SSE path + const isCodexResponsesApi = provider === "codex" || sourceFormat === FORMATS.OPENAI_RESPONSES; + if (isCodexResponsesApi) { + try { + const jsonResponse = await convertResponsesStreamToJson(providerResponse.body); + if (onRequestSuccess) await onRequestSuccess(); + + const usage = jsonResponse.usage || {}; + appendLog({ tokens: usage, status: "200 OK" }); + saveUsageStats({ provider, model, tokens: usage, connectionId, apiKey, endpoint: clientRawRequest?.endpoint }); + + const msgItem = jsonResponse.output?.find(item => item.type === "message"); + const textContent = msgItem?.content?.find(c => c.type === "output_text")?.text || msgItem?.content?.[0]?.text || null; + const totalLatency = Date.now() - requestStartTime; + + saveRequestDetail(buildRequestDetail({ + ...ctx, + latency: { ttft: totalLatency, total: totalLatency }, + tokens: { prompt_tokens: usage.input_tokens || 0, completion_tokens: usage.output_tokens || 0 }, + response: { content: textContent, thinking: null, finish_reason: jsonResponse.status || "unknown" }, + status: "success" + }, { endpoint: clientRawRequest?.endpoint || null })).catch(() => {}); + + // Client is Responses API → return as-is + if (sourceFormat === FORMATS.OPENAI_RESPONSES) { + return { success: true, response: new Response(JSON.stringify(jsonResponse), { 
headers: { "Content-Type": "application/json", "Access-Control-Allow-Origin": "*" } }) }; + } + + // Build client-format response + const inTokens = usage.input_tokens || 0; + const outTokens = usage.output_tokens || 0; + let finalResp; + + if (sourceFormat === FORMATS.ANTIGRAVITY || sourceFormat === FORMATS.GEMINI || sourceFormat === FORMATS.GEMINI_CLI) { + finalResp = { + response: { + candidates: [{ content: { role: "model", parts: [{ text: textContent || "" }] }, finishReason: "STOP", index: 0 }], + usageMetadata: { promptTokenCount: inTokens, candidatesTokenCount: outTokens, totalTokenCount: inTokens + outTokens }, + modelVersion: model, + responseId: jsonResponse.id || `resp_${Date.now()}` + } + }; + } else { + finalResp = { + id: jsonResponse.id || `chatcmpl-${Date.now()}`, + object: "chat.completion", + created: jsonResponse.created_at || Math.floor(Date.now() / 1000), + model: jsonResponse.model || model, + choices: [{ index: 0, message: { role: "assistant", content: textContent || "" }, finish_reason: jsonResponse.status === "completed" ? 
"stop" : (jsonResponse.status || "stop") }], + usage: { prompt_tokens: inTokens, completion_tokens: outTokens, total_tokens: inTokens + outTokens } + }; + } + + return { success: true, response: new Response(JSON.stringify(finalResp), { headers: { "Content-Type": "application/json", "Access-Control-Allow-Origin": "*" } }) }; + } catch (err) { + console.error("[ChatCore] Responses API SSE→JSON failed:", err); + return createErrorResult(HTTP_STATUS.BAD_GATEWAY, "Failed to convert streaming response to JSON"); + } + } + + // Standard Chat Completions SSE path + try { + const sseText = await providerResponse.text(); + const parsed = parseSSEToOpenAIResponse(sseText, model); + if (!parsed) return createErrorResult(HTTP_STATUS.BAD_GATEWAY, "Invalid SSE response for non-streaming request"); + + if (onRequestSuccess) await onRequestSuccess(); + + const usage = parsed.usage || {}; + appendLog({ tokens: usage, status: "200 OK" }); + saveUsageStats({ provider, model, tokens: usage, connectionId, apiKey, endpoint: clientRawRequest?.endpoint }); + + const totalLatency = Date.now() - requestStartTime; + saveRequestDetail(buildRequestDetail({ + ...ctx, + latency: { ttft: totalLatency, total: totalLatency }, + tokens: usage, + response: { + content: parsed.choices?.[0]?.message?.content || null, + thinking: parsed.choices?.[0]?.message?.reasoning_content || null, + finish_reason: parsed.choices?.[0]?.finish_reason || "unknown" + }, + status: "success" + }, { endpoint: clientRawRequest?.endpoint || null })).catch(() => {}); + + return { success: true, response: new Response(JSON.stringify(parsed), { headers: { "Content-Type": "application/json", "Access-Control-Allow-Origin": "*" } }) }; + } catch (err) { + console.error("[ChatCore] Chat Completions SSE→JSON failed:", err); + return createErrorResult(HTTP_STATUS.BAD_GATEWAY, "Failed to convert streaming response to JSON"); + } +} diff --git a/open-sse/handlers/chatCore/streamingHandler.js 
b/open-sse/handlers/chatCore/streamingHandler.js new file mode 100644 index 0000000..415c9b8 --- /dev/null +++ b/open-sse/handlers/chatCore/streamingHandler.js @@ -0,0 +1,97 @@ +import { FORMATS } from "../../translator/formats.js"; +import { needsTranslation } from "../../translator/index.js"; +import { createSSETransformStreamWithLogger, createPassthroughStreamWithLogger } from "../../utils/stream.js"; +import { pipeWithDisconnect } from "../../utils/streamHandler.js"; +import { buildRequestDetail, extractRequestConfig, saveUsageStats } from "./requestDetail.js"; +import { saveRequestDetail } from "@/lib/usageDb.js"; + +const SSE_HEADERS = { + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "Access-Control-Allow-Origin": "*" +}; + +/** + * Determine which SSE transform stream to use based on provider/format. + */ +function buildTransformStream({ provider, sourceFormat, targetFormat, userAgent, reqLogger, toolNameMap, model, connectionId, body, onStreamComplete, apiKey }) { + const isDroidCLI = userAgent?.toLowerCase().includes("droid") || userAgent?.toLowerCase().includes("codex-cli"); + const needsCodexTranslation = provider === "codex" && targetFormat === FORMATS.OPENAI_RESPONSES && !isDroidCLI; + + if (needsCodexTranslation) { + // Codex returns Responses API SSE → translate to client format + let codexTarget; + if (sourceFormat === FORMATS.OPENAI_RESPONSES) codexTarget = FORMATS.OPENAI_RESPONSES; + else if (sourceFormat === FORMATS.CLAUDE) codexTarget = FORMATS.CLAUDE; + else if (sourceFormat === FORMATS.ANTIGRAVITY || sourceFormat === FORMATS.GEMINI || sourceFormat === FORMATS.GEMINI_CLI) codexTarget = FORMATS.ANTIGRAVITY; + else codexTarget = FORMATS.OPENAI; + return createSSETransformStreamWithLogger(FORMATS.OPENAI_RESPONSES, codexTarget, provider, reqLogger, toolNameMap, model, connectionId, body, onStreamComplete, apiKey); + } + + if (needsTranslation(targetFormat, sourceFormat)) { + return 
createSSETransformStreamWithLogger(targetFormat, sourceFormat, provider, reqLogger, toolNameMap, model, connectionId, body, onStreamComplete, apiKey); + } + + return createPassthroughStreamWithLogger(provider, reqLogger, model, connectionId, body, onStreamComplete, apiKey); +} + +/** + * Handle streaming response — pipe provider SSE through transform stream to client. + */ +export function handleStreamingResponse({ providerResponse, provider, model, sourceFormat, targetFormat, userAgent, body, stream, translatedBody, finalBody, requestStartTime, connectionId, apiKey, clientRawRequest, onRequestSuccess, reqLogger, toolNameMap, streamController, onStreamComplete }) { + if (onRequestSuccess) onRequestSuccess(); + + const transformStream = buildTransformStream({ provider, sourceFormat, targetFormat, userAgent, reqLogger, toolNameMap, model, connectionId, body, onStreamComplete, apiKey }); + const transformedBody = pipeWithDisconnect(providerResponse, transformStream, streamController); + + const streamDetailId = `${Date.now()}-${Math.random().toString(36).slice(2, 11)}`; + saveRequestDetail(buildRequestDetail({ + provider, model, connectionId, + latency: { ttft: 0, total: Date.now() - requestStartTime }, + tokens: { prompt_tokens: 0, completion_tokens: 0 }, + request: extractRequestConfig(body, stream), + providerRequest: finalBody || translatedBody || null, + providerResponse: "[Streaming - raw response not captured]", + response: { content: "[Streaming in progress...]", thinking: null, type: "streaming" }, + status: "success" + }, { id: streamDetailId })).catch(err => { + console.error("[RequestDetail] Failed to save streaming request:", err.message); + }); + + return { + success: true, + response: new Response(transformedBody, { headers: SSE_HEADERS }) + }; +} + +/** + * Build onStreamComplete callback for streaming usage tracking. 
+ * A null/undefined contentObj is tolerated (placeholder text is stored), as a missing usage already was.
+ * @returns {{ onStreamComplete: Function, streamDetailId: string }} The callback and the id it saves under.
+ */
+export function buildOnStreamComplete({ provider, model, connectionId, apiKey, requestStartTime, body, stream, finalBody, translatedBody, clientRawRequest }) {
+  const streamDetailId = `${Date.now()}-${Math.random().toString(36).slice(2, 11)}`;
+
+  const onStreamComplete = (contentObj, usage, ttftAt) => {
+    const finishedAt = Date.now(); // one timestamp, so ttft can never exceed total
+    const text = contentObj?.content || "[Empty streaming response]";
+    const latency = { ttft: (ttftAt || finishedAt) - requestStartTime, total: finishedAt - requestStartTime };
+
+    saveRequestDetail(buildRequestDetail({
+      provider, model, connectionId, latency,
+      tokens: usage || { prompt_tokens: 0, completion_tokens: 0 },
+      request: extractRequestConfig(body, stream),
+      providerRequest: finalBody || translatedBody || null,
+      providerResponse: text,
+      response: { content: text, thinking: contentObj?.thinking || null, type: "streaming" },
+      status: "success"
+    }, { id: streamDetailId })).catch(err => {
+      console.error("[RequestDetail] Failed to update streaming content:", err.message);
+    });
+
+    saveUsageStats({ provider, model, tokens: usage, connectionId, apiKey, endpoint: clientRawRequest?.endpoint, label: "STREAM USAGE" });
+  };
+
+  return { onStreamComplete, streamDetailId };
+}
diff --git a/open-sse/handlers/responsesHandler.js b/open-sse/handlers/responsesHandler.js index c256985..823ff8c 100644 --- a/open-sse/handlers/responsesHandler.js +++ b/open-sse/handlers/responsesHandler.js @@ -32,7 +32,7 @@ export async function handleResponsesCore({ body, modelInfo, credentials, log, o convertedBody.stream = false; } - // Call chat core handler + // Call chat core handler — force sourceFormat so streaming path knows this is a Responses API client const result = await handleChatCore({ body: convertedBody, modelInfo, @@ -41,7 +41,8 @@ export async function handleResponsesCore({ body, modelInfo, credentials, log, o onCredentialsRefreshed, onRequestSuccess, onDisconnect, - connectionId + connectionId, +
sourceFormatOverride: "openai-responses" }); if (!result.success || !result.response) { diff --git a/package.json b/package.json index 0d0b44a..a82afae 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "9router-app", - "version": "0.2.99", + "version": "0.3.1", "description": "9Router web dashboard", "private": true, "scripts": { diff --git a/src/app/(dashboard)/dashboard/providers/[id]/page.js b/src/app/(dashboard)/dashboard/providers/[id]/page.js index 729730d..bd6ba05 100644 --- a/src/app/(dashboard)/dashboard/providers/[id]/page.js +++ b/src/app/(dashboard)/dashboard/providers/[id]/page.js @@ -24,6 +24,8 @@ export default function ProviderDetailPage() { const [selectedConnection, setSelectedConnection] = useState(null); const [modelAliases, setModelAliases] = useState({}); const [headerImgError, setHeaderImgError] = useState(false); + const [modelTestResults, setModelTestResults] = useState({}); + const [testingModels, setTestingModels] = useState(false); const { copied, copy } = useCopyToClipboard(); const providerInfo = providerNode @@ -256,6 +258,27 @@ export default function ProviderDetailPage() { } };
+  // Run the test-models endpoint for the first active connection (or the first one) and record per-model status.
+  const handleTestModels = async () => {
+    if (testingModels) return;
+    const conn = connections.find((c) => c.isActive !== false) || connections[0];
+    if (!conn) return;
+    setTestingModels(true);
+    setModelTestResults({});
+    try {
+      const res = await fetch(`/api/providers/${conn.id}/test-models`, { method: "POST" });
+      const data = await res.json();
+      if (!res.ok) throw new Error(`test-models request failed with HTTP ${res.status}`);
+      // Key each result by model id so rows can look up their own status.
+      setModelTestResults(Object.fromEntries((data.results || []).map((r) => [r.modelId, r.ok ? "ok" : "error"])));
+    } catch (err) {
+      // Best-effort UI action: keep the page usable, but leave a trace instead of failing silently.
+      console.error("[ProviderDetail] Model test failed:", err);
+    } finally {
+      setTestingModels(false);
+    }
+  };
+
 const renderModelsSection = () => { if (isCompatible) { return ( @@ -305,6 +328,7 @@ export default function ProviderDetailPage() { onCopy={copy} onSetAlias={(alias) => handleSetAlias(model.id, alias, providerStorageAlias)} onDeleteAlias={() => handleDeleteAlias(existingAlias)} + testStatus={modelTestResults[model.id]} /> ); })} @@ -494,11 +518,23 @@ export default function ProviderDetailPage() { {/* Models */} -

- {providerInfo.passthroughModels ? "Model Aliases" : "Available Models"} -

+
+

+ {providerInfo.passthroughModels ? "Model Aliases" : "Available Models"} +

+ {connections.length > 0 && ( + + )} +
{renderModelsSection()} -
{/* Modals */} @@ -552,10 +588,27 @@ export default function ProviderDetailPage() { ); } -function ModelRow({ model, fullModel, alias, copied, onCopy }) { +function ModelRow({ model, fullModel, alias, copied, onCopy, testStatus }) { + const borderColor = testStatus === "ok" + ? "border-green-500/40" + : testStatus === "error" + ? "border-red-500/40" + : "border-border"; + + const iconColor = testStatus === "ok" + ? "#22c55e" + : testStatus === "error" + ? "#ef4444" + : undefined; + return ( -
- smart_toy +
+ + {testStatus === "ok" ? "check_circle" : testStatus === "error" ? "cancel" : "smart_toy"} + {fullModel}