diff --git a/open-sse/executors/codex.js b/open-sse/executors/codex.js index 0814e4f..0c0ee6d 100644 --- a/open-sse/executors/codex.js +++ b/open-sse/executors/codex.js @@ -1,6 +1,7 @@ import { BaseExecutor } from "./base.js"; import { CODEX_DEFAULT_INSTRUCTIONS } from "../config/codexInstructions.js"; import { PROVIDERS } from "../config/constants.js"; +import { normalizeResponsesInput } from "../translator/helpers/responsesApiHelper.js"; /** * Codex Executor - handles OpenAI Codex API (Responses API format) @@ -24,6 +25,10 @@ export class CodexExecutor extends BaseExecutor { * Transform request before sending - inject default instructions if missing */ transformRequest(model, body, stream, credentials) { + // Convert string input to array format (Codex API requires input as array) + const normalized = normalizeResponsesInput(body.input); + if (normalized) body.input = normalized; + // Ensure input is present and non-empty (Codex API rejects empty input) if (!body.input || (Array.isArray(body.input) && body.input.length === 0)) { body.input = [{ type: "message", role: "user", content: [{ type: "input_text", text: "..." }] }]; diff --git a/open-sse/handlers/chatCore.js b/open-sse/handlers/chatCore.js index f8161a6..51541b3 100644 --- a/open-sse/handlers/chatCore.js +++ b/open-sse/handlers/chatCore.js @@ -12,6 +12,7 @@ import { HTTP_STATUS } from "../config/constants.js"; import { handleBypassRequest } from "../utils/bypassHandler.js"; import { saveRequestUsage, trackPendingRequest, appendRequestLog, saveRequestDetail } from "@/lib/usageDb.js"; import { getExecutor } from "../executors/index.js"; +import { convertResponsesStreamToJson } from "../transformer/streamToJsonConverter.js"; /** * Translate non-streaming response to OpenAI format @@ -365,8 +366,12 @@ export async function handleChatCore({ body, modelInfo, credentials, log, onCred const modelTargetFormat = getModelTargetFormat(alias, model); const targetFormat = modelTargetFormat || getTargetFormat(provider); + // Track if client actually wants streaming (before we force it for providers) + const clientRequestedStreaming = body.stream === true; + const providerRequiresStreaming = provider === 'openai' || provider === 'codex'; + // Force streaming for OpenAI/Codex models (they don't support non-streaming mode properly) - const stream = (provider === 'openai' || provider === 'codex') ? true : (body.stream !== false); + const stream = providerRequiresStreaming ? true : (body.stream !== false); // Create request logger for this session: sourceFormat_targetFormat_model const reqLogger = await createRequestLogger(sourceFormat, targetFormat, model); @@ -552,6 +557,77 @@ export async function handleChatCore({ body, modelInfo, credentials, log, onCred return createErrorResult(statusCode, errMsg, retryAfterMs); } + // Provider forced streaming but client wants JSON - convert SSE to JSON + if (!clientRequestedStreaming && providerRequiresStreaming) { + trackPendingRequest(model, provider, connectionId, false); + const contentType = providerResponse.headers.get("content-type") || ""; + + // Treat as SSE if content-type says so OR if it's empty/missing + // (Codex API doesn't always set Content-Type on streaming responses) + const isSSEResponse = contentType.includes("text/event-stream") || (contentType === "" && provider === "codex"); + if (isSSEResponse) { + const isResponsesApi = sourceFormat === 'openai-responses'; + + if (isResponsesApi) { + // Responses API SSE → Responses API JSON (for pydantic_ai, OpenAI SDK, etc.) + try { + const jsonResponse = await convertResponsesStreamToJson(providerResponse.body); + log?.info?.("STREAM", `Converted Responses API SSE → JSON for non-streaming client`); + + if (onRequestSuccess) await onRequestSuccess(); + + const usage = jsonResponse.usage || {}; + appendRequestLog({ model, provider, connectionId, tokens: usage, status: "200 OK" }).catch(() => { }); + + if (usage && typeof usage === 'object') { + const msg = `[${new Date().toLocaleTimeString("en-US", { hour12: false, hour: "2-digit", minute: "2-digit" })}] 📊 [USAGE] ${provider.toUpperCase()} | in=${usage?.input_tokens || 0} | out=${usage?.output_tokens || 0}${connectionId ? ` | account=${connectionId.slice(0, 8)}...` : ""}`; + console.log(`${COLORS.green}${msg}${COLORS.reset}`); + + saveRequestUsage({ + provider: provider || "unknown", + model: model || "unknown", + tokens: { prompt_tokens: usage.input_tokens || 0, completion_tokens: usage.output_tokens || 0 }, + timestamp: new Date().toISOString(), + connectionId: connectionId || undefined, + apiKey: apiKey || undefined + }).catch(() => { }); + } + + return { + success: true, + response: new Response(JSON.stringify(jsonResponse), { + headers: { + "Content-Type": "application/json", + "Access-Control-Allow-Origin": "*" + } + }) + }; + } catch (error) { + console.error("[ChatCore] Responses API SSE→JSON conversion failed:", error); + return createErrorResult(HTTP_STATUS.BAD_GATEWAY, "Failed to convert streaming response to JSON"); + } + } else { + // Chat Completions SSE → Chat Completions JSON + const sseText = await providerResponse.text(); + const parsed = parseSSEToOpenAIResponse(sseText, model); + if (parsed) { + if (onRequestSuccess) await onRequestSuccess(); + appendRequestLog({ model, provider, connectionId, tokens: parsed.usage, status: "200 OK" }).catch(() => { }); + return { + success: true, + response: new Response(JSON.stringify(parsed), { + headers: { + "Content-Type": "application/json", + "Access-Control-Allow-Origin": "*" + } + }) + }; + } + return createErrorResult(HTTP_STATUS.BAD_GATEWAY, "Invalid SSE response for non-streaming request"); + } + } + } + // Non-streaming response if (!stream) { trackPendingRequest(model, provider, connectionId, false); diff --git a/open-sse/handlers/responsesHandler.js b/open-sse/handlers/responsesHandler.js index 3f9dbb1..c256985 100644 --- a/open-sse/handlers/responsesHandler.js +++ b/open-sse/handlers/responsesHandler.js @@ -6,6 +6,7 @@ import { handleChatCore } from "./chatCore.js"; import { convertResponsesApiFormat } from "../translator/helpers/responsesApiHelper.js"; import { createResponsesApiTransformStream } from "../transformer/responsesTransformer.js"; +import { convertResponsesStreamToJson } from "../transformer/streamToJsonConverter.js"; /** * Handle /v1/responses request @@ -24,8 +25,12 @@ export async function handleResponsesCore({ body, modelInfo, credentials, log, o // Convert Responses API format to Chat Completions format const convertedBody = convertResponsesApiFormat(body); - // Ensure stream is enabled - convertedBody.stream = true; + // Preserve client's stream preference (matches OpenClaw behavior) + // Default to false if omitted: Boolean(undefined) = false + const clientRequestedStreaming = convertedBody.stream === true; + if (convertedBody.stream === undefined) { + convertedBody.stream = false; + } // Call chat core handler const result = await handleChatCore({ @@ -46,26 +51,52 @@ export async function handleResponsesCore({ body, modelInfo, credentials, log, o const response = result.response; const contentType = response.headers.get("Content-Type") || ""; - // If not SSE or error, return as-is - if (!contentType.includes("text/event-stream") || response.status !== 200) { - return result; + // Case 1: Client wants non-streaming, but got SSE (provider forced it, e.g., Codex) + if (!clientRequestedStreaming && contentType.includes("text/event-stream")) { + try { + const jsonResponse = await convertResponsesStreamToJson(response.body); + + return { + success: true, + response: new Response(JSON.stringify(jsonResponse), { + status: 200, + headers: { + "Content-Type": "application/json", + "Cache-Control": "no-cache", + "Access-Control-Allow-Origin": "*" + } + }) + }; + } catch (error) { + console.error("[Responses API] Stream-to-JSON conversion failed:", error); + return { + success: false, + status: 500, + error: "Failed to convert streaming response to JSON" + }; + } } - // Transform SSE stream to Responses API format (no logging in worker) - const transformStream = createResponsesApiTransformStream(null); - const transformedBody = response.body.pipeThrough(transformStream); + // Case 2: Client wants streaming, got SSE - transform it + if (clientRequestedStreaming && contentType.includes("text/event-stream")) { + const transformStream = createResponsesApiTransformStream(null); + const transformedBody = response.body.pipeThrough(transformStream); - return { - success: true, - response: new Response(transformedBody, { - status: 200, - headers: { - "Content-Type": "text/event-stream", - "Cache-Control": "no-cache", - "Connection": "keep-alive", - "Access-Control-Allow-Origin": "*" - } - }) - }; + return { + success: true, + response: new Response(transformedBody, { + status: 200, + headers: { + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "Access-Control-Allow-Origin": "*" + } + }) + }; + } + + // Case 3: Non-SSE response (error or non-streaming from provider) - return as-is + return result; } diff --git a/open-sse/services/provider.js b/open-sse/services/provider.js index 4c50efe..4f7250e 100644 --- a/open-sse/services/provider.js +++ b/open-sse/services/provider.js @@ -36,8 +36,9 @@ function buildAnthropicCompatibleUrl(baseUrl) { // Detect request format from body structure export function detectFormat(body) { - // OpenAI Responses API: has input[] array instead of messages[] - if (body.input && Array.isArray(body.input)) { + // OpenAI Responses API: has input (array or string) instead of messages[] + // The Responses API accepts both input as array and input as a plain string + if (body.input && (Array.isArray(body.input) || typeof body.input === "string") && !body.messages) { return "openai-responses"; } diff --git a/open-sse/transformer/streamToJsonConverter.js b/open-sse/transformer/streamToJsonConverter.js new file mode 100644 index 0000000..282f9cf --- /dev/null +++ b/open-sse/transformer/streamToJsonConverter.js @@ -0,0 +1,103 @@ +/** + * Stream-to-JSON Converter + * Converts Responses API SSE stream to single JSON response + * Used when client requests non-streaming but provider forces streaming (e.g., Codex) + */ + +/** + * Convert Responses API SSE stream to single JSON response + * @param {ReadableStream} stream - SSE stream from provider + * @returns {Promise} Final JSON response in Responses API format + */ +export async function convertResponsesStreamToJson(stream) { + const reader = stream.getReader(); + const decoder = new TextDecoder(); + + let buffer = ""; + let responseId = ""; + let output = []; + let created = Math.floor(Date.now() / 1000); + let status = "in_progress"; + let usage = { input_tokens: 0, output_tokens: 0, total_tokens: 0 }; + + // Map of output_index -> item (for ordered output array) + const items = new Map(); + + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + + buffer += decoder.decode(value, { stream: true }); + + // Split by double newline (SSE event separator) + const messages = buffer.split("\n\n"); + buffer = messages.pop() || ""; // Keep incomplete message in buffer + + for (const msg of messages) { + if (!msg.trim()) continue; + + // Parse SSE event + const eventMatch = msg.match(/^event:\s*(.+)$/m); + const dataMatch = msg.match(/^data:\s*(.+)$/m); + + if (!eventMatch || !dataMatch) continue; + + const eventType = eventMatch[1].trim(); + const dataStr = dataMatch[1].trim(); + + if (dataStr === "[DONE]") continue; + + let parsed; + try { + parsed = JSON.parse(dataStr); + } catch { + // Skip malformed JSON + continue; + } + + // Handle different event types + if (eventType === "response.created") { + responseId = parsed.response?.id || responseId; + created = parsed.response?.created_at || created; + } + else if (eventType === "response.output_item.done") { + const idx = parsed.output_index ?? 0; + items.set(idx, parsed.item); + } + else if (eventType === "response.completed") { + status = "completed"; + if (parsed.response?.usage) { + usage.input_tokens = parsed.response.usage.input_tokens || 0; + usage.output_tokens = parsed.response.usage.output_tokens || 0; + usage.total_tokens = parsed.response.usage.total_tokens || 0; + } + } + else if (eventType === "response.failed") { + status = "failed"; + } + } + } + } finally { + reader.releaseLock(); + } + + // Build output array from accumulated items (ordered by index) + const maxIndex = items.size > 0 ? Math.max(...items.keys()) : -1; + for (let i = 0; i <= maxIndex; i++) { + output.push(items.get(i) || { + type: "message", + content: [], + role: "assistant" + }); + } + + return { + id: responseId || `resp_${Date.now()}_${Math.random().toString(36).slice(2, 8)}`, + object: "response", + created_at: created, + status: status || "completed", + output, + usage + }; +} diff --git a/open-sse/translator/helpers/responsesApiHelper.js b/open-sse/translator/helpers/responsesApiHelper.js index 423b2a2..5fff8bd 100644 --- a/open-sse/translator/helpers/responsesApiHelper.js +++ b/open-sse/translator/helpers/responsesApiHelper.js @@ -1,3 +1,18 @@ +/** + * Normalize Responses API input to array format. + * Accepts string or array, returns array of message items. + * @param {string|Array} input - raw input from Responses API body + * @returns {Array|null} normalized array or null if invalid + */ +export function normalizeResponsesInput(input) { + if (typeof input === "string") { + const text = input.trim() === "" ? "..." : input; + return [{ type: "message", role: "user", content: [{ type: "input_text", text }] }]; + } + if (Array.isArray(input)) return input; + return null; +} + /** * Convert OpenAI Responses API format to standard chat completions format * Responses API uses: { input: [...], instructions: "..." } @@ -19,7 +34,10 @@ export function convertResponsesApiFormat(body) { let pendingToolCalls = []; let pendingToolResults = []; - for (const item of body.input) { + const inputItems = normalizeResponsesInput(body.input); + if (!inputItems) return body; + + for (const item of inputItems) { // Determine item type - Droid CLI sends role-based items without 'type' field // Fallback: if no type but has role property, treat as message const itemType = item.type || (item.role ? "message" : null); diff --git a/open-sse/translator/request/openai-responses.js b/open-sse/translator/request/openai-responses.js index 2efa01e..66e0541 100644 --- a/open-sse/translator/request/openai-responses.js +++ b/open-sse/translator/request/openai-responses.js @@ -6,6 +6,7 @@ */ import { register } from "../index.js"; import { FORMATS } from "../formats.js"; +import { normalizeResponsesInput } from "../helpers/responsesApiHelper.js"; /** * Convert OpenAI Responses API request to OpenAI Chat Completions format @@ -25,7 +26,10 @@ export function openaiResponsesToOpenAIRequest(model, body, stream, credentials) let currentAssistantMsg = null; let pendingToolResults = []; - for (const item of body.input) { + const inputItems = normalizeResponsesInput(body.input); + if (!inputItems) return body; + + for (const item of inputItems) { // Determine item type - Droid CLI sends role-based items without 'type' field // Fallback: if no type but has role property, treat as message const itemType = item.type || (item.role ? "message" : null); @@ -234,4 +238,3 @@ export function openaiToOpenAIResponsesRequest(model, body, stream, credentials) // Register both directions register(FORMATS.OPENAI_RESPONSES, FORMATS.OPENAI, openaiResponsesToOpenAIRequest, null); register(FORMATS.OPENAI, FORMATS.OPENAI_RESPONSES, openaiToOpenAIResponsesRequest, null); -