9router/open-sse/utils/stream.js

325 lines
12 KiB
JavaScript

import { translateResponse, initState } from "../translator/index.js";
import { FORMATS } from "../translator/formats.js";
import { trackPendingRequest, appendRequestLog } from "@/lib/usageDb.js";
import { extractUsage, hasValidUsage, estimateUsage, logUsage, addBufferToUsage, filterUsageForFormat, COLORS } from "./usageTracking.js";
import { parseSSELine, hasValuableContent, fixInvalidId, formatSSE } from "./streamHelpers.js";
export { COLORS, formatSSE };
// Text codecs reused by every stream this module creates.
const sharedEncoder = new TextEncoder();
// NOTE(review): `decode(chunk, { stream: true })` keeps an incomplete trailing
// multi-byte sequence inside this decoder instance until the next call, so a
// single decoder used by several concurrently running streams can mix bytes
// between them.
const sharedDecoder = new TextDecoder();

/**
 * Modes accepted by createSSEStream:
 * - TRANSLATE:   full translation between provider and client formats
 * - PASSTHROUGH: no translation; normalize output lines and extract usage
 */
const STREAM_MODE = {
  TRANSLATE: "translate",
  PASSTHROUGH: "passthrough"
};
/**
 * Sum the lengths of all text and reasoning/thinking deltas carried by one
 * parsed provider chunk. Reads the Claude (`delta.text`, `delta.thinking`),
 * OpenAI (`choices[0].delta.content`, `choices[0].delta.reasoning_content`)
 * and Gemini (`candidates[0].content.parts[].text`) shapes. Non-string values
 * are ignored so a malformed chunk cannot turn the running total into NaN.
 * @param {object} parsed - Parsed SSE JSON payload
 * @returns {number} Total `.length` (UTF-16 code units) of the text found
 */
function measureContentLength(parsed) {
  const texts = [
    // Claude format
    parsed.delta?.text,
    parsed.delta?.thinking,
    // OpenAI format
    parsed.choices?.[0]?.delta?.content,
    parsed.choices?.[0]?.delta?.reasoning_content
  ];
  // Gemini format - may have multiple parts
  const parts = parsed.candidates?.[0]?.content?.parts;
  if (Array.isArray(parts)) {
    for (const part of parts) texts.push(part?.text);
  }
  let length = 0;
  for (const text of texts) {
    if (typeof text === "string") length += text.length;
  }
  return length;
}

/**
 * Create unified SSE transform stream.
 *
 * Reads the provider's SSE bytes line by line and either forwards them with light
 * normalization (passthrough) or translates them targetFormat -> openai ->
 * sourceFormat (translate). Generated-text length is tracked in both modes so
 * usage can be estimated when the provider reports none; usage is logged when
 * the stream ends.
 *
 * @param {object} options
 * @param {string} [options.mode="translate"] - Stream mode: translate, passthrough
 * @param {string} options.targetFormat - Provider format (for translate mode)
 * @param {string} options.sourceFormat - Client format (for translate mode)
 * @param {string} [options.provider] - Provider name
 * @param {object} [options.reqLogger] - Request logger instance
 * @param {object} [options.toolNameMap] - Stored on the translator state (translate mode)
 * @param {string} [options.model] - Model name
 * @param {string} [options.connectionId] - Connection ID for usage tracking
 * @param {object} [options.body] - Request body (for input token estimation)
 * @returns {TransformStream} Byte stream in, byte stream out
 */
export function createSSEStream(options = {}) {
  const {
    mode = STREAM_MODE.TRANSLATE,
    targetFormat,
    sourceFormat,
    provider = null,
    reqLogger = null,
    toolNameMap = null,
    model = null,
    connectionId = null,
    body = null
  } = options;

  // One decoder per stream: with `{ stream: true }` the decoder keeps an
  // incomplete trailing UTF-8 sequence until the next chunk, so a decoder
  // shared between concurrent streams would splice bytes across responses.
  const decoder = new TextDecoder();
  let buffer = "";
  // Latest usage seen or estimated in passthrough mode (translate mode uses state.usage)
  let usage = null;
  // State for translate mode
  const state = mode === STREAM_MODE.TRANSLATE ? { ...initState(sourceFormat), provider, toolNameMap } : null;
  // Track content length for usage estimation (both modes)
  let totalContentLength = 0;
  // Set once the translator's final events and [DONE] have been emitted (translate mode)
  let translateFinished = false;

  // Record an already-formatted chunk in the request log and send it to the client.
  const emit = (controller, output) => {
    reqLogger?.appendConvertedChunk?.(output);
    controller.enqueue(sharedEncoder.encode(output));
  };

  // Record the OpenAI intermediate chunks attached to a translation result, if any.
  const logOpenAIIntermediate = (translated) => {
    if (!translated?._openaiIntermediate) return;
    for (const item of translated._openaiIntermediate) {
      reqLogger?.appendOpenAIChunk?.(formatSSE(item, FORMATS.OPENAI));
    }
  };

  // Passthrough: forward one line, re-serializing it when the id was fixed or
  // usage had to be injected into the finish chunk.
  const processPassthroughLine = (controller, line) => {
    const trimmed = line.trim();
    let output = null;
    if (trimmed.startsWith("data:") && trimmed.slice(5).trim() !== "[DONE]") {
      try {
        const parsed = JSON.parse(trimmed.slice(5).trim());
        const idFixed = fixInvalidId(parsed);
        // Drop chunks that carry nothing worth forwarding
        if (!hasValuableContent(parsed, FORMATS.OPENAI)) return;
        totalContentLength += measureContentLength(parsed);
        const extracted = extractUsage(parsed);
        if (extracted) usage = extracted;
        const isFinishChunk = parsed.choices?.[0]?.finish_reason;
        if (isFinishChunk && !hasValidUsage(parsed.usage)) {
          // Provider sent no usage on the finish chunk: give the client an estimate
          const estimated = estimateUsage(body, totalContentLength, FORMATS.OPENAI);
          parsed.usage = filterUsageForFormat(estimated, FORMATS.OPENAI);
          output = `data: ${JSON.stringify(parsed)}\n`;
          usage = estimated;
        } else if (isFinishChunk && usage) {
          // Client gets buffered usage; `usage` itself stays unbuffered for logging
          parsed.usage = filterUsageForFormat(addBufferToUsage(usage), FORMATS.OPENAI);
          output = `data: ${JSON.stringify(parsed)}\n`;
        } else if (idFixed) {
          output = `data: ${JSON.stringify(parsed)}\n`;
        }
      } catch {
        // Best effort: non-JSON payloads are forwarded unchanged below
      }
    }
    if (output === null) {
      // Normalize "data:x" to "data: x"; anything else is forwarded verbatim
      output = line.startsWith("data:") && !line.startsWith("data: ")
        ? `data: ${line.slice(5)}\n`
        : `${line}\n`;
    }
    emit(controller, output);
  };

  // Translate: emit the translator's pending events followed by [DONE], at most once per stream.
  const finishTranslate = (controller) => {
    if (translateFinished) return;
    translateFinished = true;
    const flushed = translateResponse(targetFormat, sourceFormat, null, state);
    logOpenAIIntermediate(flushed);
    if (flushed?.length) {
      for (const item of flushed) emit(controller, formatSSE(item, sourceFormat));
    }
    emit(controller, "data: [DONE]\n\n");
  };

  // Translate: convert one trimmed provider line (targetFormat -> openai -> sourceFormat) and forward it.
  const processTranslateLine = (controller, trimmed) => {
    if (!trimmed) return;
    const parsed = parseSSELine(trimmed);
    if (!parsed) return;
    if (parsed.done) {
      // Provider ended the stream: final translator events must precede the single [DONE]
      finishTranslate(controller);
      return;
    }
    totalContentLength += measureContentLength(parsed);
    const extracted = extractUsage(parsed);
    if (extracted) state.usage = extracted; // Keep original usage for logging
    const translated = translateResponse(targetFormat, sourceFormat, parsed, state);
    logOpenAIIntermediate(translated);
    if (!translated?.length) return;
    for (const item of translated) {
      if (!hasValuableContent(item, sourceFormat)) continue; // Skip empty chunks
      const isFinishChunk = item.type === "message_delta" || item.choices?.[0]?.finish_reason;
      if (state.finishReason && isFinishChunk) {
        if (!hasValidUsage(item.usage) && totalContentLength > 0) {
          // Inject estimated usage when the finish chunk has none
          const estimated = estimateUsage(body, totalContentLength, sourceFormat);
          item.usage = filterUsageForFormat(estimated, sourceFormat); // Filter + already has buffer
          state.usage = estimated;
        } else if (state.usage) {
          // Add buffer and filter usage for client (state.usage stays original for logging)
          item.usage = filterUsageForFormat(addBufferToUsage(state.usage), sourceFormat);
        }
      }
      emit(controller, formatSSE(item, sourceFormat));
    }
  };

  return new TransformStream({
    transform(chunk, controller) {
      const text = decoder.decode(chunk, { stream: true });
      buffer += text;
      reqLogger?.appendProviderChunk?.(text);
      const lines = buffer.split("\n");
      // The last piece may be an incomplete line; keep it for the next chunk
      buffer = lines.pop() || "";
      for (const line of lines) {
        if (mode === STREAM_MODE.PASSTHROUGH) {
          processPassthroughLine(controller, line);
        } else {
          processTranslateLine(controller, line.trim());
        }
      }
    },
    flush(controller) {
      trackPendingRequest(model, provider, connectionId, false);
      try {
        // Drain bytes the decoder held back; a final line may lack its newline
        const rest = buffer + decoder.decode();
        buffer = "";

        if (mode === STREAM_MODE.PASSTHROUGH) {
          if (rest.trim()) {
            processPassthroughLine(controller, rest);
            // Blank line terminates the trailing event so SSE clients dispatch it
            emit(controller, "\n");
          }
          // Estimate usage if provider didn't return valid usage (PASSTHROUGH is always OpenAI format)
          if (!hasValidUsage(usage) && totalContentLength > 0) {
            usage = estimateUsage(body, totalContentLength, FORMATS.OPENAI);
          }
          if (hasValidUsage(usage)) {
            logUsage(provider, usage, model, connectionId);
          } else {
            appendRequestLog({ model, provider, connectionId, tokens: null, status: "200 OK" }).catch(() => { });
          }
          return;
        }

        // Translate mode: process the remaining line, then final events + [DONE]
        // (no-op if the provider's own [DONE] already triggered them)
        processTranslateLine(controller, rest.trim());
        finishTranslate(controller);

        // Estimate usage if provider didn't return valid usage (for translate mode)
        if (!hasValidUsage(state?.usage) && totalContentLength > 0) {
          state.usage = estimateUsage(body, totalContentLength, sourceFormat);
        }
        if (hasValidUsage(state?.usage)) {
          logUsage(state.provider || targetFormat, state.usage, model, connectionId);
        } else {
          appendRequestLog({ model, provider, connectionId, tokens: null, status: "200 OK" }).catch(() => { });
        }
      } catch (error) {
        console.error("Error in flush:", error);
      }
    }
  });
}
// Positional-argument convenience wrappers, kept for backward compatibility.

/**
 * Build a translate-mode SSE stream from positional arguments.
 * Equivalent to `createSSEStream({ mode: "translate", ...named args })`.
 * @param {string} targetFormat - Provider format
 * @param {string} sourceFormat - Client format
 * @param {string} [provider=null] - Provider name
 * @param {object} [reqLogger=null] - Request logger instance
 * @param {object} [toolNameMap=null] - Tool name map passed through to the stream
 * @param {string} [model=null] - Model name
 * @param {string} [connectionId=null] - Connection ID for usage tracking
 * @param {object} [body=null] - Request body (for input token estimation)
 * @returns {TransformStream}
 */
export function createSSETransformStreamWithLogger(targetFormat, sourceFormat, provider = null, reqLogger = null, toolNameMap = null, model = null, connectionId = null, body = null) {
  const streamOptions = {
    mode: STREAM_MODE.TRANSLATE,
    targetFormat,
    sourceFormat,
    provider,
    reqLogger,
    toolNameMap,
    model,
    connectionId,
    body
  };
  return createSSEStream(streamOptions);
}
/**
 * Build a passthrough-mode SSE stream from positional arguments.
 * Equivalent to `createSSEStream({ mode: "passthrough", ...named args })`.
 * @param {string} [provider=null] - Provider name
 * @param {object} [reqLogger=null] - Request logger instance
 * @param {string} [model=null] - Model name
 * @param {string} [connectionId=null] - Connection ID for usage tracking
 * @param {object} [body=null] - Request body (for input token estimation)
 * @returns {TransformStream}
 */
export function createPassthroughStreamWithLogger(provider = null, reqLogger = null, model = null, connectionId = null, body = null) {
  const streamOptions = {
    mode: STREAM_MODE.PASSTHROUGH,
    provider,
    reqLogger,
    model,
    connectionId,
    body
  };
  return createSSEStream(streamOptions);
}