- Refactor chatCore.js to streamline imports and remove unused functions.
- Fix streaming /v1/responses
This commit is contained in:
parent
25c2ad7360
commit
5954b8f4eb
10 changed files with 785 additions and 919 deletions
|
|
@ -113,7 +113,6 @@ export const PROVIDER_MODELS = {
|
|||
{ id: "claude-4.6-opus-max", name: "Claude 4.6 Opus Max" },
|
||||
{ id: "claude-4.6-sonnet-medium-thinking", name: "Claude 4.6 Sonnet Medium Thinking" },
|
||||
{ id: "kimi-k2.5", name: "Kimi K2.5" },
|
||||
{ id: "gemini-3.1-pro-preview", name: "Gemini 3.1 Pro Preview" },
|
||||
{ id: "gemini-3-flash-preview", name: "Gemini 3 Flash Preview" },
|
||||
{ id: "gpt-5.2", name: "GPT 5.2" },
|
||||
{ id: "gpt-5.3-codex", name: "GPT 5.3 Codex" },
|
||||
|
|
|
|||
File diff suppressed because it is too large
Load diff
194
open-sse/handlers/chatCore/nonStreamingHandler.js
Normal file
194
open-sse/handlers/chatCore/nonStreamingHandler.js
Normal file
|
|
@ -0,0 +1,194 @@
|
|||
import { FORMATS } from "../../translator/formats.js";
|
||||
import { needsTranslation } from "../../translator/index.js";
|
||||
import { addBufferToUsage, filterUsageForFormat } from "../../utils/usageTracking.js";
|
||||
import { createErrorResult } from "../../utils/error.js";
|
||||
import { HTTP_STATUS } from "../../config/constants.js";
|
||||
import { parseSSEToOpenAIResponse } from "./sseToJsonHandler.js";
|
||||
import { buildRequestDetail, extractRequestConfig, extractUsageFromResponse, saveUsageStats } from "./requestDetail.js";
|
||||
import { appendRequestLog, saveRequestDetail } from "@/lib/usageDb.js";
|
||||
|
||||
/**
|
||||
* Translate non-streaming response body from provider format → OpenAI format.
|
||||
*/
|
||||
export function translateNonStreamingResponse(responseBody, targetFormat, sourceFormat) {
|
||||
if (targetFormat === sourceFormat || targetFormat === FORMATS.OPENAI) return responseBody;
|
||||
|
||||
// Gemini / Antigravity
|
||||
if (targetFormat === FORMATS.GEMINI || targetFormat === FORMATS.ANTIGRAVITY || targetFormat === FORMATS.GEMINI_CLI) {
|
||||
const response = responseBody.response || responseBody;
|
||||
if (!response?.candidates?.[0]) return responseBody;
|
||||
|
||||
const candidate = response.candidates[0];
|
||||
const content = candidate.content;
|
||||
const usage = response.usageMetadata || responseBody.usageMetadata;
|
||||
let textContent = "", reasoningContent = "";
|
||||
const toolCalls = [];
|
||||
|
||||
if (content?.parts) {
|
||||
for (const part of content.parts) {
|
||||
if (part.thought === true && part.text) reasoningContent += part.text;
|
||||
else if (part.text !== undefined) textContent += part.text;
|
||||
if (part.functionCall) {
|
||||
toolCalls.push({
|
||||
id: `call_${part.functionCall.name}_${Date.now()}_${toolCalls.length}`,
|
||||
type: "function",
|
||||
function: { name: part.functionCall.name, arguments: JSON.stringify(part.functionCall.args || {}) }
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const message = { role: "assistant" };
|
||||
if (textContent) message.content = textContent;
|
||||
if (reasoningContent) message.reasoning_content = reasoningContent;
|
||||
if (toolCalls.length > 0) message.tool_calls = toolCalls;
|
||||
if (!message.content && !message.tool_calls) message.content = "";
|
||||
|
||||
let finishReason = (candidate.finishReason || "stop").toLowerCase();
|
||||
if (finishReason === "stop" && toolCalls.length > 0) finishReason = "tool_calls";
|
||||
|
||||
const result = {
|
||||
id: `chatcmpl-${response.responseId || Date.now()}`,
|
||||
object: "chat.completion",
|
||||
created: Math.floor(new Date(response.createTime || Date.now()).getTime() / 1000),
|
||||
model: response.modelVersion || "gemini",
|
||||
choices: [{ index: 0, message, finish_reason: finishReason }]
|
||||
};
|
||||
|
||||
if (usage) {
|
||||
result.usage = {
|
||||
prompt_tokens: (usage.promptTokenCount || 0) + (usage.thoughtsTokenCount || 0),
|
||||
completion_tokens: usage.candidatesTokenCount || 0,
|
||||
total_tokens: usage.totalTokenCount || 0
|
||||
};
|
||||
if (usage.thoughtsTokenCount > 0) {
|
||||
result.usage.completion_tokens_details = { reasoning_tokens: usage.thoughtsTokenCount };
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
// Claude
|
||||
if (targetFormat === FORMATS.CLAUDE) {
|
||||
if (!responseBody.content) return responseBody;
|
||||
|
||||
let textContent = "", thinkingContent = "";
|
||||
const toolCalls = [];
|
||||
|
||||
for (const block of responseBody.content) {
|
||||
if (block.type === "text") textContent += block.text;
|
||||
else if (block.type === "thinking") thinkingContent += block.thinking || "";
|
||||
else if (block.type === "tool_use") {
|
||||
toolCalls.push({ id: block.id, type: "function", function: { name: block.name, arguments: JSON.stringify(block.input || {}) } });
|
||||
}
|
||||
}
|
||||
|
||||
const message = { role: "assistant" };
|
||||
if (textContent) message.content = textContent;
|
||||
if (thinkingContent) message.reasoning_content = thinkingContent;
|
||||
if (toolCalls.length > 0) message.tool_calls = toolCalls;
|
||||
if (!message.content && !message.tool_calls) message.content = "";
|
||||
|
||||
let finishReason = responseBody.stop_reason || "stop";
|
||||
if (finishReason === "end_turn") finishReason = "stop";
|
||||
if (finishReason === "tool_use") finishReason = "tool_calls";
|
||||
|
||||
const result = {
|
||||
id: `chatcmpl-${responseBody.id || Date.now()}`,
|
||||
object: "chat.completion",
|
||||
created: Math.floor(Date.now() / 1000),
|
||||
model: responseBody.model || "claude",
|
||||
choices: [{ index: 0, message, finish_reason: finishReason }]
|
||||
};
|
||||
|
||||
if (responseBody.usage) {
|
||||
result.usage = {
|
||||
prompt_tokens: responseBody.usage.input_tokens || 0,
|
||||
completion_tokens: responseBody.usage.output_tokens || 0,
|
||||
total_tokens: (responseBody.usage.input_tokens || 0) + (responseBody.usage.output_tokens || 0)
|
||||
};
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
return responseBody;
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle non-streaming response from provider.
|
||||
*/
|
||||
export async function handleNonStreamingResponse({ providerResponse, provider, model, sourceFormat, targetFormat, body, stream, translatedBody, finalBody, requestStartTime, connectionId, apiKey, clientRawRequest, onRequestSuccess, reqLogger, trackDone, appendLog }) {
|
||||
trackDone();
|
||||
const contentType = providerResponse.headers.get("content-type") || "";
|
||||
let responseBody;
|
||||
|
||||
if (contentType.includes("text/event-stream")) {
|
||||
const sseText = await providerResponse.text();
|
||||
const parsed = parseSSEToOpenAIResponse(sseText, model);
|
||||
if (!parsed) {
|
||||
appendLog({ status: `FAILED ${HTTP_STATUS.BAD_GATEWAY}` });
|
||||
return createErrorResult(HTTP_STATUS.BAD_GATEWAY, "Invalid SSE response for non-streaming request");
|
||||
}
|
||||
responseBody = parsed;
|
||||
} else {
|
||||
try {
|
||||
responseBody = await providerResponse.json();
|
||||
} catch (err) {
|
||||
appendLog({ status: `FAILED ${HTTP_STATUS.BAD_GATEWAY}` });
|
||||
console.error(`[ChatCore] Failed to parse JSON from ${provider}:`, err.message);
|
||||
return createErrorResult(HTTP_STATUS.BAD_GATEWAY, `Invalid JSON response from ${provider}`);
|
||||
}
|
||||
}
|
||||
|
||||
reqLogger.logProviderResponse(providerResponse.status, providerResponse.statusText, providerResponse.headers, responseBody);
|
||||
if (onRequestSuccess) await onRequestSuccess();
|
||||
|
||||
const usage = extractUsageFromResponse(responseBody);
|
||||
appendLog({ tokens: usage, status: "200 OK" });
|
||||
saveUsageStats({ provider, model, tokens: usage, connectionId, apiKey, endpoint: clientRawRequest?.endpoint });
|
||||
|
||||
const translatedResponse = needsTranslation(targetFormat, sourceFormat)
|
||||
? translateNonStreamingResponse(responseBody, targetFormat, sourceFormat)
|
||||
: responseBody;
|
||||
|
||||
// Ensure OpenAI-required fields
|
||||
if (!translatedResponse.object) translatedResponse.object = "chat.completion";
|
||||
if (!translatedResponse.created) translatedResponse.created = Math.floor(Date.now() / 1000);
|
||||
|
||||
// Strip Azure-specific fields
|
||||
delete translatedResponse.prompt_filter_results;
|
||||
if (translatedResponse?.choices) {
|
||||
for (const choice of translatedResponse.choices) delete choice.content_filter_results;
|
||||
}
|
||||
|
||||
if (translatedResponse?.usage) {
|
||||
translatedResponse.usage = filterUsageForFormat(addBufferToUsage(translatedResponse.usage), sourceFormat);
|
||||
}
|
||||
|
||||
reqLogger.logConvertedResponse(translatedResponse);
|
||||
|
||||
const totalLatency = Date.now() - requestStartTime;
|
||||
saveRequestDetail(buildRequestDetail({
|
||||
provider, model, connectionId,
|
||||
latency: { ttft: totalLatency, total: totalLatency },
|
||||
tokens: usage || { prompt_tokens: 0, completion_tokens: 0 },
|
||||
request: extractRequestConfig(body, stream),
|
||||
providerRequest: finalBody || translatedBody || null,
|
||||
providerResponse: responseBody || null,
|
||||
response: {
|
||||
content: translatedResponse?.choices?.[0]?.message?.content || translatedResponse?.content || null,
|
||||
thinking: translatedResponse?.choices?.[0]?.message?.reasoning_content || translatedResponse?.reasoning_content || null,
|
||||
finish_reason: translatedResponse?.choices?.[0]?.finish_reason || "unknown"
|
||||
},
|
||||
status: "success"
|
||||
}, { endpoint: clientRawRequest?.endpoint || null })).catch(err => {
|
||||
console.error("[RequestDetail] Failed to save:", err.message);
|
||||
});
|
||||
|
||||
return {
|
||||
success: true,
|
||||
response: new Response(JSON.stringify(translatedResponse), {
|
||||
headers: { "Content-Type": "application/json", "Access-Control-Allow-Origin": "*" }
|
||||
})
|
||||
};
|
||||
}
|
||||
102
open-sse/handlers/chatCore/requestDetail.js
Normal file
102
open-sse/handlers/chatCore/requestDetail.js
Normal file
|
|
@ -0,0 +1,102 @@
|
|||
import { saveRequestUsage, appendRequestLog, saveRequestDetail } from "@/lib/usageDb.js";
|
||||
import { COLORS } from "../../utils/stream.js";
|
||||
|
||||
const OPTIONAL_PARAMS = [
|
||||
"temperature", "top_p", "top_k",
|
||||
"max_tokens", "max_completion_tokens",
|
||||
"thinking", "reasoning", "enable_thinking",
|
||||
"presence_penalty", "frequency_penalty",
|
||||
"seed", "stop", "tools", "tool_choice",
|
||||
"response_format", "prediction", "store", "metadata",
|
||||
"n", "logprobs", "top_logprobs", "logit_bias",
|
||||
"user", "parallel_tool_calls"
|
||||
];
|
||||
|
||||
export function extractRequestConfig(body, stream) {
|
||||
const config = { messages: body.messages || [], model: body.model, stream };
|
||||
for (const param of OPTIONAL_PARAMS) {
|
||||
if (body[param] !== undefined) config[param] = body[param];
|
||||
}
|
||||
return config;
|
||||
}
|
||||
|
||||
export function extractUsageFromResponse(responseBody) {
|
||||
if (!responseBody || typeof responseBody !== "object") return null;
|
||||
|
||||
// Claude format
|
||||
if (responseBody.usage?.input_tokens !== undefined) {
|
||||
return {
|
||||
prompt_tokens: responseBody.usage.input_tokens || 0,
|
||||
completion_tokens: responseBody.usage.output_tokens || 0,
|
||||
cache_read_input_tokens: responseBody.usage.cache_read_input_tokens,
|
||||
cache_creation_input_tokens: responseBody.usage.cache_creation_input_tokens
|
||||
};
|
||||
}
|
||||
|
||||
// OpenAI format
|
||||
if (responseBody.usage?.prompt_tokens !== undefined) {
|
||||
return {
|
||||
prompt_tokens: responseBody.usage.prompt_tokens || 0,
|
||||
completion_tokens: responseBody.usage.completion_tokens || 0,
|
||||
cached_tokens: responseBody.usage.prompt_tokens_details?.cached_tokens,
|
||||
reasoning_tokens: responseBody.usage.completion_tokens_details?.reasoning_tokens
|
||||
};
|
||||
}
|
||||
|
||||
// Gemini format
|
||||
if (responseBody.usageMetadata) {
|
||||
return {
|
||||
prompt_tokens: responseBody.usageMetadata.promptTokenCount || 0,
|
||||
completion_tokens: responseBody.usageMetadata.candidatesTokenCount || 0,
|
||||
reasoning_tokens: responseBody.usageMetadata.thoughtsTokenCount
|
||||
};
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
export function buildRequestDetail(base, overrides = {}) {
|
||||
return {
|
||||
provider: base.provider || "unknown",
|
||||
model: base.model || "unknown",
|
||||
connectionId: base.connectionId || undefined,
|
||||
timestamp: new Date().toISOString(),
|
||||
latency: base.latency || { ttft: 0, total: 0 },
|
||||
tokens: base.tokens || { prompt_tokens: 0, completion_tokens: 0 },
|
||||
request: base.request,
|
||||
providerRequest: base.providerRequest || null,
|
||||
providerResponse: base.providerResponse || null,
|
||||
response: base.response || {},
|
||||
status: base.status || "success",
|
||||
...overrides
|
||||
};
|
||||
}
|
||||
|
||||
export function saveUsageStats({ provider, model, tokens, connectionId, apiKey, endpoint, label = "USAGE" }) {
|
||||
if (!tokens || typeof tokens !== "object") return;
|
||||
|
||||
const inTokens = tokens.input_tokens ?? tokens.prompt_tokens ?? 0;
|
||||
const outTokens = tokens.output_tokens ?? tokens.completion_tokens ?? 0;
|
||||
|
||||
if (inTokens === 0 && outTokens === 0) return;
|
||||
|
||||
const time = new Date().toLocaleTimeString("en-US", { hour12: false, hour: "2-digit", minute: "2-digit" });
|
||||
const accountSuffix = connectionId ? ` | account=${connectionId.slice(0, 8)}...` : "";
|
||||
console.log(`${COLORS.green}[${time}] 📊 [${label}] ${provider.toUpperCase()} | in=${inTokens} | out=${outTokens}${accountSuffix}${COLORS.reset}`);
|
||||
|
||||
// Normalize to OpenAI token shape for storage
|
||||
const normalized = {
|
||||
prompt_tokens: tokens.prompt_tokens ?? tokens.input_tokens ?? 0,
|
||||
completion_tokens: tokens.completion_tokens ?? tokens.output_tokens ?? 0
|
||||
};
|
||||
|
||||
saveRequestUsage({
|
||||
provider: provider || "unknown",
|
||||
model: model || "unknown",
|
||||
tokens: normalized,
|
||||
timestamp: new Date().toISOString(),
|
||||
connectionId: connectionId || undefined,
|
||||
apiKey: apiKey || undefined,
|
||||
endpoint: endpoint || null
|
||||
}).catch(() => {});
|
||||
}
|
||||
161
open-sse/handlers/chatCore/sseToJsonHandler.js
Normal file
161
open-sse/handlers/chatCore/sseToJsonHandler.js
Normal file
|
|
@ -0,0 +1,161 @@
|
|||
import { convertResponsesStreamToJson } from "../../transformer/streamToJsonConverter.js";
|
||||
import { createErrorResult } from "../../utils/error.js";
|
||||
import { HTTP_STATUS } from "../../config/constants.js";
|
||||
import { FORMATS } from "../../translator/formats.js";
|
||||
import { buildRequestDetail, extractRequestConfig, saveUsageStats } from "./requestDetail.js";
|
||||
import { saveRequestDetail, appendRequestLog } from "@/lib/usageDb.js";
|
||||
|
||||
/**
|
||||
* Parse OpenAI-style SSE text into a single chat completion JSON.
|
||||
* Used when provider forces streaming but client wants non-streaming.
|
||||
*/
|
||||
export function parseSSEToOpenAIResponse(rawSSE, fallbackModel) {
|
||||
const chunks = [];
|
||||
|
||||
for (const line of String(rawSSE || "").split("\n")) {
|
||||
const trimmed = line.trim();
|
||||
if (!trimmed.startsWith("data:")) continue;
|
||||
const payload = trimmed.slice(5).trim();
|
||||
if (!payload || payload === "[DONE]") continue;
|
||||
try { chunks.push(JSON.parse(payload)); } catch { /* ignore malformed lines */ }
|
||||
}
|
||||
|
||||
if (chunks.length === 0) return null;
|
||||
|
||||
const first = chunks[0];
|
||||
const contentParts = [];
|
||||
const reasoningParts = [];
|
||||
let finishReason = "stop";
|
||||
let usage = null;
|
||||
|
||||
for (const chunk of chunks) {
|
||||
const choice = chunk?.choices?.[0];
|
||||
const delta = choice?.delta || {};
|
||||
if (typeof delta.content === "string" && delta.content.length > 0) contentParts.push(delta.content);
|
||||
if (typeof delta.reasoning_content === "string" && delta.reasoning_content.length > 0) reasoningParts.push(delta.reasoning_content);
|
||||
if (choice?.finish_reason) finishReason = choice.finish_reason;
|
||||
if (chunk?.usage && typeof chunk.usage === "object") usage = chunk.usage;
|
||||
}
|
||||
|
||||
const message = { role: "assistant", content: contentParts.join("") };
|
||||
if (reasoningParts.length > 0) message.reasoning_content = reasoningParts.join("");
|
||||
|
||||
const result = {
|
||||
id: first.id || `chatcmpl-${Date.now()}`,
|
||||
object: "chat.completion",
|
||||
created: first.created || Math.floor(Date.now() / 1000),
|
||||
model: first.model || fallbackModel || "unknown",
|
||||
choices: [{ index: 0, message, finish_reason: finishReason }]
|
||||
};
|
||||
if (usage) result.usage = usage;
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle case: provider forced streaming but client wants JSON.
|
||||
* Supports both Codex/Responses API SSE and standard Chat Completions SSE.
|
||||
*/
|
||||
export async function handleForcedSSEToJson({ providerResponse, sourceFormat, provider, model, body, stream, translatedBody, finalBody, requestStartTime, connectionId, apiKey, clientRawRequest, onRequestSuccess, trackDone, appendLog }) {
|
||||
const contentType = providerResponse.headers.get("content-type") || "";
|
||||
const isSSE = contentType.includes("text/event-stream") || (contentType === "" && provider === "codex");
|
||||
if (!isSSE) return null; // not handled here
|
||||
|
||||
trackDone();
|
||||
|
||||
const ctx = {
|
||||
provider, model, connectionId,
|
||||
request: extractRequestConfig(body, stream),
|
||||
providerRequest: finalBody || translatedBody || null
|
||||
};
|
||||
|
||||
// Codex/Responses API SSE path
|
||||
const isCodexResponsesApi = provider === "codex" || sourceFormat === FORMATS.OPENAI_RESPONSES;
|
||||
if (isCodexResponsesApi) {
|
||||
try {
|
||||
const jsonResponse = await convertResponsesStreamToJson(providerResponse.body);
|
||||
if (onRequestSuccess) await onRequestSuccess();
|
||||
|
||||
const usage = jsonResponse.usage || {};
|
||||
appendLog({ tokens: usage, status: "200 OK" });
|
||||
saveUsageStats({ provider, model, tokens: usage, connectionId, apiKey, endpoint: clientRawRequest?.endpoint });
|
||||
|
||||
const msgItem = jsonResponse.output?.find(item => item.type === "message");
|
||||
const textContent = msgItem?.content?.find(c => c.type === "output_text")?.text || msgItem?.content?.[0]?.text || null;
|
||||
const totalLatency = Date.now() - requestStartTime;
|
||||
|
||||
saveRequestDetail(buildRequestDetail({
|
||||
...ctx,
|
||||
latency: { ttft: totalLatency, total: totalLatency },
|
||||
tokens: { prompt_tokens: usage.input_tokens || 0, completion_tokens: usage.output_tokens || 0 },
|
||||
response: { content: textContent, thinking: null, finish_reason: jsonResponse.status || "unknown" },
|
||||
status: "success"
|
||||
}, { endpoint: clientRawRequest?.endpoint || null })).catch(() => {});
|
||||
|
||||
// Client is Responses API → return as-is
|
||||
if (sourceFormat === FORMATS.OPENAI_RESPONSES) {
|
||||
return { success: true, response: new Response(JSON.stringify(jsonResponse), { headers: { "Content-Type": "application/json", "Access-Control-Allow-Origin": "*" } }) };
|
||||
}
|
||||
|
||||
// Build client-format response
|
||||
const inTokens = usage.input_tokens || 0;
|
||||
const outTokens = usage.output_tokens || 0;
|
||||
let finalResp;
|
||||
|
||||
if (sourceFormat === FORMATS.ANTIGRAVITY || sourceFormat === FORMATS.GEMINI || sourceFormat === FORMATS.GEMINI_CLI) {
|
||||
finalResp = {
|
||||
response: {
|
||||
candidates: [{ content: { role: "model", parts: [{ text: textContent || "" }] }, finishReason: "STOP", index: 0 }],
|
||||
usageMetadata: { promptTokenCount: inTokens, candidatesTokenCount: outTokens, totalTokenCount: inTokens + outTokens },
|
||||
modelVersion: model,
|
||||
responseId: jsonResponse.id || `resp_${Date.now()}`
|
||||
}
|
||||
};
|
||||
} else {
|
||||
finalResp = {
|
||||
id: jsonResponse.id || `chatcmpl-${Date.now()}`,
|
||||
object: "chat.completion",
|
||||
created: jsonResponse.created_at || Math.floor(Date.now() / 1000),
|
||||
model: jsonResponse.model || model,
|
||||
choices: [{ index: 0, message: { role: "assistant", content: textContent || "" }, finish_reason: jsonResponse.status === "completed" ? "stop" : (jsonResponse.status || "stop") }],
|
||||
usage: { prompt_tokens: inTokens, completion_tokens: outTokens, total_tokens: inTokens + outTokens }
|
||||
};
|
||||
}
|
||||
|
||||
return { success: true, response: new Response(JSON.stringify(finalResp), { headers: { "Content-Type": "application/json", "Access-Control-Allow-Origin": "*" } }) };
|
||||
} catch (err) {
|
||||
console.error("[ChatCore] Responses API SSE→JSON failed:", err);
|
||||
return createErrorResult(HTTP_STATUS.BAD_GATEWAY, "Failed to convert streaming response to JSON");
|
||||
}
|
||||
}
|
||||
|
||||
// Standard Chat Completions SSE path
|
||||
try {
|
||||
const sseText = await providerResponse.text();
|
||||
const parsed = parseSSEToOpenAIResponse(sseText, model);
|
||||
if (!parsed) return createErrorResult(HTTP_STATUS.BAD_GATEWAY, "Invalid SSE response for non-streaming request");
|
||||
|
||||
if (onRequestSuccess) await onRequestSuccess();
|
||||
|
||||
const usage = parsed.usage || {};
|
||||
appendLog({ tokens: usage, status: "200 OK" });
|
||||
saveUsageStats({ provider, model, tokens: usage, connectionId, apiKey, endpoint: clientRawRequest?.endpoint });
|
||||
|
||||
const totalLatency = Date.now() - requestStartTime;
|
||||
saveRequestDetail(buildRequestDetail({
|
||||
...ctx,
|
||||
latency: { ttft: totalLatency, total: totalLatency },
|
||||
tokens: usage,
|
||||
response: {
|
||||
content: parsed.choices?.[0]?.message?.content || null,
|
||||
thinking: parsed.choices?.[0]?.message?.reasoning_content || null,
|
||||
finish_reason: parsed.choices?.[0]?.finish_reason || "unknown"
|
||||
},
|
||||
status: "success"
|
||||
}, { endpoint: clientRawRequest?.endpoint || null })).catch(() => {});
|
||||
|
||||
return { success: true, response: new Response(JSON.stringify(parsed), { headers: { "Content-Type": "application/json", "Access-Control-Allow-Origin": "*" } }) };
|
||||
} catch (err) {
|
||||
console.error("[ChatCore] Chat Completions SSE→JSON failed:", err);
|
||||
return createErrorResult(HTTP_STATUS.BAD_GATEWAY, "Failed to convert streaming response to JSON");
|
||||
}
|
||||
}
|
||||
97
open-sse/handlers/chatCore/streamingHandler.js
Normal file
97
open-sse/handlers/chatCore/streamingHandler.js
Normal file
|
|
@ -0,0 +1,97 @@
|
|||
import { FORMATS } from "../../translator/formats.js";
|
||||
import { needsTranslation } from "../../translator/index.js";
|
||||
import { createSSETransformStreamWithLogger, createPassthroughStreamWithLogger } from "../../utils/stream.js";
|
||||
import { pipeWithDisconnect } from "../../utils/streamHandler.js";
|
||||
import { buildRequestDetail, extractRequestConfig, saveUsageStats } from "./requestDetail.js";
|
||||
import { saveRequestDetail } from "@/lib/usageDb.js";
|
||||
|
||||
const SSE_HEADERS = {
|
||||
"Content-Type": "text/event-stream",
|
||||
"Cache-Control": "no-cache",
|
||||
"Connection": "keep-alive",
|
||||
"Access-Control-Allow-Origin": "*"
|
||||
};
|
||||
|
||||
/**
|
||||
* Determine which SSE transform stream to use based on provider/format.
|
||||
*/
|
||||
function buildTransformStream({ provider, sourceFormat, targetFormat, userAgent, reqLogger, toolNameMap, model, connectionId, body, onStreamComplete, apiKey }) {
|
||||
const isDroidCLI = userAgent?.toLowerCase().includes("droid") || userAgent?.toLowerCase().includes("codex-cli");
|
||||
const needsCodexTranslation = provider === "codex" && targetFormat === FORMATS.OPENAI_RESPONSES && !isDroidCLI;
|
||||
|
||||
if (needsCodexTranslation) {
|
||||
// Codex returns Responses API SSE → translate to client format
|
||||
let codexTarget;
|
||||
if (sourceFormat === FORMATS.OPENAI_RESPONSES) codexTarget = FORMATS.OPENAI_RESPONSES;
|
||||
else if (sourceFormat === FORMATS.CLAUDE) codexTarget = FORMATS.CLAUDE;
|
||||
else if (sourceFormat === FORMATS.ANTIGRAVITY || sourceFormat === FORMATS.GEMINI || sourceFormat === FORMATS.GEMINI_CLI) codexTarget = FORMATS.ANTIGRAVITY;
|
||||
else codexTarget = FORMATS.OPENAI;
|
||||
return createSSETransformStreamWithLogger(FORMATS.OPENAI_RESPONSES, codexTarget, provider, reqLogger, toolNameMap, model, connectionId, body, onStreamComplete, apiKey);
|
||||
}
|
||||
|
||||
if (needsTranslation(targetFormat, sourceFormat)) {
|
||||
return createSSETransformStreamWithLogger(targetFormat, sourceFormat, provider, reqLogger, toolNameMap, model, connectionId, body, onStreamComplete, apiKey);
|
||||
}
|
||||
|
||||
return createPassthroughStreamWithLogger(provider, reqLogger, model, connectionId, body, onStreamComplete, apiKey);
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle streaming response — pipe provider SSE through transform stream to client.
|
||||
*/
|
||||
export function handleStreamingResponse({ providerResponse, provider, model, sourceFormat, targetFormat, userAgent, body, stream, translatedBody, finalBody, requestStartTime, connectionId, apiKey, clientRawRequest, onRequestSuccess, reqLogger, toolNameMap, streamController, onStreamComplete }) {
|
||||
if (onRequestSuccess) onRequestSuccess();
|
||||
|
||||
const transformStream = buildTransformStream({ provider, sourceFormat, targetFormat, userAgent, reqLogger, toolNameMap, model, connectionId, body, onStreamComplete, apiKey });
|
||||
const transformedBody = pipeWithDisconnect(providerResponse, transformStream, streamController);
|
||||
|
||||
const streamDetailId = `${Date.now()}-${Math.random().toString(36).slice(2, 11)}`;
|
||||
saveRequestDetail(buildRequestDetail({
|
||||
provider, model, connectionId,
|
||||
latency: { ttft: 0, total: Date.now() - requestStartTime },
|
||||
tokens: { prompt_tokens: 0, completion_tokens: 0 },
|
||||
request: extractRequestConfig(body, stream),
|
||||
providerRequest: finalBody || translatedBody || null,
|
||||
providerResponse: "[Streaming - raw response not captured]",
|
||||
response: { content: "[Streaming in progress...]", thinking: null, type: "streaming" },
|
||||
status: "success"
|
||||
}, { id: streamDetailId })).catch(err => {
|
||||
console.error("[RequestDetail] Failed to save streaming request:", err.message);
|
||||
});
|
||||
|
||||
return {
|
||||
success: true,
|
||||
response: new Response(transformedBody, { headers: SSE_HEADERS })
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Build onStreamComplete callback for streaming usage tracking.
|
||||
*/
|
||||
export function buildOnStreamComplete({ provider, model, connectionId, apiKey, requestStartTime, body, stream, finalBody, translatedBody, clientRawRequest }) {
|
||||
const streamDetailId = `${Date.now()}-${Math.random().toString(36).slice(2, 11)}`;
|
||||
|
||||
const onStreamComplete = (contentObj, usage, ttftAt) => {
|
||||
const latency = {
|
||||
ttft: ttftAt ? ttftAt - requestStartTime : Date.now() - requestStartTime,
|
||||
total: Date.now() - requestStartTime
|
||||
};
|
||||
|
||||
saveRequestDetail(buildRequestDetail({
|
||||
provider, model, connectionId,
|
||||
latency,
|
||||
tokens: usage || { prompt_tokens: 0, completion_tokens: 0 },
|
||||
request: extractRequestConfig(body, stream),
|
||||
providerRequest: finalBody || translatedBody || null,
|
||||
providerResponse: contentObj.content || "[Empty streaming response]",
|
||||
response: { content: contentObj.content || "[Empty streaming response]", thinking: contentObj.thinking || null, type: "streaming" },
|
||||
status: "success"
|
||||
}, { id: streamDetailId })).catch(err => {
|
||||
console.error("[RequestDetail] Failed to update streaming content:", err.message);
|
||||
});
|
||||
|
||||
saveUsageStats({ provider, model, tokens: usage, connectionId, apiKey, endpoint: clientRawRequest?.endpoint, label: "STREAM USAGE" });
|
||||
};
|
||||
|
||||
return { onStreamComplete, streamDetailId };
|
||||
}
|
||||
|
|
@ -32,7 +32,7 @@ export async function handleResponsesCore({ body, modelInfo, credentials, log, o
|
|||
convertedBody.stream = false;
|
||||
}
|
||||
|
||||
// Call chat core handler
|
||||
// Call chat core handler — force sourceFormat so streaming path knows this is a Responses API client
|
||||
const result = await handleChatCore({
|
||||
body: convertedBody,
|
||||
modelInfo,
|
||||
|
|
@ -41,7 +41,8 @@ export async function handleResponsesCore({ body, modelInfo, credentials, log, o
|
|||
onCredentialsRefreshed,
|
||||
onRequestSuccess,
|
||||
onDisconnect,
|
||||
connectionId
|
||||
connectionId,
|
||||
sourceFormatOverride: "openai-responses"
|
||||
});
|
||||
|
||||
if (!result.success || !result.response) {
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
{
|
||||
"name": "9router-app",
|
||||
"version": "0.2.99",
|
||||
"version": "0.3.1",
|
||||
"description": "9Router web dashboard",
|
||||
"private": true,
|
||||
"scripts": {
|
||||
|
|
|
|||
|
|
@ -24,6 +24,8 @@ export default function ProviderDetailPage() {
|
|||
const [selectedConnection, setSelectedConnection] = useState(null);
|
||||
const [modelAliases, setModelAliases] = useState({});
|
||||
const [headerImgError, setHeaderImgError] = useState(false);
|
||||
const [modelTestResults, setModelTestResults] = useState({});
|
||||
const [testingModels, setTestingModels] = useState(false);
|
||||
const { copied, copy } = useCopyToClipboard();
|
||||
|
||||
const providerInfo = providerNode
|
||||
|
|
@ -256,6 +258,27 @@ export default function ProviderDetailPage() {
|
|||
}
|
||||
};
|
||||
|
||||
const handleTestModels = async () => {
|
||||
if (testingModels) return;
|
||||
const conn = connections.find((c) => c.isActive !== false) || connections[0];
|
||||
if (!conn) return;
|
||||
setTestingModels(true);
|
||||
setModelTestResults({});
|
||||
try {
|
||||
const res = await fetch(`/api/providers/${conn.id}/test-models`, { method: "POST" });
|
||||
const data = await res.json();
|
||||
if (res.ok) {
|
||||
const map = {};
|
||||
for (const r of data.results || []) map[r.modelId] = r.ok ? "ok" : "error";
|
||||
setModelTestResults(map);
|
||||
}
|
||||
} catch {
|
||||
// silent fail
|
||||
} finally {
|
||||
setTestingModels(false);
|
||||
}
|
||||
};
|
||||
|
||||
const renderModelsSection = () => {
|
||||
if (isCompatible) {
|
||||
return (
|
||||
|
|
@ -305,6 +328,7 @@ export default function ProviderDetailPage() {
|
|||
onCopy={copy}
|
||||
onSetAlias={(alias) => handleSetAlias(model.id, alias, providerStorageAlias)}
|
||||
onDeleteAlias={() => handleDeleteAlias(existingAlias)}
|
||||
testStatus={modelTestResults[model.id]}
|
||||
/>
|
||||
);
|
||||
})}
|
||||
|
|
@ -494,11 +518,23 @@ export default function ProviderDetailPage() {
|
|||
|
||||
{/* Models */}
|
||||
<Card>
|
||||
<h2 className="text-lg font-semibold mb-4">
|
||||
{providerInfo.passthroughModels ? "Model Aliases" : "Available Models"}
|
||||
</h2>
|
||||
<div className="flex items-center justify-between mb-4">
|
||||
<h2 className="text-lg font-semibold">
|
||||
{providerInfo.passthroughModels ? "Model Aliases" : "Available Models"}
|
||||
</h2>
|
||||
{connections.length > 0 && (
|
||||
<Button
|
||||
size="sm"
|
||||
variant="secondary"
|
||||
icon={testingModels ? "progress_activity" : "science"}
|
||||
onClick={handleTestModels}
|
||||
disabled={testingModels}
|
||||
>
|
||||
{testingModels ? "Testing…" : "Test Models"}
|
||||
</Button>
|
||||
)}
|
||||
</div>
|
||||
{renderModelsSection()}
|
||||
|
||||
</Card>
|
||||
|
||||
{/* Modals */}
|
||||
|
|
@ -552,10 +588,27 @@ export default function ProviderDetailPage() {
|
|||
);
|
||||
}
|
||||
|
||||
function ModelRow({ model, fullModel, alias, copied, onCopy }) {
|
||||
function ModelRow({ model, fullModel, alias, copied, onCopy, testStatus }) {
|
||||
const borderColor = testStatus === "ok"
|
||||
? "border-green-500/40"
|
||||
: testStatus === "error"
|
||||
? "border-red-500/40"
|
||||
: "border-border";
|
||||
|
||||
const iconColor = testStatus === "ok"
|
||||
? "#22c55e"
|
||||
: testStatus === "error"
|
||||
? "#ef4444"
|
||||
: undefined;
|
||||
|
||||
return (
|
||||
<div className="flex items-center gap-2 px-3 py-2 rounded-lg border border-border hover:bg-sidebar/50">
|
||||
<span className="material-symbols-outlined text-base text-text-muted">smart_toy</span>
|
||||
<div className={`flex items-center gap-2 px-3 py-2 rounded-lg border ${borderColor} hover:bg-sidebar/50`}>
|
||||
<span
|
||||
className="material-symbols-outlined text-base"
|
||||
style={iconColor ? { color: iconColor } : undefined}
|
||||
>
|
||||
{testStatus === "ok" ? "check_circle" : testStatus === "error" ? "cancel" : "smart_toy"}
|
||||
</span>
|
||||
<code className="text-xs text-text-muted font-mono bg-sidebar px-1.5 py-0.5 rounded">{fullModel}</code>
|
||||
<button
|
||||
onClick={() => onCopy(fullModel, `model-${model.id}`)}
|
||||
|
|
@ -578,6 +631,7 @@ ModelRow.propTypes = {
|
|||
alias: PropTypes.string,
|
||||
copied: PropTypes.string,
|
||||
onCopy: PropTypes.func.isRequired,
|
||||
testStatus: PropTypes.oneOf(["ok", "error"]),
|
||||
};
|
||||
|
||||
function PassthroughModelsSection({ providerAlias, modelAliases, copied, onCopy, onSetAlias, onDeleteAlias }) {
|
||||
|
|
@ -1498,3 +1552,4 @@ EditCompatibleNodeModal.propTypes = {
|
|||
onClose: PropTypes.func.isRequired,
|
||||
isAnthropic: PropTypes.bool,
|
||||
};
|
||||
|
||||
|
|
|
|||
111
src/app/api/providers/[id]/test-models/route.js
Normal file
111
src/app/api/providers/[id]/test-models/route.js
Normal file
|
|
@ -0,0 +1,111 @@
|
|||
import { NextResponse } from "next/server";
|
||||
import { getProviderConnectionById, getApiKeys } from "@/lib/localDb";
|
||||
import { getProviderModels, PROVIDER_ID_TO_ALIAS } from "open-sse/config/providerModels.js";
|
||||
import { isOpenAICompatibleProvider, isAnthropicCompatibleProvider } from "@/shared/constants/providers";
|
||||
|
||||
/**
|
||||
* Get an active API key to pass through auth when requireApiKey is enabled.
|
||||
*/
|
||||
async function getInternalApiKey() {
|
||||
const keys = await getApiKeys();
|
||||
return keys.find((k) => k.isActive !== false)?.key || null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Ping a single model via internal completions endpoint (OpenAI format).
|
||||
* open-sse handles all provider translation automatically.
|
||||
*/
|
||||
async function pingModel(modelId, baseUrl, apiKey) {
|
||||
const start = Date.now();
|
||||
try {
|
||||
const headers = { "Content-Type": "application/json" };
|
||||
if (apiKey) headers["Authorization"] = `Bearer ${apiKey}`;
|
||||
const res = await fetch(`${baseUrl}/api/v1/chat/completions`, {
|
||||
method: "POST",
|
||||
headers,
|
||||
body: JSON.stringify({
|
||||
model: modelId,
|
||||
max_tokens: 1,
|
||||
stream: false,
|
||||
messages: [{ role: "user", content: "hi" }],
|
||||
}),
|
||||
signal: AbortSignal.timeout(15000),
|
||||
});
|
||||
const latencyMs = Date.now() - start;
|
||||
// 200 = working; 400 = bad request but auth passed (model reachable)
|
||||
const ok = res.status === 200 || res.status === 400;
|
||||
let error = null;
|
||||
if (!ok) {
|
||||
const text = await res.text().catch(() => "");
|
||||
error = `HTTP ${res.status}${text ? `: ${text.slice(0, 120)}` : ""}`;
|
||||
}
|
||||
return { ok, latencyMs, error };
|
||||
} catch (err) {
|
||||
return { ok: false, latencyMs: Date.now() - start, error: err.message };
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* POST /api/providers/[id]/test-models
|
||||
* id = connectionId — used only to resolve provider + model list.
|
||||
* Actual requests go through /api/v1/chat/completions (open-sse handles everything).
|
||||
*/
|
||||
export async function POST(request, { params }) {
|
||||
try {
|
||||
const { id } = await params;
|
||||
const connection = await getProviderConnectionById(id);
|
||||
if (!connection) {
|
||||
return NextResponse.json({ error: "Connection not found" }, { status: 404 });
|
||||
}
|
||||
|
||||
const providerId = connection.provider;
|
||||
const isCompatible = isOpenAICompatibleProvider(providerId) || isAnthropicCompatibleProvider(providerId);
|
||||
const alias = PROVIDER_ID_TO_ALIAS[providerId] || providerId;
|
||||
|
||||
let models = getProviderModels(alias);
|
||||
|
||||
// Compatible providers: fetch live model list
|
||||
if (isCompatible && models.length === 0) {
|
||||
try {
|
||||
const modelsRes = await fetch(`${getBaseUrl(request)}/api/providers/${id}/models`);
|
||||
if (modelsRes.ok) {
|
||||
const data = await modelsRes.json();
|
||||
models = (data.models || []).map((m) => ({ id: m.id || m.name, name: m.name || m.id }));
|
||||
}
|
||||
} catch { /* fallback to empty */ }
|
||||
}
|
||||
|
||||
if (models.length === 0) {
|
||||
return NextResponse.json({ error: "No models configured for this provider" }, { status: 400 });
|
||||
}
|
||||
|
||||
const baseUrl = getBaseUrl(request);
|
||||
const apiKey = await getInternalApiKey();
|
||||
|
||||
// Warm up with first model to trigger token refresh (if needed) before parallel calls.
|
||||
// This prevents race condition where multiple requests concurrently refresh the same token.
|
||||
const [first, ...rest] = models;
|
||||
const firstResult = await pingModel(`${alias}/${first.id}`, baseUrl, apiKey);
|
||||
const results = [{ modelId: first.id, name: first.name || first.id, ...firstResult }];
|
||||
|
||||
if (rest.length > 0) {
|
||||
const restResults = await Promise.all(
|
||||
rest.map(async (model) => {
|
||||
const result = await pingModel(`${alias}/${model.id}`, baseUrl, apiKey);
|
||||
return { modelId: model.id, name: model.name || model.id, ...result };
|
||||
})
|
||||
);
|
||||
results.push(...restResults);
|
||||
}
|
||||
|
||||
return NextResponse.json({ provider: providerId, connectionId: id, results });
|
||||
} catch (error) {
|
||||
console.log("Error testing models:", error);
|
||||
return NextResponse.json({ error: "Test failed" }, { status: 500 });
|
||||
}
|
||||
}
|
||||
|
||||
function getBaseUrl(request) {
|
||||
const url = new URL(request.url);
|
||||
return `${url.protocol}//${url.host}`;
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue