From 94960c6cf5373eaaa3f316b771bb8ca7e706b2b9 Mon Sep 17 00:00:00 2001 From: Muhammad Zakir Ramadhan <61570975+zakirkun@users.noreply.github.com> Date: Mon, 18 May 2026 12:00:33 +0700 Subject: [PATCH] =?UTF-8?q?fix:=20enhance=20stall=20detection=20in=20strea?= =?UTF-8?q?m=20handling=20for=20improved=20disconne=E2=80=A6=20(#1243)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix: enhance stall detection in stream handling for improved disconnect management * fix: improve stall detection handling in pipeWithDisconnect to prevent stale aborts --- open-sse/utils/streamHandler.js | 75 ++++++++++++++++++++++++++------- 1 file changed, 59 insertions(+), 16 deletions(-) diff --git a/open-sse/utils/streamHandler.js b/open-sse/utils/streamHandler.js index 7922746..6a501f5 100644 --- a/open-sse/utils/streamHandler.js +++ b/open-sse/utils/streamHandler.js @@ -85,7 +85,12 @@ export function createStreamController({ onDisconnect, onError, log, provider, m /** * Create transform stream with disconnect detection - * Wraps existing transform stream and adds abort capability + * Wraps existing transform stream and adds abort capability. + * + * Stall detection lives in pipeWithDisconnect (tied to upstream byte + * activity), not here — output of the transform stream may be silent + * for long periods while raw bytes still flow (e.g. Kiro EventStream + * binary frames buffering, Claude reasoning streams). */ export function createDisconnectAwareStream(transformStream, streamController) { const reader = transformStream.readable.getReader(); @@ -99,18 +104,7 @@ export function createDisconnectAwareStream(transformStream, streamController) { } try { - // Race between chunk arrival and stall timeout - let stallTimer; - const stallPromise = new Promise((_, reject) => { - stallTimer = setTimeout(() => reject(new Error("stream stall timeout")), STREAM_STALL_TIMEOUT_MS); - }); - - let done, value; - try { - ({ done, value } = await Promise.race([reader.read(), stallPromise])); - } finally { - clearTimeout(stallTimer); - } + const { done, value } = await reader.read(); if (done) { streamController.handleComplete(); @@ -135,16 +129,65 @@ export function createDisconnectAwareStream(transformStream, streamController) { } /** - * Pipe provider response through transform with disconnect detection + * Pipe provider response through transform with disconnect detection. + * + * Stall watchdog tracks raw upstream byte activity, not transform output. + * Reasoning models (Claude thinking via Kiro, etc.) can produce zero SSE + * output for long stretches while partial EventStream frames keep arriving. + * Measuring stall on the transform output caused false stalls and the + * "failed to pipe response" error in Next. + * + * Any upstream chunk resets the timer. If no bytes arrive for + * STREAM_STALL_TIMEOUT_MS, abort the underlying fetch via the controller. + * * @param {Response} providerResponse - Response from provider * @param {TransformStream} transformStream - Transform stream for SSE * @param {object} streamController - Stream controller from createStreamController */ export function pipeWithDisconnect(providerResponse, transformStream, streamController) { - const transformedBody = providerResponse.body.pipeThrough(transformStream); + let stallTimer = null; + const clearStall = () => { + if (stallTimer) { clearTimeout(stallTimer); stallTimer = null; } + }; + const armStall = () => { + clearStall(); + stallTimer = setTimeout(() => { + stallTimer = null; + streamController.handleError?.(new Error("stream stall timeout")); + streamController.abort?.(); + }, STREAM_STALL_TIMEOUT_MS); + }; + + // Wrap controller so every termination path clears the stall timer. + // Without this, abort/cancel/downstream-error paths leave the timer armed + // and a stale abort could fire after the request has already ended. + const wrappedController = { + signal: streamController.signal, + startTime: streamController.startTime, + isConnected: () => streamController.isConnected(), + handleComplete: () => { clearStall(); streamController.handleComplete(); }, + handleError: (e) => { clearStall(); streamController.handleError(e); }, + handleDisconnect: (r) => { clearStall(); streamController.handleDisconnect(r); }, + abort: () => { clearStall(); streamController.abort(); } + }; + + armStall(); + + const upstreamTap = new TransformStream({ + transform(chunk, controller) { + armStall(); + controller.enqueue(chunk); + }, + flush() { clearStall(); } + }); + + const transformedBody = providerResponse.body + .pipeThrough(upstreamTap) + .pipeThrough(transformStream); + return createDisconnectAwareStream( { readable: transformedBody, writable: { getWriter: () => ({ abort: () => Promise.resolve() }) } }, - streamController + wrappedController ); }