diff --git a/open-sse/config/providerModels.js b/open-sse/config/providerModels.js
index e9b843b..cb7cb30 100644
--- a/open-sse/config/providerModels.js
+++ b/open-sse/config/providerModels.js
@@ -113,7 +113,6 @@ export const PROVIDER_MODELS = {
{ id: "claude-4.6-opus-max", name: "Claude 4.6 Opus Max" },
{ id: "claude-4.6-sonnet-medium-thinking", name: "Claude 4.6 Sonnet Medium Thinking" },
{ id: "kimi-k2.5", name: "Kimi K2.5" },
- { id: "gemini-3.1-pro-preview", name: "Gemini 3.1 Pro Preview" },
{ id: "gemini-3-flash-preview", name: "Gemini 3 Flash Preview" },
{ id: "gpt-5.2", name: "GPT 5.2" },
{ id: "gpt-5.3-codex", name: "GPT 5.3 Codex" },
diff --git a/open-sse/handlers/chatCore.js b/open-sse/handlers/chatCore.js
index 6ccbbd1..83e10fe 100644
--- a/open-sse/handlers/chatCore.js
+++ b/open-sse/handlers/chatCore.js
@@ -1,484 +1,93 @@
import { detectFormat, getTargetFormat } from "../services/provider.js";
-import { translateRequest, needsTranslation } from "../translator/index.js";
+import { translateRequest } from "../translator/index.js";
import { FORMATS } from "../translator/formats.js";
-import { createSSETransformStreamWithLogger, createPassthroughStreamWithLogger, COLORS } from "../utils/stream.js";
-import { createStreamController, pipeWithDisconnect } from "../utils/streamHandler.js";
-import { addBufferToUsage, filterUsageForFormat } from "../utils/usageTracking.js";
+import { COLORS } from "../utils/stream.js";
+import { createStreamController } from "../utils/streamHandler.js";
import { refreshWithRetry } from "../services/tokenRefresh.js";
import { createRequestLogger } from "../utils/requestLogger.js";
import { getModelTargetFormat, PROVIDER_ID_TO_ALIAS } from "../config/providerModels.js";
import { createErrorResult, parseUpstreamError, formatProviderError } from "../utils/error.js";
import { HTTP_STATUS } from "../config/constants.js";
import { handleBypassRequest } from "../utils/bypassHandler.js";
-import { saveRequestUsage, trackPendingRequest, appendRequestLog, saveRequestDetail } from "@/lib/usageDb.js";
+import { trackPendingRequest, appendRequestLog, saveRequestDetail } from "@/lib/usageDb.js";
import { getExecutor } from "../executors/index.js";
-import { convertResponsesStreamToJson } from "../transformer/streamToJsonConverter.js";
-
-/**
- * Translate non-streaming response to OpenAI format
- * Handles different provider response formats (Gemini, Claude, etc.)
- */
-function translateNonStreamingResponse(responseBody, targetFormat, sourceFormat) {
- // If already in source format (usually OpenAI), return as-is
- if (targetFormat === sourceFormat || targetFormat === FORMATS.OPENAI) {
- return responseBody;
- }
-
- // Handle Gemini/Antigravity format
- if (targetFormat === FORMATS.GEMINI || targetFormat === FORMATS.ANTIGRAVITY || targetFormat === FORMATS.GEMINI_CLI) {
- const response = responseBody.response || responseBody;
- if (!response?.candidates?.[0]) {
- return responseBody; // Can't translate, return raw
- }
-
- const candidate = response.candidates[0];
- const content = candidate.content;
- const usage = response.usageMetadata || responseBody.usageMetadata;
-
- // Build message content
- let textContent = "";
- const toolCalls = [];
- let reasoningContent = "";
-
- if (content?.parts) {
- for (const part of content.parts) {
- // Handle thinking/reasoning
- if (part.thought === true && part.text) {
- reasoningContent += part.text;
- }
- // Regular text
- else if (part.text !== undefined) {
- textContent += part.text;
- }
- // Function calls
- if (part.functionCall) {
- toolCalls.push({
- id: `call_${part.functionCall.name}_${Date.now()}_${toolCalls.length}`,
- type: "function",
- function: {
- name: part.functionCall.name,
- arguments: JSON.stringify(part.functionCall.args || {})
- }
- });
- }
- }
- }
-
- // Build OpenAI format message
- const message = { role: "assistant" };
- if (textContent) {
- message.content = textContent;
- }
- if (reasoningContent) {
- message.reasoning_content = reasoningContent;
- }
- if (toolCalls.length > 0) {
- message.tool_calls = toolCalls;
- }
- // If no content at all, set content to empty string
- if (!message.content && !message.tool_calls) {
- message.content = "";
- }
-
- // Determine finish reason
- let finishReason = (candidate.finishReason || "stop").toLowerCase();
- if (finishReason === "stop" && toolCalls.length > 0) {
- finishReason = "tool_calls";
- }
-
- const result = {
- id: `chatcmpl-${response.responseId || Date.now()}`,
- object: "chat.completion",
- created: Math.floor(new Date(response.createTime || Date.now()).getTime() / 1000),
- model: response.modelVersion || "gemini",
- choices: [{
- index: 0,
- message,
- finish_reason: finishReason
- }]
- };
-
- // Add usage if available (match streaming translator: add thoughtsTokenCount to prompt_tokens)
- if (usage) {
- result.usage = {
- prompt_tokens: (usage.promptTokenCount || 0) + (usage.thoughtsTokenCount || 0),
- completion_tokens: usage.candidatesTokenCount || 0,
- total_tokens: usage.totalTokenCount || 0
- };
- if (usage.thoughtsTokenCount > 0) {
- result.usage.completion_tokens_details = {
- reasoning_tokens: usage.thoughtsTokenCount
- };
- }
- }
-
- return result;
- }
-
- // Handle Claude format
- if (targetFormat === FORMATS.CLAUDE) {
- if (!responseBody.content) {
- return responseBody; // Can't translate, return raw
- }
-
- let textContent = "";
- let thinkingContent = "";
- const toolCalls = [];
-
- for (const block of responseBody.content) {
- if (block.type === "text") {
- textContent += block.text;
- } else if (block.type === "thinking") {
- thinkingContent += block.thinking || "";
- } else if (block.type === "tool_use") {
- toolCalls.push({
- id: block.id,
- type: "function",
- function: {
- name: block.name,
- arguments: JSON.stringify(block.input || {})
- }
- });
- }
- }
-
- const message = { role: "assistant" };
- if (textContent) {
- message.content = textContent;
- }
- if (thinkingContent) {
- message.reasoning_content = thinkingContent;
- }
- if (toolCalls.length > 0) {
- message.tool_calls = toolCalls;
- }
- if (!message.content && !message.tool_calls) {
- message.content = "";
- }
-
- let finishReason = responseBody.stop_reason || "stop";
- if (finishReason === "end_turn") finishReason = "stop";
- if (finishReason === "tool_use") finishReason = "tool_calls";
-
- const result = {
- id: `chatcmpl-${responseBody.id || Date.now()}`,
- object: "chat.completion",
- created: Math.floor(Date.now() / 1000),
- model: responseBody.model || "claude",
- choices: [{
- index: 0,
- message,
- finish_reason: finishReason
- }]
- };
-
- if (responseBody.usage) {
- result.usage = {
- prompt_tokens: responseBody.usage.input_tokens || 0,
- completion_tokens: responseBody.usage.output_tokens || 0,
- total_tokens: (responseBody.usage.input_tokens || 0) + (responseBody.usage.output_tokens || 0)
- };
- }
-
- return result;
- }
-
- // Unknown format, return as-is
- return responseBody;
-}
-
-/**
- * Extract usage from non-streaming response body
- * Handles different provider response formats
- */
-function extractUsageFromResponse(responseBody, provider) {
- if (!responseBody || typeof responseBody !== 'object') return null;
-
- // Claude format - check first to avoid conflict with OpenAI check
- if (responseBody.usage && typeof responseBody.usage === 'object' && responseBody.usage.input_tokens !== undefined) {
- return {
- prompt_tokens: responseBody.usage.input_tokens || 0,
- completion_tokens: responseBody.usage.output_tokens || 0,
- cache_read_input_tokens: responseBody.usage.cache_read_input_tokens,
- cache_creation_input_tokens: responseBody.usage.cache_creation_input_tokens
- };
- }
-
- // OpenAI format
- if (responseBody.usage && typeof responseBody.usage === 'object' && responseBody.usage.prompt_tokens !== undefined) {
- return {
- prompt_tokens: responseBody.usage.prompt_tokens || 0,
- completion_tokens: responseBody.usage.completion_tokens || 0,
- cached_tokens: responseBody.usage.prompt_tokens_details?.cached_tokens,
- reasoning_tokens: responseBody.usage.completion_tokens_details?.reasoning_tokens
- };
- }
-
- // Gemini format
- if (responseBody.usageMetadata && typeof responseBody.usageMetadata === 'object') {
- return {
- prompt_tokens: responseBody.usageMetadata.promptTokenCount || 0,
- completion_tokens: responseBody.usageMetadata.candidatesTokenCount || 0,
- reasoning_tokens: responseBody.usageMetadata.thoughtsTokenCount
- };
- }
-
- return null;
-}
-
-/**
- * Extract full request configuration from body
- * Captures all relevant parameters for request details
- */
-function extractRequestConfig(body, stream) {
- const config = {
- messages: body.messages || [],
- model: body.model,
- stream: stream
- };
-
- // Add all optional configuration parameters
- const optionalParams = [
- 'temperature', 'top_p', 'top_k',
- 'max_tokens', 'max_completion_tokens',
- 'thinking', 'reasoning', 'enable_thinking',
- 'presence_penalty', 'frequency_penalty',
- 'seed', 'stop', 'tools', 'tool_choice',
- 'response_format', 'prediction', 'store', 'metadata',
- 'n', 'logprobs', 'top_logprobs', 'logit_bias',
- 'user', 'parallel_tool_calls'
- ];
-
- for (const param of optionalParams) {
- if (body[param] !== undefined) {
- config[param] = body[param];
- }
- }
-
- return config;
-}
-
-/**
- * Convert OpenAI-style SSE chunks into a single non-streaming JSON response.
- * Used as a fallback when upstream returns text/event-stream for stream=false.
- */
-function parseSSEToOpenAIResponse(rawSSE, fallbackModel) {
- const lines = String(rawSSE || "").split("\n");
- const chunks = [];
-
- for (const line of lines) {
- const trimmed = line.trim();
- if (!trimmed.startsWith("data:")) continue;
- const payload = trimmed.slice(5).trim();
- if (!payload || payload === "[DONE]") continue;
- try {
- chunks.push(JSON.parse(payload));
- } catch {
- // Ignore malformed SSE lines and continue best-effort parsing.
- }
- }
-
- if (chunks.length === 0) return null;
-
- const first = chunks[0];
- const contentParts = [];
- const reasoningParts = [];
- let finishReason = "stop";
- let usage = null;
-
- for (const chunk of chunks) {
- const choice = chunk?.choices?.[0];
- const delta = choice?.delta || {};
-
- if (typeof delta.content === "string" && delta.content.length > 0) {
- contentParts.push(delta.content);
- }
- if (typeof delta.reasoning_content === "string" && delta.reasoning_content.length > 0) {
- reasoningParts.push(delta.reasoning_content);
- }
- if (choice?.finish_reason) {
- finishReason = choice.finish_reason;
- }
- if (chunk?.usage && typeof chunk.usage === "object") {
- usage = chunk.usage;
- }
- }
-
- const message = {
- role: "assistant",
- content: contentParts.join("")
- };
- if (reasoningParts.length > 0) {
- message.reasoning_content = reasoningParts.join("");
- }
-
- const result = {
- id: first.id || `chatcmpl-${Date.now()}`,
- object: "chat.completion",
- created: first.created || Math.floor(Date.now() / 1000),
- model: first.model || fallbackModel || "unknown",
- choices: [
- {
- index: 0,
- message,
- finish_reason: finishReason
- }
- ]
- };
-
- if (usage) {
- result.usage = usage;
- }
-
- return result;
-}
+import { buildRequestDetail, extractRequestConfig } from "./chatCore/requestDetail.js";
+import { handleForcedSSEToJson } from "./chatCore/sseToJsonHandler.js";
+import { handleNonStreamingResponse } from "./chatCore/nonStreamingHandler.js";
+import { handleStreamingResponse, buildOnStreamComplete } from "./chatCore/streamingHandler.js";
/**
* Core chat handler - shared between SSE and Worker
- * Returns { success, response, status, error } for caller to handle fallback
- * @param {object} options
* @param {object} options.body - Request body
* @param {object} options.modelInfo - { provider, model }
* @param {object} options.credentials - Provider credentials
- * @param {object} options.log - Logger instance (optional)
- * @param {function} options.onCredentialsRefreshed - Callback when credentials are refreshed
- * @param {function} options.onRequestSuccess - Callback when request succeeds (to clear error status)
- * @param {function} options.onDisconnect - Callback when client disconnects
- * @param {string} options.connectionId - Connection ID for usage tracking
- * @param {string} options.apiKey - API key for usage tracking
+ * @param {string} [options.sourceFormatOverride] - Optional override for the detected source format (e.g. "openai-responses"); when omitted, the format is detected from the body
*/
-export async function handleChatCore({ body, modelInfo, credentials, log, onCredentialsRefreshed, onRequestSuccess, onDisconnect, clientRawRequest, connectionId, userAgent, apiKey }) {
+export async function handleChatCore({ body, modelInfo, credentials, log, onCredentialsRefreshed, onRequestSuccess, onDisconnect, clientRawRequest, connectionId, userAgent, apiKey, sourceFormatOverride }) {
const { provider, model } = modelInfo;
const requestStartTime = Date.now();
- const sourceFormat = detectFormat(body);
+ const sourceFormat = sourceFormatOverride || detectFormat(body);
- // Check for bypass patterns (warmup, skip) - return fake response
+ // Check for bypass patterns (warmup, skip)
const bypassResponse = handleBypassRequest(body, model, userAgent);
- if (bypassResponse) {
- return bypassResponse;
- }
-
- // Detect source format and get target format
- // Model-specific targetFormat takes priority over provider default
+ if (bypassResponse) return bypassResponse;
const alias = PROVIDER_ID_TO_ALIAS[provider] || provider;
const modelTargetFormat = getModelTargetFormat(alias, model);
const targetFormat = modelTargetFormat || getTargetFormat(provider);
- // Track if client actually wants streaming (before we force it for providers)
const clientRequestedStreaming = body.stream === true || sourceFormat === FORMATS.ANTIGRAVITY || sourceFormat === FORMATS.GEMINI || sourceFormat === FORMATS.GEMINI_CLI;
- const providerRequiresStreaming = provider === 'openai' || provider === 'codex';
-
- // Force streaming for OpenAI/Codex models (they don't support non-streaming mode properly)
+ const providerRequiresStreaming = provider === "openai" || provider === "codex";
const stream = providerRequiresStreaming ? true : (body.stream !== false);
- // Create request logger for this session: sourceFormat_targetFormat_model
const reqLogger = await createRequestLogger(sourceFormat, targetFormat, model);
-
- // 0. Log client raw request (before any conversion)
- if (clientRawRequest) {
- reqLogger.logClientRawRequest(
- clientRawRequest.endpoint,
- clientRawRequest.body,
- clientRawRequest.headers
- );
- }
-
- // 1. Log raw request from client
+ if (clientRawRequest) reqLogger.logClientRawRequest(clientRawRequest.endpoint, clientRawRequest.body, clientRawRequest.headers);
reqLogger.logRawRequest(body);
-
log?.debug?.("FORMAT", `${sourceFormat} → ${targetFormat} | stream=${stream}`);
- // Translate request (pass reqLogger for intermediate logging)
- let translatedBody = body;
- translatedBody = translateRequest(sourceFormat, targetFormat, model, body, stream, credentials, provider, reqLogger);
-
- // Extract toolNameMap for response translation (Claude OAuth)
+ let translatedBody = translateRequest(sourceFormat, targetFormat, model, body, stream, credentials, provider, reqLogger);
const toolNameMap = translatedBody._toolNameMap;
delete translatedBody._toolNameMap;
-
- // Update model in body
translatedBody.model = model;
- // Get executor for this provider
const executor = getExecutor(provider);
-
- // Track pending request
trackPendingRequest(model, provider, connectionId, true);
+ appendRequestLog({ model, provider, connectionId, status: "PENDING" }).catch(() => {});
- // Log start
- appendRequestLog({ model, provider, connectionId, status: "PENDING" }).catch(() => { });
-
- const msgCount = translatedBody.messages?.length
- || translatedBody.input?.length
- || translatedBody.contents?.length
- || translatedBody.request?.contents?.length
- || 0;
+ const msgCount = translatedBody.messages?.length || translatedBody.input?.length || translatedBody.contents?.length || translatedBody.request?.contents?.length || 0;
log?.debug?.("REQUEST", `${provider.toUpperCase()} | ${model} | ${msgCount} msgs`);
- // Create stream controller for disconnect detection
- const streamController = createStreamController({
+ const streamController = createStreamController({
onDisconnect: (reason) => {
- // Track request finished (disconnected)
trackPendingRequest(model, provider, connectionId, false);
if (onDisconnect) onDisconnect(reason);
},
- onError: (error) => {
- // Track request finished (error/zombie)
- trackPendingRequest(model, provider, connectionId, false);
- },
- log,
- provider,
- model
+ onError: () => trackPendingRequest(model, provider, connectionId, false),
+ log, provider, model
});
- // Execute request using executor (handles URL building, headers, fallback, transform)
- let providerResponse;
- let providerUrl;
- let providerHeaders;
- let finalBody;
-
+ // Execute request
+ let providerResponse, providerUrl, providerHeaders, finalBody;
try {
- const result = await executor.execute({
- model,
- body: translatedBody,
- stream,
- credentials,
- signal: streamController.signal,
- log
- });
-
+ const result = await executor.execute({ model, body: translatedBody, stream, credentials, signal: streamController.signal, log });
providerResponse = result.response;
providerUrl = result.url;
providerHeaders = result.headers;
finalBody = result.transformedBody;
-
- // Log target request (final request to provider)
reqLogger.logTargetRequest(providerUrl, providerHeaders, finalBody);
-
} catch (error) {
trackPendingRequest(model, provider, connectionId, false, true);
- appendRequestLog({ model, provider, connectionId, status: `FAILED ${error.name === "AbortError" ? 499 : HTTP_STATUS.BAD_GATEWAY}` }).catch(() => { });
-
- const errorDetail = {
- provider: provider || "unknown",
- model: model || "unknown",
- connectionId: connectionId || undefined,
- timestamp: new Date().toISOString(),
+ appendRequestLog({ model, provider, connectionId, status: `FAILED ${error.name === "AbortError" ? 499 : HTTP_STATUS.BAD_GATEWAY}` }).catch(() => {});
+ saveRequestDetail(buildRequestDetail({
+ provider, model, connectionId,
latency: { ttft: 0, total: Date.now() - requestStartTime },
tokens: { prompt_tokens: 0, completion_tokens: 0 },
request: extractRequestConfig(body, stream),
providerRequest: translatedBody || null,
- providerResponse: null,
- response: {
- error: error.message || String(error),
- status: error.name === "AbortError" ? 499 : 502,
- thinking: null
- },
+ response: { error: error.message || String(error), status: error.name === "AbortError" ? 499 : 502, thinking: null },
status: "error"
- };
- saveRequestDetail(errorDetail).catch(() => {});
+ })).catch(() => {});
if (error.name === "AbortError") {
streamController.handleError(error);
@@ -489,530 +98,67 @@ export async function handleChatCore({ body, modelInfo, credentials, log, onCred
return createErrorResult(HTTP_STATUS.BAD_GATEWAY, errMsg);
}
- // Handle 401/403 - try token refresh using executor
+ // Handle 401/403 - try token refresh
if (providerResponse.status === HTTP_STATUS.UNAUTHORIZED || providerResponse.status === HTTP_STATUS.FORBIDDEN) {
- const newCredentials = await refreshWithRetry(
- () => executor.refreshCredentials(credentials, log),
- 3,
- log
- );
-
+ const newCredentials = await refreshWithRetry(() => executor.refreshCredentials(credentials, log), 3, log);
if (newCredentials?.accessToken || newCredentials?.copilotToken) {
log?.info?.("TOKEN", `${provider.toUpperCase()} | refreshed`);
-
- // Update credentials
Object.assign(credentials, newCredentials);
-
- // Notify caller about refreshed credentials
- if (onCredentialsRefreshed && newCredentials) {
- await onCredentialsRefreshed(newCredentials);
- }
-
- // Retry with new credentials
+ if (onCredentialsRefreshed) await onCredentialsRefreshed(newCredentials);
try {
- const retryResult = await executor.execute({
- model,
- body: translatedBody,
- stream,
- credentials,
- signal: streamController.signal,
- log
- });
-
- if (retryResult.response.ok) {
- providerResponse = retryResult.response;
- providerUrl = retryResult.url;
- }
- } catch (retryError) {
- log?.warn?.("TOKEN", `${provider.toUpperCase()} | retry after refresh failed`);
- }
+ const retryResult = await executor.execute({ model, body: translatedBody, stream, credentials, signal: streamController.signal, log });
+ if (retryResult.response.ok) { providerResponse = retryResult.response; providerUrl = retryResult.url; }
+ } catch { log?.warn?.("TOKEN", `${provider.toUpperCase()} | retry after refresh failed`); }
} else {
log?.warn?.("TOKEN", `${provider.toUpperCase()} | refresh failed`);
}
}
- // Check provider response - return error info for fallback handling
+ // Provider returned error
if (!providerResponse.ok) {
trackPendingRequest(model, provider, connectionId, false, true);
const { statusCode, message, retryAfterMs } = await parseUpstreamError(providerResponse, provider);
- appendRequestLog({ model, provider, connectionId, status: `FAILED ${statusCode}` }).catch(() => { });
-
- const errorDetail = {
- provider: provider || "unknown",
- model: model || "unknown",
- connectionId: connectionId || undefined,
- timestamp: new Date().toISOString(),
+ appendRequestLog({ model, provider, connectionId, status: `FAILED ${statusCode}` }).catch(() => {});
+ saveRequestDetail(buildRequestDetail({
+ provider, model, connectionId,
latency: { ttft: 0, total: Date.now() - requestStartTime },
tokens: { prompt_tokens: 0, completion_tokens: 0 },
request: extractRequestConfig(body, stream),
providerRequest: finalBody || translatedBody || null,
- providerResponse: null,
- response: {
- error: message,
- status: statusCode,
- thinking: null
- },
+ response: { error: message, status: statusCode, thinking: null },
status: "error"
- };
- saveRequestDetail(errorDetail).catch(() => {});
+ })).catch(() => {});
const errMsg = formatProviderError(new Error(message), provider, model, statusCode);
console.log(`${COLORS.red}[ERROR] ${errMsg}${COLORS.reset}`);
-
- // Log Antigravity retry time if available
if (retryAfterMs && provider === "antigravity") {
- const retrySeconds = Math.ceil(retryAfterMs / 1000);
- log?.debug?.("RETRY", `Antigravity quota reset in ${retrySeconds}s (${retryAfterMs}ms)`);
+ log?.debug?.("RETRY", `Antigravity quota reset in ${Math.ceil(retryAfterMs / 1000)}s`);
}
-
- // Log error with full request body for debugging
reqLogger.logError(new Error(message), finalBody || translatedBody);
-
return createErrorResult(statusCode, errMsg, retryAfterMs);
}
- // Provider forced streaming but client wants JSON - convert SSE to JSON
+ const sharedCtx = { provider, model, body, stream, translatedBody, finalBody, requestStartTime, connectionId, apiKey, clientRawRequest, onRequestSuccess };
+ const appendLog = (extra) => appendRequestLog({ model, provider, connectionId, ...extra }).catch(() => {});
+ const trackDone = () => trackPendingRequest(model, provider, connectionId, false);
+
+ // Provider forced streaming but client wants JSON
if (!clientRequestedStreaming && providerRequiresStreaming) {
- trackPendingRequest(model, provider, connectionId, false);
- const contentType = providerResponse.headers.get("content-type") || "";
-
- // Treat as SSE if content-type says so OR if it's empty/missing
- // (Codex API doesn't always set Content-Type on streaming responses)
- const isSSEResponse = contentType.includes("text/event-stream") || (contentType === "" && provider === "codex");
- if (isSSEResponse) {
- // Codex always returns Responses API SSE format regardless of client source format
- const isCodexResponsesApi = provider === "codex" || sourceFormat === "openai-responses";
-
- if (isCodexResponsesApi) {
- // Responses API SSE → parse → translate to client format
- try {
- const jsonResponse = await convertResponsesStreamToJson(providerResponse.body);
-
- if (onRequestSuccess) await onRequestSuccess();
-
- const usage = jsonResponse.usage || {};
- appendRequestLog({ model, provider, connectionId, tokens: usage, status: "200 OK" }).catch(() => { });
-
- if (usage && typeof usage === 'object') {
- const msg = `[${new Date().toLocaleTimeString("en-US", { hour12: false, hour: "2-digit", minute: "2-digit" })}] 📊 [USAGE] ${provider.toUpperCase()} | in=${usage?.input_tokens || 0} | out=${usage?.output_tokens || 0}${connectionId ? ` | account=${connectionId.slice(0, 8)}...` : ""}`;
- console.log(`${COLORS.green}${msg}${COLORS.reset}`);
-
- saveRequestUsage({
- provider: provider || "unknown",
- model: model || "unknown",
- tokens: { prompt_tokens: usage.input_tokens || 0, completion_tokens: usage.output_tokens || 0 },
- timestamp: new Date().toISOString(),
- connectionId: connectionId || undefined,
- apiKey: apiKey || undefined
- }).catch(() => { });
- }
-
- const msgItem = jsonResponse.output?.find(item => item.type === "message");
- const textContent = msgItem?.content?.find(c => c.type === "output_text")?.text || msgItem?.content?.[0]?.text || null;
- console.log(`[DBG] codex status=${jsonResponse.status} output.len=${jsonResponse.output?.length} msgItem.type=${msgItem?.type} textLen=${textContent?.length||0}`);
- const totalLatency = Date.now() - requestStartTime;
- saveRequestDetail({
- provider: provider || "unknown",
- model: model || "unknown",
- connectionId: connectionId || undefined,
- timestamp: new Date().toISOString(),
- latency: { ttft: totalLatency, total: totalLatency },
- tokens: { prompt_tokens: usage.input_tokens || 0, completion_tokens: usage.output_tokens || 0 },
- request: extractRequestConfig(body, stream),
- providerRequest: finalBody || translatedBody || null,
- providerResponse: null,
- response: { content: textContent, thinking: null, finish_reason: jsonResponse.status || "unknown" },
- status: "success",
- endpoint: clientRawRequest?.endpoint || null
- }).catch(() => { });
-
- // If client is openai-responses → return as-is
- if (sourceFormat === "openai-responses") {
- return {
- success: true,
- response: new Response(JSON.stringify(jsonResponse), {
- headers: { "Content-Type": "application/json", "Access-Control-Allow-Origin": "*" }
- })
- };
- }
-
- // Translate Responses API JSON → OpenAI chat completion JSON
- const openaiMsg = {
- id: jsonResponse.id || `chatcmpl-${Date.now()}`,
- object: "chat.completion",
- created: jsonResponse.created_at || Math.floor(Date.now() / 1000),
- model: jsonResponse.model || model,
- choices: [{
- index: 0,
- message: { role: "assistant", content: textContent || "" },
- finish_reason: jsonResponse.status === "completed" ? "stop" : (jsonResponse.status || "stop")
- }],
- usage: {
- prompt_tokens: usage.input_tokens || 0,
- completion_tokens: usage.output_tokens || 0,
- total_tokens: (usage.input_tokens || 0) + (usage.output_tokens || 0)
- }
- };
-
- // Build client-format response based on sourceFormat
- let finalResp;
- if (sourceFormat === FORMATS.ANTIGRAVITY || sourceFormat === FORMATS.GEMINI || sourceFormat === FORMATS.GEMINI_CLI) {
- // Antigravity/Gemini non-streaming format
- finalResp = {
- response: {
- candidates: [{
- content: { role: "model", parts: [{ text: textContent || "" }] },
- finishReason: "STOP",
- index: 0
- }],
- usageMetadata: {
- promptTokenCount: usage.input_tokens || 0,
- candidatesTokenCount: usage.output_tokens || 0,
- totalTokenCount: (usage.input_tokens || 0) + (usage.output_tokens || 0)
- },
- modelVersion: model,
- responseId: jsonResponse.id || `resp_${Date.now()}`
- }
- };
- } else {
- finalResp = openaiMsg;
- }
-
- return {
- success: true,
- response: new Response(JSON.stringify(finalResp), {
- headers: { "Content-Type": "application/json", "Access-Control-Allow-Origin": "*" }
- })
- };
- } catch (error) {
- console.error("[ChatCore] Responses API SSE→JSON conversion failed:", error);
- return createErrorResult(HTTP_STATUS.BAD_GATEWAY, "Failed to convert streaming response to JSON");
- }
- } else {
- // Chat Completions SSE → Chat Completions JSON
- try {
- const sseText = await providerResponse.text();
- const parsed = parseSSEToOpenAIResponse(sseText, model);
- if (parsed) {
- if (onRequestSuccess) await onRequestSuccess();
-
- const usage = parsed.usage || {};
- appendRequestLog({ model, provider, connectionId, tokens: usage, status: "200 OK" }).catch(() => { });
-
- if (usage && typeof usage === "object") {
- saveRequestUsage({
- provider: provider || "unknown",
- model: model || "unknown",
- tokens: usage,
- timestamp: new Date().toISOString(),
- connectionId: connectionId || undefined,
- apiKey: apiKey || undefined,
- endpoint: clientRawRequest?.endpoint || null
- }).catch(() => { });
- }
-
- const totalLatency = Date.now() - requestStartTime;
- saveRequestDetail({
- provider: provider || "unknown",
- model: model || "unknown",
- connectionId: connectionId || undefined,
- timestamp: new Date().toISOString(),
- latency: { ttft: totalLatency, total: totalLatency },
- tokens: usage,
- request: extractRequestConfig(body, stream),
- providerRequest: finalBody || translatedBody || null,
- providerResponse: null,
- response: {
- content: parsed.choices?.[0]?.message?.content || null,
- thinking: parsed.choices?.[0]?.message?.reasoning_content || null,
- finish_reason: parsed.choices?.[0]?.finish_reason || "unknown"
- },
- status: "success",
- endpoint: clientRawRequest?.endpoint || null
- }).catch(() => { });
-
- return {
- success: true,
- response: new Response(JSON.stringify(parsed), {
- headers: {
- "Content-Type": "application/json",
- "Access-Control-Allow-Origin": "*"
- }
- })
- };
- }
- return createErrorResult(HTTP_STATUS.BAD_GATEWAY, "Invalid SSE response for non-streaming request");
- } catch (error) {
- console.error("[ChatCore] Chat Completions SSE→JSON conversion failed:", error);
- return createErrorResult(HTTP_STATUS.BAD_GATEWAY, "Failed to convert streaming response to JSON");
- }
- }
- }
+ const result = await handleForcedSSEToJson({ ...sharedCtx, providerResponse, sourceFormat, trackDone, appendLog });
+ if (result) return result;
}
- // Non-streaming response
+ // True non-streaming response
if (!stream) {
- trackPendingRequest(model, provider, connectionId, false);
- const contentType = providerResponse.headers.get("content-type") || "";
- let responseBody;
-
- if (contentType.includes("text/event-stream")) {
- // Upstream returned SSE even though stream=false; convert best-effort to JSON.
- const sseText = await providerResponse.text();
- const parsedFromSSE = parseSSEToOpenAIResponse(sseText, model);
- if (!parsedFromSSE) {
- appendRequestLog({ model, provider, connectionId, status: `FAILED ${HTTP_STATUS.BAD_GATEWAY}` }).catch(() => { });
- return createErrorResult(HTTP_STATUS.BAD_GATEWAY, "Invalid SSE response for non-streaming request");
- }
- responseBody = parsedFromSSE;
- } else {
- try {
- responseBody = await providerResponse.json();
- } catch (parseError) {
- appendRequestLog({ model, provider, connectionId, status: `FAILED ${HTTP_STATUS.BAD_GATEWAY}` }).catch(() => { });
- console.error(`[ChatCore] Failed to parse JSON response from ${provider}:`, parseError.message);
- return createErrorResult(HTTP_STATUS.BAD_GATEWAY, `Invalid JSON response from ${provider}`);
- }
- }
-
- // Log provider response (raw response from provider)
- reqLogger.logProviderResponse(
- providerResponse.status,
- providerResponse.statusText,
- providerResponse.headers,
- responseBody
- );
-
- // Notify success - caller can clear error status if needed
- if (onRequestSuccess) {
- await onRequestSuccess();
- }
-
- // Log usage for non-streaming responses
- const usage = extractUsageFromResponse(responseBody, provider);
- appendRequestLog({ model, provider, connectionId, tokens: usage, status: "200 OK" }).catch(() => { });
- if (usage && typeof usage === 'object') {
- const msg = `[${new Date().toLocaleTimeString("en-US", { hour12: false, hour: "2-digit", minute: "2-digit" })}] 📊 [USAGE] ${provider.toUpperCase()} | in=${usage?.prompt_tokens || 0} | out=${usage?.completion_tokens || 0}${connectionId ? ` | account=${connectionId.slice(0, 8)}...` : ""}`;
- console.log(`${COLORS.green}${msg}${COLORS.reset}`);
-
- saveRequestUsage({
- provider: provider || "unknown",
- model: model || "unknown",
- tokens: usage,
- timestamp: new Date().toISOString(),
- connectionId: connectionId || undefined,
- apiKey: apiKey || undefined,
- endpoint: clientRawRequest?.endpoint || null
- }).catch(err => {
- console.error("Failed to save usage stats:", err.message);
- });
- }
-
- // Translate response to client's expected format (usually OpenAI)
- const translatedResponse = needsTranslation(targetFormat, sourceFormat)
- ? translateNonStreamingResponse(responseBody, targetFormat, sourceFormat)
- : responseBody;
-
- // Ensure OpenAI-required fields are present (needed for Letta and other strict clients)
- if (!translatedResponse.object) translatedResponse.object = "chat.completion";
- if (!translatedResponse.created) translatedResponse.created = Math.floor(Date.now() / 1000);
-
- // Strip Azure-specific non-standard fields
- if (translatedResponse.prompt_filter_results !== undefined) {
- delete translatedResponse.prompt_filter_results;
- }
- if (translatedResponse?.choices) {
- for (const choice of translatedResponse.choices) {
- if (choice.content_filter_results !== undefined) {
- delete choice.content_filter_results;
- }
- }
- }
-
- // Add buffer and filter usage for client (to prevent CLI context errors)
- if (translatedResponse?.usage) {
- const buffered = addBufferToUsage(translatedResponse.usage);
- translatedResponse.usage = filterUsageForFormat(buffered, sourceFormat);
- }
-
- // Log converted response (final response to client)
- reqLogger.logConvertedResponse(translatedResponse);
-
- const totalLatency = Date.now() - requestStartTime;
- const requestDetail = {
- provider: provider || "unknown",
- model: model || "unknown",
- connectionId: connectionId || undefined,
- timestamp: new Date().toISOString(),
- latency: {
- ttft: totalLatency,
- total: totalLatency
- },
- tokens: usage || { prompt_tokens: 0, completion_tokens: 0 },
- request: extractRequestConfig(body, stream),
- providerRequest: finalBody || translatedBody || null,
- providerResponse: responseBody || null,
- response: {
- content: translatedResponse?.choices?.[0]?.message?.content ||
- translatedResponse?.content ||
- null,
- thinking: translatedResponse?.choices?.[0]?.message?.reasoning_content ||
- translatedResponse?.reasoning_content ||
- null,
- finish_reason: translatedResponse?.choices?.[0]?.finish_reason || "unknown"
- },
- status: "success",
- endpoint: clientRawRequest?.endpoint || null
- };
-
- // Async save (don't block response)
- saveRequestDetail(requestDetail).catch(err => {
- console.error("[RequestDetail] Failed to save:", err.message);
- });
-
- return {
- success: true,
- response: new Response(JSON.stringify(translatedResponse), {
- headers: {
- "Content-Type": "application/json",
- "Access-Control-Allow-Origin": "*"
- }
- })
- };
+ return handleNonStreamingResponse({ ...sharedCtx, providerResponse, sourceFormat, targetFormat, reqLogger, trackDone, appendLog });
}
// Streaming response
-
- // Notify success - caller can clear error status if needed
- if (onRequestSuccess) {
- await onRequestSuccess();
- }
-
- const responseHeaders = {
- "Content-Type": "text/event-stream",
- "Cache-Control": "no-cache",
- "Connection": "keep-alive",
- "Access-Control-Allow-Origin": "*"
- };
-
- let streamContent = "";
- let streamUsage = null;
- const streamDetailId = `${Date.now()}-${Math.random().toString(36).slice(2, 11)}`;
-
- const onStreamComplete = (contentObj, usage, ttftAt) => {
- // contentObj is object { content, thinking }
- streamUsage = usage;
-
- const updatedDetail = {
- provider: provider || "unknown",
- model: model || "unknown",
- connectionId: connectionId || undefined,
- timestamp: new Date().toISOString(),
- latency: {
- ttft: ttftAt ? ttftAt - requestStartTime : Date.now() - requestStartTime,
- total: Date.now() - requestStartTime
- },
- tokens: usage || { prompt_tokens: 0, completion_tokens: 0 },
- request: extractRequestConfig(body, stream),
- providerRequest: finalBody || translatedBody || null,
- providerResponse: contentObj.content || "[Empty streaming response]",
- response: {
- content: contentObj.content || "[Empty streaming response]",
- thinking: contentObj.thinking || null,
- type: "streaming"
- },
- status: "success",
- id: streamDetailId
- };
-
- saveRequestDetail(updatedDetail).catch(err => {
- console.error("[RequestDetail] Failed to update streaming content:", err.message);
- });
-
- // Save usage stats for dashboard (skip if no real token data to avoid duplicates)
- if (usage && typeof usage === 'object' && (usage.prompt_tokens > 0 || usage.completion_tokens > 0)) {
- const msg = `[${new Date().toLocaleTimeString("en-US", { hour12: false, hour: "2-digit", minute: "2-digit" })}] 📊 [STREAM USAGE] ${provider.toUpperCase()} | in=${usage?.prompt_tokens || 0} | out=${usage?.completion_tokens || 0}${connectionId ? ` | account=${connectionId.slice(0, 8)}...` : ""}`;
- console.log(`${COLORS.green}${msg}${COLORS.reset}`);
-
- saveRequestUsage({
- provider: provider || "unknown",
- model: model || "unknown",
- tokens: usage,
- timestamp: new Date().toISOString(),
- connectionId: connectionId || undefined,
- apiKey: apiKey || undefined,
- endpoint: clientRawRequest?.endpoint || null
- }).catch(err => {
- console.error("Failed to save streaming usage stats:", err.message);
- });
- }
- };
-
- let transformStream;
- const isDroidCLI = userAgent?.toLowerCase().includes('droid') || userAgent?.toLowerCase().includes('codex-cli');
- const needsCodexTranslation = provider === 'codex'
- && targetFormat === 'openai-responses'
- && !isDroidCLI;
-
- if (needsCodexTranslation) {
- // Translate Codex (openai-responses) SSE → client's source format
- // Claude → claude, Antigravity/Gemini → antigravity, others → openai
- let codexTarget;
- if (sourceFormat === FORMATS.CLAUDE) codexTarget = FORMATS.CLAUDE;
- else if (sourceFormat === FORMATS.ANTIGRAVITY || sourceFormat === FORMATS.GEMINI || sourceFormat === FORMATS.GEMINI_CLI) codexTarget = FORMATS.ANTIGRAVITY;
- else codexTarget = FORMATS.OPENAI;
- log?.debug?.("STREAM", `Codex translation mode: openai-responses → ${codexTarget}`);
- transformStream = createSSETransformStreamWithLogger('openai-responses', codexTarget, provider, reqLogger, toolNameMap, model, connectionId, body, onStreamComplete, apiKey);
- } else if (needsTranslation(targetFormat, sourceFormat)) {
- log?.debug?.("STREAM", `Translation mode: ${targetFormat} → ${sourceFormat}`);
- transformStream = createSSETransformStreamWithLogger(targetFormat, sourceFormat, provider, reqLogger, toolNameMap, model, connectionId, body, onStreamComplete, apiKey);
- } else {
- log?.debug?.("STREAM", `Standard passthrough mode`);
- transformStream = createPassthroughStreamWithLogger(provider, reqLogger, model, connectionId, body, onStreamComplete, apiKey);
- }
-
- const transformedBody = pipeWithDisconnect(providerResponse, transformStream, streamController);
-
- const totalLatency = Date.now() - requestStartTime;
- const streamingDetail = {
- provider: provider || "unknown",
- model: model || "unknown",
- connectionId: connectionId || undefined,
- timestamp: new Date().toISOString(),
- latency: {
- ttft: 0,
- total: Date.now() - requestStartTime
- },
- tokens: { prompt_tokens: 0, completion_tokens: 0 },
- request: extractRequestConfig(body, stream),
- providerRequest: finalBody || translatedBody || null,
- providerResponse: "[Streaming - raw response not captured]",
- response: {
- content: "[Streaming in progress...]",
- thinking: null,
- type: "streaming"
- },
- status: "success",
- id: streamDetailId
- };
-
- saveRequestDetail(streamingDetail).catch(err => {
- console.error("[RequestDetail] Failed to save streaming request:", err.message);
- });
-
- return {
- success: true,
- response: new Response(transformedBody, {
- headers: responseHeaders
- })
- };
+ const { onStreamComplete } = buildOnStreamComplete({ ...sharedCtx });
+ return handleStreamingResponse({ ...sharedCtx, providerResponse, sourceFormat, targetFormat, userAgent, reqLogger, toolNameMap, streamController, onStreamComplete });
}
-/**
- * Check if token is expired or about to expire
- */
export function isTokenExpiringSoon(expiresAt, bufferMs = 5 * 60 * 1000) {
if (!expiresAt) return false;
- const expiresAtMs = new Date(expiresAt).getTime();
- return expiresAtMs - Date.now() < bufferMs;
+ return new Date(expiresAt).getTime() - Date.now() < bufferMs;
}
diff --git a/open-sse/handlers/chatCore/nonStreamingHandler.js b/open-sse/handlers/chatCore/nonStreamingHandler.js
new file mode 100644
index 0000000..190def6
--- /dev/null
+++ b/open-sse/handlers/chatCore/nonStreamingHandler.js
@@ -0,0 +1,194 @@
+import { FORMATS } from "../../translator/formats.js";
+import { needsTranslation } from "../../translator/index.js";
+import { addBufferToUsage, filterUsageForFormat } from "../../utils/usageTracking.js";
+import { createErrorResult } from "../../utils/error.js";
+import { HTTP_STATUS } from "../../config/constants.js";
+import { parseSSEToOpenAIResponse } from "./sseToJsonHandler.js";
+import { buildRequestDetail, extractRequestConfig, extractUsageFromResponse, saveUsageStats } from "./requestDetail.js";
+import { appendRequestLog, saveRequestDetail } from "@/lib/usageDb.js";
+
+/**
+ * Translate a non-streaming provider body (Gemini-family or Claude) into an OpenAI chat.completion object; any other body is returned unchanged.
+ */
+export function translateNonStreamingResponse(responseBody, targetFormat, sourceFormat) {
+ if (!responseBody || targetFormat === sourceFormat || targetFormat === FORMATS.OPENAI) return responseBody;
+
+ // Gemini / Antigravity
+ if (targetFormat === FORMATS.GEMINI || targetFormat === FORMATS.ANTIGRAVITY || targetFormat === FORMATS.GEMINI_CLI) {
+ const response = responseBody.response || responseBody;
+ if (!response?.candidates?.[0]) return responseBody;
+
+ const candidate = response.candidates[0];
+ const content = candidate.content;
+ const usage = response.usageMetadata || responseBody.usageMetadata;
+ let textContent = "", reasoningContent = "";
+ const toolCalls = [];
+
+ if (content?.parts) {
+ for (const part of content.parts) {
+ if (part.thought === true && part.text) reasoningContent += part.text;
+ else if (part.text !== undefined) textContent += part.text;
+ if (part.functionCall) {
+ toolCalls.push({
+ id: `call_${part.functionCall.name}_${Date.now()}_${toolCalls.length}`,
+ type: "function",
+ function: { name: part.functionCall.name, arguments: JSON.stringify(part.functionCall.args || {}) }
+ });
+ }
+ }
+ }
+
+ const message = { role: "assistant" };
+ if (textContent) message.content = textContent;
+ if (reasoningContent) message.reasoning_content = reasoningContent;
+ if (toolCalls.length > 0) message.tool_calls = toolCalls;
+ if (!message.content && !message.tool_calls) message.content = "";
+
+ let finishReason = (candidate.finishReason || "stop").toLowerCase();
+ if (finishReason === "stop" && toolCalls.length > 0) finishReason = "tool_calls";
+ if (finishReason === "max_tokens") finishReason = "length"; // OpenAI's name for hitting the token limit
+ const result = {
+ id: `chatcmpl-${response.responseId || Date.now()}`,
+ object: "chat.completion",
+ created: Math.floor(new Date(response.createTime || Date.now()).getTime() / 1000),
+ model: response.modelVersion || "gemini",
+ choices: [{ index: 0, message, finish_reason: finishReason }]
+ };
+
+ if (usage) {
+ result.usage = {
+ prompt_tokens: (usage.promptTokenCount || 0) + (usage.thoughtsTokenCount || 0),
+ completion_tokens: usage.candidatesTokenCount || 0,
+ total_tokens: usage.totalTokenCount || 0
+ };
+ if (usage.thoughtsTokenCount > 0) {
+ result.usage.completion_tokens_details = { reasoning_tokens: usage.thoughtsTokenCount };
+ }
+ }
+ return result;
+ }
+
+ // Claude
+ if (targetFormat === FORMATS.CLAUDE) {
+ if (!responseBody.content) return responseBody;
+
+ let textContent = "", thinkingContent = "";
+ const toolCalls = [];
+
+ for (const block of responseBody.content) {
+ if (block.type === "text") textContent += block.text || ""; // a text block without text must not append "undefined"
+ else if (block.type === "thinking") thinkingContent += block.thinking || "";
+ else if (block.type === "tool_use") {
+ toolCalls.push({ id: block.id, type: "function", function: { name: block.name, arguments: JSON.stringify(block.input || {}) } });
+ }
+ }
+
+ const message = { role: "assistant" };
+ if (textContent) message.content = textContent;
+ if (thinkingContent) message.reasoning_content = thinkingContent;
+ if (toolCalls.length > 0) message.tool_calls = toolCalls;
+ if (!message.content && !message.tool_calls) message.content = "";
+ let finishReason = responseBody.stop_reason || "stop";
+ if (finishReason === "end_turn" || finishReason === "stop_sequence") finishReason = "stop";
+ if (finishReason === "tool_use") finishReason = "tool_calls";
+ if (finishReason === "max_tokens") finishReason = "length"; // OpenAI's name for hitting the token limit
+
+ const result = {
+ id: `chatcmpl-${responseBody.id || Date.now()}`,
+ object: "chat.completion",
+ created: Math.floor(Date.now() / 1000),
+ model: responseBody.model || "claude",
+ choices: [{ index: 0, message, finish_reason: finishReason }]
+ };
+
+ if (responseBody.usage) {
+ result.usage = {
+ prompt_tokens: responseBody.usage.input_tokens || 0,
+ completion_tokens: responseBody.usage.output_tokens || 0,
+ total_tokens: (responseBody.usage.input_tokens || 0) + (responseBody.usage.output_tokens || 0)
+ };
+ }
+ return result;
+ }
+
+ return responseBody;
+}
+
+/**
+ * Handle a non-streaming provider response: parse the body (JSON, or SSE collapsed to JSON), record usage and request detail, translate when needed, and return { success, response }.
+ */
+export async function handleNonStreamingResponse({ providerResponse, provider, model, sourceFormat, targetFormat, body, stream, translatedBody, finalBody, requestStartTime, connectionId, apiKey, clientRawRequest, onRequestSuccess, reqLogger, trackDone, appendLog }) {
+ trackDone();
+ const contentType = providerResponse.headers.get("content-type") || "";
+ let responseBody;
+
+ if (contentType.includes("text/event-stream")) { // provider answered with SSE anyway: collapse it into one JSON body
+ const sseText = await providerResponse.text();
+ const parsed = parseSSEToOpenAIResponse(sseText, model);
+ if (!parsed) {
+ appendLog({ status: `FAILED ${HTTP_STATUS.BAD_GATEWAY}` });
+ return createErrorResult(HTTP_STATUS.BAD_GATEWAY, "Invalid SSE response for non-streaming request");
+ }
+ responseBody = parsed;
+ } else {
+ try {
+ responseBody = await providerResponse.json();
+ } catch (err) {
+ appendLog({ status: `FAILED ${HTTP_STATUS.BAD_GATEWAY}` });
+ console.error(`[ChatCore] Failed to parse JSON from ${provider}:`, err.message);
+ return createErrorResult(HTTP_STATUS.BAD_GATEWAY, `Invalid JSON response from ${provider}`);
+ }
+ }
+
+ reqLogger.logProviderResponse(providerResponse.status, providerResponse.statusText, providerResponse.headers, responseBody);
+ if (onRequestSuccess) await onRequestSuccess();
+
+ const usage = extractUsageFromResponse(responseBody); // read from the raw provider body, before translation
+ appendLog({ tokens: usage, status: "200 OK" });
+ saveUsageStats({ provider, model, tokens: usage, connectionId, apiKey, endpoint: clientRawRequest?.endpoint });
+
+ const translatedResponse = needsTranslation(targetFormat, sourceFormat)
+ ? translateNonStreamingResponse(responseBody, targetFormat, sourceFormat)
+ : responseBody;
+ // NOTE(review): the lines below mutate translatedResponse in place and assume it is a non-null object — confirm a provider can never return JSON null.
+ // Ensure OpenAI-required fields
+ if (!translatedResponse.object) translatedResponse.object = "chat.completion";
+ if (!translatedResponse.created) translatedResponse.created = Math.floor(Date.now() / 1000);
+
+ // Strip Azure-specific fields
+ delete translatedResponse.prompt_filter_results;
+ if (translatedResponse?.choices) {
+ for (const choice of translatedResponse.choices) delete choice.content_filter_results;
+ }
+ // Add buffer and filter usage for client
+ if (translatedResponse?.usage) {
+ translatedResponse.usage = filterUsageForFormat(addBufferToUsage(translatedResponse.usage), sourceFormat);
+ }
+ // Log converted response (final response to client)
+ reqLogger.logConvertedResponse(translatedResponse);
+ // Async save (not awaited, so it does not block the response); ttft is reported equal to the total latency here.
+ const totalLatency = Date.now() - requestStartTime;
+ saveRequestDetail(buildRequestDetail({
+ provider, model, connectionId,
+ latency: { ttft: totalLatency, total: totalLatency },
+ tokens: usage || { prompt_tokens: 0, completion_tokens: 0 },
+ request: extractRequestConfig(body, stream),
+ providerRequest: finalBody || translatedBody || null,
+ providerResponse: responseBody || null,
+ response: {
+ content: translatedResponse?.choices?.[0]?.message?.content || translatedResponse?.content || null,
+ thinking: translatedResponse?.choices?.[0]?.message?.reasoning_content || translatedResponse?.reasoning_content || null,
+ finish_reason: translatedResponse?.choices?.[0]?.finish_reason || "unknown"
+ },
+ status: "success"
+ }, { endpoint: clientRawRequest?.endpoint || null })).catch(err => {
+ console.error("[RequestDetail] Failed to save:", err.message);
+ });
+
+ return {
+ success: true,
+ response: new Response(JSON.stringify(translatedResponse), {
+ headers: { "Content-Type": "application/json", "Access-Control-Allow-Origin": "*" }
+ })
+ };
+}
diff --git a/open-sse/handlers/chatCore/requestDetail.js b/open-sse/handlers/chatCore/requestDetail.js
new file mode 100644
index 0000000..12db16f
--- /dev/null
+++ b/open-sse/handlers/chatCore/requestDetail.js
@@ -0,0 +1,102 @@
+import { saveRequestUsage, appendRequestLog, saveRequestDetail } from "@/lib/usageDb.js";
+import { COLORS } from "../../utils/stream.js";
+// Optional request-body fields that extractRequestConfig copies through whenever they are defined on the body.
+const OPTIONAL_PARAMS = [
+ "temperature", "top_p", "top_k",
+ "max_tokens", "max_completion_tokens",
+ "thinking", "reasoning", "enable_thinking",
+ "presence_penalty", "frequency_penalty",
+ "seed", "stop", "tools", "tool_choice",
+ "response_format", "prediction", "store", "metadata",
+ "n", "logprobs", "top_logprobs", "logit_bias",
+ "user", "parallel_tool_calls"
+];
+/** Build the request snapshot kept with a request detail: messages (default []), model, the stream flag, plus every OPTIONAL_PARAMS field that is not undefined on body. */
+export function extractRequestConfig(body, stream) {
+ const config = { messages: body.messages || [], model: body.model, stream };
+ for (const param of OPTIONAL_PARAMS) {
+ if (body[param] !== undefined) config[param] = body[param];
+ }
+ return config;
+}
+/** Normalize provider usage (Claude, OpenAI or Gemini-family shape) to { prompt_tokens, completion_tokens, ...extras }; returns null when the body carries no usage. */
+export function extractUsageFromResponse(responseBody) {
+ if (!responseBody || typeof responseBody !== "object") return null;
+
+ // Claude format
+ if (responseBody.usage?.input_tokens !== undefined) {
+ return {
+ prompt_tokens: responseBody.usage.input_tokens || 0,
+ completion_tokens: responseBody.usage.output_tokens || 0,
+ cache_read_input_tokens: responseBody.usage.cache_read_input_tokens,
+ cache_creation_input_tokens: responseBody.usage.cache_creation_input_tokens
+ };
+ }
+
+ // OpenAI format
+ if (responseBody.usage?.prompt_tokens !== undefined) {
+ return {
+ prompt_tokens: responseBody.usage.prompt_tokens || 0,
+ completion_tokens: responseBody.usage.completion_tokens || 0,
+ cached_tokens: responseBody.usage.prompt_tokens_details?.cached_tokens,
+ reasoning_tokens: responseBody.usage.completion_tokens_details?.reasoning_tokens
+ };
+ }
+
+ // Gemini format — some Gemini-family bodies wrap the payload in `response`, so look there as well
+ const geminiUsage = responseBody.usageMetadata || responseBody.response?.usageMetadata;
+ if (geminiUsage) {
+ return {
+ prompt_tokens: geminiUsage.promptTokenCount || 0,
+ completion_tokens: geminiUsage.candidatesTokenCount || 0,
+ reasoning_tokens: geminiUsage.thoughtsTokenCount
+ };
+ }
+ return null;
+}
+/** Assemble a request-detail record from base with "unknown"/zero/null fallbacks and a fresh ISO timestamp; keys in overrides are spread last, so they win. */
+export function buildRequestDetail(base, overrides = {}) {
+ return {
+ provider: base.provider || "unknown",
+ model: base.model || "unknown",
+ connectionId: base.connectionId || undefined,
+ timestamp: new Date().toISOString(),
+ latency: base.latency || { ttft: 0, total: 0 },
+ tokens: base.tokens || { prompt_tokens: 0, completion_tokens: 0 },
+ request: base.request,
+ providerRequest: base.providerRequest || null,
+ providerResponse: base.providerResponse || null,
+ response: base.response || {},
+ status: base.status || "success",
+ ...overrides
+ };
+}
+/** Log one usage line and persist token usage via saveRequestUsage (fire-and-forget); does nothing when tokens is missing or both counts are zero. */
+export function saveUsageStats({ provider, model, tokens, connectionId, apiKey, endpoint, label = "USAGE" }) {
+ if (!tokens || typeof tokens !== "object") return;
+ // Accept both input_tokens/output_tokens and prompt_tokens/completion_tokens field names.
+ const inTokens = tokens.input_tokens ?? tokens.prompt_tokens ?? 0;
+ const outTokens = tokens.output_tokens ?? tokens.completion_tokens ?? 0;
+
+ if (inTokens === 0 && outTokens === 0) return;
+
+ const time = new Date().toLocaleTimeString("en-US", { hour12: false, hour: "2-digit", minute: "2-digit" });
+ const accountSuffix = connectionId ? ` | account=${connectionId.slice(0, 8)}...` : "";
+ console.log(`${COLORS.green}[${time}] 📊 [${label}] ${(provider || "unknown").toUpperCase()} | in=${inTokens} | out=${outTokens}${accountSuffix}${COLORS.reset}`);
+
+ // Normalize to OpenAI token shape for storage
+ const normalized = {
+ prompt_tokens: tokens.prompt_tokens ?? tokens.input_tokens ?? 0,
+ completion_tokens: tokens.completion_tokens ?? tokens.output_tokens ?? 0
+ };
+
+ saveRequestUsage({
+ provider: provider || "unknown",
+ model: model || "unknown",
+ tokens: normalized,
+ timestamp: new Date().toISOString(),
+ connectionId: connectionId || undefined,
+ apiKey: apiKey || undefined,
+ endpoint: endpoint || null
+ }).catch(err => console.error(`[${label}] Failed to save usage stats:`, err?.message)); // best-effort, but no longer silent
+}
diff --git a/open-sse/handlers/chatCore/sseToJsonHandler.js b/open-sse/handlers/chatCore/sseToJsonHandler.js
new file mode 100644
index 0000000..0c71be3
--- /dev/null
+++ b/open-sse/handlers/chatCore/sseToJsonHandler.js
@@ -0,0 +1,161 @@
+import { convertResponsesStreamToJson } from "../../transformer/streamToJsonConverter.js";
+import { createErrorResult } from "../../utils/error.js";
+import { HTTP_STATUS } from "../../config/constants.js";
+import { FORMATS } from "../../translator/formats.js";
+import { buildRequestDetail, extractRequestConfig, saveUsageStats } from "./requestDetail.js";
+import { saveRequestDetail, appendRequestLog } from "@/lib/usageDb.js";
+
+/**
+ * Merge OpenAI-style SSE text into one chat completion JSON (text, reasoning and tool-call deltas, finish_reason, usage); used when provider forces streaming but client wants non-streaming. Returns null when no data line parses.
+ */
+export function parseSSEToOpenAIResponse(rawSSE, fallbackModel) {
+ const chunks = [];
+ for (const line of String(rawSSE || "").split("\n")) {
+ const trimmed = line.trim();
+ if (!trimmed.startsWith("data:")) continue;
+ const payload = trimmed.slice(5).trim();
+ if (!payload || payload === "[DONE]") continue;
+ try { chunks.push(JSON.parse(payload)); } catch { /* ignore malformed lines */ }
+ }
+ if (chunks.length === 0) return null;
+
+ const first = chunks[0] || {}; // a literal `null` chunk must not crash the id/created/model lookups
+ const contentParts = [];
+ const reasoningParts = [];
+ const toolCalls = []; // keyed by delta.tool_calls[].index; fragments of one call share an index
+ let finishReason = "stop";
+ let usage = null;
+ for (const chunk of chunks) {
+ const choice = chunk?.choices?.[0];
+ const delta = choice?.delta || {};
+ if (typeof delta.content === "string" && delta.content.length > 0) contentParts.push(delta.content);
+ if (typeof delta.reasoning_content === "string" && delta.reasoning_content.length > 0) reasoningParts.push(delta.reasoning_content);
+ for (const part of Array.isArray(delta.tool_calls) ? delta.tool_calls : []) {
+ const call = toolCalls[part?.index ?? 0] || (toolCalls[part?.index ?? 0] = { id: "", type: "function", function: { name: "", arguments: "" } });
+ if (part?.id) call.id = part.id;
+ if (part?.function?.name) call.function.name = part.function.name;
+ if (typeof part?.function?.arguments === "string") call.function.arguments += part.function.arguments;
+ }
+ if (choice?.finish_reason) finishReason = choice.finish_reason;
+ if (chunk?.usage && typeof chunk.usage === "object") usage = chunk.usage;
+ }
+ const message = { role: "assistant", content: contentParts.join("") };
+ if (reasoningParts.length > 0) message.reasoning_content = reasoningParts.join("");
+ if (toolCalls.some(Boolean)) message.tool_calls = toolCalls.filter(Boolean); // drop holes left by sparse indexes
+ const result = {
+ id: first.id || `chatcmpl-${Date.now()}`, object: "chat.completion",
+ created: first.created || Math.floor(Date.now() / 1000), model: first.model || fallbackModel || "unknown",
+ choices: [{ index: 0, message, finish_reason: finishReason }]
+ };
+ if (usage) result.usage = usage;
+ return result;
+}
+
+/**
+ * Handle case: provider forced streaming but client wants JSON. Returns null, before any side effect, when the response is not SSE.
+ * Supports both Codex/Responses API SSE and standard Chat Completions SSE; conversion failures are logged via appendLog and returned as 502 results.
+ */
+export async function handleForcedSSEToJson({ providerResponse, sourceFormat, provider, model, body, stream, translatedBody, finalBody, requestStartTime, connectionId, apiKey, clientRawRequest, onRequestSuccess, trackDone, appendLog }) {
+ const contentType = providerResponse.headers.get("content-type") || "";
+ const isSSE = contentType.includes("text/event-stream") || (contentType === "" && provider === "codex");
+ if (!isSSE) return null; // not handled here
+ trackDone();
+
+ const ctx = {
+ provider, model, connectionId,
+ request: extractRequestConfig(body, stream),
+ providerRequest: finalBody || translatedBody || null
+ };
+
+ // Codex/Responses API SSE path
+ const isCodexResponsesApi = provider === "codex" || sourceFormat === FORMATS.OPENAI_RESPONSES;
+ if (isCodexResponsesApi) {
+ try {
+ const jsonResponse = await convertResponsesStreamToJson(providerResponse.body);
+ if (onRequestSuccess) await onRequestSuccess();
+ const usage = jsonResponse.usage || {};
+ appendLog({ tokens: usage, status: "200 OK" });
+ saveUsageStats({ provider, model, tokens: usage, connectionId, apiKey, endpoint: clientRawRequest?.endpoint });
+ const msgItem = jsonResponse.output?.find(item => item.type === "message");
+ const textContent = msgItem?.content?.find(c => c.type === "output_text")?.text || msgItem?.content?.[0]?.text || null;
+ const totalLatency = Date.now() - requestStartTime;
+
+ saveRequestDetail(buildRequestDetail({
+ ...ctx,
+ latency: { ttft: totalLatency, total: totalLatency },
+ tokens: { prompt_tokens: usage.input_tokens || 0, completion_tokens: usage.output_tokens || 0 },
+ response: { content: textContent, thinking: null, finish_reason: jsonResponse.status || "unknown" },
+ status: "success"
+ }, { endpoint: clientRawRequest?.endpoint || null })).catch(err => console.error("[RequestDetail] Failed to save:", err?.message));
+
+ // Client is Responses API → return as-is
+ if (sourceFormat === FORMATS.OPENAI_RESPONSES) {
+ return { success: true, response: new Response(JSON.stringify(jsonResponse), { headers: { "Content-Type": "application/json", "Access-Control-Allow-Origin": "*" } }) };
+ }
+
+ // Build client-format response
+ const inTokens = usage.input_tokens || 0;
+ const outTokens = usage.output_tokens || 0;
+ let finalResp;
+
+ if (sourceFormat === FORMATS.ANTIGRAVITY || sourceFormat === FORMATS.GEMINI || sourceFormat === FORMATS.GEMINI_CLI) {
+ finalResp = {
+ response: {
+ candidates: [{ content: { role: "model", parts: [{ text: textContent || "" }] }, finishReason: "STOP", index: 0 }],
+ usageMetadata: { promptTokenCount: inTokens, candidatesTokenCount: outTokens, totalTokenCount: inTokens + outTokens },
+ modelVersion: model,
+ responseId: jsonResponse.id || `resp_${Date.now()}`
+ }
+ };
+ } else {
+ finalResp = {
+ id: jsonResponse.id || `chatcmpl-${Date.now()}`,
+ object: "chat.completion",
+ created: jsonResponse.created_at || Math.floor(Date.now() / 1000),
+ model: jsonResponse.model || model,
+ choices: [{ index: 0, message: { role: "assistant", content: textContent || "" }, finish_reason: jsonResponse.status === "completed" ? "stop" : (jsonResponse.status || "stop") }],
+ usage: { prompt_tokens: inTokens, completion_tokens: outTokens, total_tokens: inTokens + outTokens }
+ };
+ }
+
+ return { success: true, response: new Response(JSON.stringify(finalResp), { headers: { "Content-Type": "application/json", "Access-Control-Allow-Origin": "*" } }) };
+ } catch (err) {
+ console.error("[ChatCore] Responses API SSE→JSON failed:", err);
+ appendLog({ status: `FAILED ${HTTP_STATUS.BAD_GATEWAY}` }); // record the failure like the non-streaming handler does
+ return createErrorResult(HTTP_STATUS.BAD_GATEWAY, "Failed to convert streaming response to JSON");
+ }
+ }
+
+ // Standard Chat Completions SSE path
+ try {
+ const sseText = await providerResponse.text();
+ const parsed = parseSSEToOpenAIResponse(sseText, model);
+ if (!parsed) {
+ appendLog({ status: `FAILED ${HTTP_STATUS.BAD_GATEWAY}` });
+ return createErrorResult(HTTP_STATUS.BAD_GATEWAY, "Invalid SSE response for non-streaming request");
+ }
+ if (onRequestSuccess) await onRequestSuccess();
+ const usage = parsed.usage || {};
+ appendLog({ tokens: usage, status: "200 OK" });
+ saveUsageStats({ provider, model, tokens: usage, connectionId, apiKey, endpoint: clientRawRequest?.endpoint });
+
+ const totalLatency = Date.now() - requestStartTime;
+ saveRequestDetail(buildRequestDetail({
+ ...ctx,
+ latency: { ttft: totalLatency, total: totalLatency },
+ tokens: usage,
+ response: {
+ content: parsed.choices?.[0]?.message?.content || null,
+ thinking: parsed.choices?.[0]?.message?.reasoning_content || null,
+ finish_reason: parsed.choices?.[0]?.finish_reason || "unknown"
+ },
+ status: "success"
+ }, { endpoint: clientRawRequest?.endpoint || null })).catch(err => console.error("[RequestDetail] Failed to save:", err?.message));
+
+ return { success: true, response: new Response(JSON.stringify(parsed), { headers: { "Content-Type": "application/json", "Access-Control-Allow-Origin": "*" } }) };
+ } catch (err) {
+ console.error("[ChatCore] Chat Completions SSE→JSON failed:", err);
+ appendLog({ status: `FAILED ${HTTP_STATUS.BAD_GATEWAY}` }); // record the failure like the non-streaming handler does
+ return createErrorResult(HTTP_STATUS.BAD_GATEWAY, "Failed to convert streaming response to JSON");
+ }
+}
diff --git a/open-sse/handlers/chatCore/streamingHandler.js b/open-sse/handlers/chatCore/streamingHandler.js
new file mode 100644
index 0000000..415c9b8
--- /dev/null
+++ b/open-sse/handlers/chatCore/streamingHandler.js
@@ -0,0 +1,97 @@
+import { FORMATS } from "../../translator/formats.js";
+import { needsTranslation } from "../../translator/index.js";
+import { createSSETransformStreamWithLogger, createPassthroughStreamWithLogger } from "../../utils/stream.js";
+import { pipeWithDisconnect } from "../../utils/streamHandler.js";
+import { buildRequestDetail, extractRequestConfig, saveUsageStats } from "./requestDetail.js";
+import { saveRequestDetail } from "@/lib/usageDb.js";
+// Headers attached to the streaming (text/event-stream) Response built by handleStreamingResponse.
+const SSE_HEADERS = {
+ "Content-Type": "text/event-stream",
+ "Cache-Control": "no-cache",
+ "Connection": "keep-alive",
+ "Access-Control-Allow-Origin": "*"
+};
+
+/**
+ * Pick the SSE transform stream for a streamed reply: Codex Responses-API translation, generic format translation, or passthrough.
+ */
+function buildTransformStream({ provider, sourceFormat, targetFormat, userAgent, reqLogger, toolNameMap, model, connectionId, body, onStreamComplete, apiKey }) {
+ const isDroidCLI = userAgent?.toLowerCase().includes("droid") || userAgent?.toLowerCase().includes("codex-cli"); // user agent mentions "droid" or "codex-cli"
+ const needsCodexTranslation = provider === "codex" && targetFormat === FORMATS.OPENAI_RESPONSES && !isDroidCLI;
+
+ if (needsCodexTranslation) {
+ // Codex returns Responses API SSE → translate to client format
+ let codexTarget;
+ if (sourceFormat === FORMATS.OPENAI_RESPONSES) codexTarget = FORMATS.OPENAI_RESPONSES;
+ else if (sourceFormat === FORMATS.CLAUDE) codexTarget = FORMATS.CLAUDE;
+ else if (sourceFormat === FORMATS.ANTIGRAVITY || sourceFormat === FORMATS.GEMINI || sourceFormat === FORMATS.GEMINI_CLI) codexTarget = FORMATS.ANTIGRAVITY;
+ else codexTarget = FORMATS.OPENAI;
+ return createSSETransformStreamWithLogger(FORMATS.OPENAI_RESPONSES, codexTarget, provider, reqLogger, toolNameMap, model, connectionId, body, onStreamComplete, apiKey);
+ }
+ // Generic path: translate from targetFormat to sourceFormat when needsTranslation reports it is required.
+ if (needsTranslation(targetFormat, sourceFormat)) {
+ return createSSETransformStreamWithLogger(targetFormat, sourceFormat, provider, reqLogger, toolNameMap, model, connectionId, body, onStreamComplete, apiKey);
+ }
+ // Otherwise hand back the passthrough stream.
+ return createPassthroughStreamWithLogger(provider, reqLogger, model, connectionId, body, onStreamComplete, apiKey);
+}
+
+/**
+ * Handle streaming response — pipe provider SSE through transform stream to client and save a placeholder request detail.
+ * streamDetailId is optional: pass the id returned by buildOnStreamComplete so placeholder and completion records share one id (presumably letting the latter update the former — confirm against saveRequestDetail).
+ */
+export function handleStreamingResponse({ providerResponse, provider, model, sourceFormat, targetFormat, userAgent, body, stream, translatedBody, finalBody, requestStartTime, connectionId, apiKey, clientRawRequest, onRequestSuccess, reqLogger, toolNameMap, streamController, onStreamComplete, streamDetailId = `${Date.now()}-${Math.random().toString(36).slice(2, 11)}` }) {
+ if (onRequestSuccess) Promise.resolve(onRequestSuccess()).catch(err => console.error("[ChatCore] onRequestSuccess failed:", err?.message)); // not awaited, so a rejection must be handled here
+
+ const transformStream = buildTransformStream({ provider, sourceFormat, targetFormat, userAgent, reqLogger, toolNameMap, model, connectionId, body, onStreamComplete, apiKey });
+ const transformedBody = pipeWithDisconnect(providerResponse, transformStream, streamController);
+
+ saveRequestDetail(buildRequestDetail({
+ provider, model, connectionId,
+ latency: { ttft: 0, total: Date.now() - requestStartTime },
+ tokens: { prompt_tokens: 0, completion_tokens: 0 },
+ request: extractRequestConfig(body, stream),
+ providerRequest: finalBody || translatedBody || null,
+ providerResponse: "[Streaming - raw response not captured]",
+ response: { content: "[Streaming in progress...]", thinking: null, type: "streaming" },
+ status: "success"
+ }, { id: streamDetailId })).catch(err => {
+ console.error("[RequestDetail] Failed to save streaming request:", err.message);
+ });
+
+ return {
+ success: true,
+ response: new Response(transformedBody, { headers: SSE_HEADERS })
+ };
+}
+
+/**
+ * Build the onStreamComplete(contentObj, usage, ttftAt) callback that saves the final request detail and usage stats; returns it together with the streamDetailId it saves under.
+ */
+export function buildOnStreamComplete({ provider, model, connectionId, apiKey, requestStartTime, body, stream, finalBody, translatedBody, clientRawRequest }) {
+ const streamDetailId = `${Date.now()}-${Math.random().toString(36).slice(2, 11)}`; // generated once per call and reused by the callback
+
+ const onStreamComplete = (contentObj, usage, ttftAt) => {
+ const latency = {
+ ttft: ttftAt ? ttftAt - requestStartTime : Date.now() - requestStartTime, // no first-token timestamp: fall back to elapsed time so far
+ total: Date.now() - requestStartTime
+ };
+
+ saveRequestDetail(buildRequestDetail({
+ provider, model, connectionId,
+ latency,
+ tokens: usage || { prompt_tokens: 0, completion_tokens: 0 },
+ request: extractRequestConfig(body, stream),
+ providerRequest: finalBody || translatedBody || null,
+ providerResponse: contentObj.content || "[Empty streaming response]",
+ response: { content: contentObj.content || "[Empty streaming response]", thinking: contentObj.thinking || null, type: "streaming" },
+ status: "success"
+ }, { id: streamDetailId })).catch(err => {
+ console.error("[RequestDetail] Failed to update streaming content:", err.message);
+ });
+
+ saveUsageStats({ provider, model, tokens: usage, connectionId, apiKey, endpoint: clientRawRequest?.endpoint, label: "STREAM USAGE" });
+ };
+
+ return { onStreamComplete, streamDetailId };
+}
diff --git a/open-sse/handlers/responsesHandler.js b/open-sse/handlers/responsesHandler.js
index c256985..823ff8c 100644
--- a/open-sse/handlers/responsesHandler.js
+++ b/open-sse/handlers/responsesHandler.js
@@ -32,7 +32,7 @@ export async function handleResponsesCore({ body, modelInfo, credentials, log, o
convertedBody.stream = false;
}
- // Call chat core handler
+ // Call chat core handler — force sourceFormat so streaming path knows this is a Responses API client
const result = await handleChatCore({
body: convertedBody,
modelInfo,
@@ -41,7 +41,8 @@ export async function handleResponsesCore({ body, modelInfo, credentials, log, o
onCredentialsRefreshed,
onRequestSuccess,
onDisconnect,
- connectionId
+ connectionId,
+ sourceFormatOverride: "openai-responses"
});
if (!result.success || !result.response) {
diff --git a/package.json b/package.json
index 0d0b44a..a82afae 100644
--- a/package.json
+++ b/package.json
@@ -1,6 +1,6 @@
{
"name": "9router-app",
- "version": "0.2.99",
+ "version": "0.3.1",
"description": "9Router web dashboard",
"private": true,
"scripts": {
diff --git a/src/app/(dashboard)/dashboard/providers/[id]/page.js b/src/app/(dashboard)/dashboard/providers/[id]/page.js
index 729730d..bd6ba05 100644
--- a/src/app/(dashboard)/dashboard/providers/[id]/page.js
+++ b/src/app/(dashboard)/dashboard/providers/[id]/page.js
@@ -24,6 +24,8 @@ export default function ProviderDetailPage() {
const [selectedConnection, setSelectedConnection] = useState(null);
const [modelAliases, setModelAliases] = useState({});
const [headerImgError, setHeaderImgError] = useState(false);
+ const [modelTestResults, setModelTestResults] = useState({});
+ const [testingModels, setTestingModels] = useState(false);
const { copied, copy } = useCopyToClipboard();
const providerInfo = providerNode
@@ -256,6 +258,27 @@ export default function ProviderDetailPage() {
}
};
+ const handleTestModels = async () => {
+ if (testingModels) return;
+ const conn = connections.find((c) => c.isActive !== false) || connections[0];
+ if (!conn) return;
+ setTestingModels(true);
+ setModelTestResults({});
+ try {
+ const res = await fetch(`/api/providers/${conn.id}/test-models`, { method: "POST" });
+ const data = await res.json();
+ if (res.ok) {
+ const map = {};
+ for (const r of data.results || []) map[r.modelId] = r.ok ? "ok" : "error";
+ setModelTestResults(map);
+ }
+ } catch {
+ // silent fail
+ } finally {
+ setTestingModels(false);
+ }
+ };
+
const renderModelsSection = () => {
if (isCompatible) {
return (
@@ -305,6 +328,7 @@ export default function ProviderDetailPage() {
onCopy={copy}
onSetAlias={(alias) => handleSetAlias(model.id, alias, providerStorageAlias)}
onDeleteAlias={() => handleDeleteAlias(existingAlias)}
+ testStatus={modelTestResults[model.id]}
/>
);
})}
@@ -494,11 +518,23 @@ export default function ProviderDetailPage() {
{/* Models */}
- {providerInfo.passthroughModels ? "Model Aliases" : "Available Models"}
-
+
+ {providerInfo.passthroughModels ? "Model Aliases" : "Available Models"}
+
+ {connections.length > 0 && (
+
+ )}
+
{fullModel}