* fix: enhance stall detection in stream handling for improved disconnect management * fix: improve stall detection handling in pipeWithDisconnect to prevent stale aborts
193 lines
5.9 KiB
JavaScript
193 lines
5.9 KiB
JavaScript
// Stream handler with disconnect detection - shared for all providers
|
|
import { STREAM_STALL_TIMEOUT_MS } from "../config/runtimeConfig.js";
|
|
|
|
// Get HH:MM:SS timestamp
|
|
function getTimeString() {
|
|
return new Date().toLocaleTimeString("en-US", { hour12: false, hour: "2-digit", minute: "2-digit", second: "2-digit" });
|
|
}
|
|
|
|
/**
|
|
* Create stream controller with abort and disconnect detection
|
|
* @param {object} options
|
|
* @param {function} options.onDisconnect - Callback when client disconnects
|
|
* @param {object} options.log - Logger instance
|
|
* @param {string} options.provider - Provider name
|
|
* @param {string} options.model - Model name
|
|
*/
|
|
export function createStreamController({ onDisconnect, onError, log, provider, model } = {}) {
|
|
const abortController = new AbortController();
|
|
const startTime = Date.now();
|
|
let disconnected = false;
|
|
let abortTimeout = null;
|
|
|
|
const logStream = (status) => {
|
|
const duration = Date.now() - startTime;
|
|
const p = provider?.toUpperCase() || "UNKNOWN";
|
|
console.log(`[${getTimeString()}] 🌊 [STREAM] ${p} | ${model || "unknown"} | ${duration}ms | ${status}`);
|
|
};
|
|
|
|
return {
|
|
signal: abortController.signal,
|
|
startTime,
|
|
|
|
isConnected: () => !disconnected,
|
|
|
|
// Call when client disconnects
|
|
handleDisconnect: (reason = "client_closed") => {
|
|
if (disconnected) return;
|
|
disconnected = true;
|
|
|
|
logStream(`disconnect: ${reason}`);
|
|
|
|
// Delay abort to allow cleanup
|
|
abortTimeout = setTimeout(() => {
|
|
abortController.abort();
|
|
}, 500);
|
|
|
|
onDisconnect?.({ reason, duration: Date.now() - startTime });
|
|
},
|
|
|
|
// Call when stream completes normally
|
|
handleComplete: () => {
|
|
if (disconnected) return;
|
|
disconnected = true;
|
|
|
|
logStream("complete");
|
|
|
|
if (abortTimeout) {
|
|
clearTimeout(abortTimeout);
|
|
abortTimeout = null;
|
|
}
|
|
},
|
|
|
|
// Call on error
|
|
handleError: (error) => {
|
|
if (disconnected) return;
|
|
disconnected = true;
|
|
|
|
if (abortTimeout) {
|
|
clearTimeout(abortTimeout);
|
|
abortTimeout = null;
|
|
}
|
|
|
|
if (error.name === "AbortError") {
|
|
logStream("aborted");
|
|
return;
|
|
}
|
|
|
|
logStream(`error: ${error.message}`);
|
|
onError?.(error);
|
|
},
|
|
|
|
abort: () => abortController.abort()
|
|
};
|
|
}
|
|
|
|
/**
|
|
* Create transform stream with disconnect detection
|
|
* Wraps existing transform stream and adds abort capability.
|
|
*
|
|
* Stall detection lives in pipeWithDisconnect (tied to upstream byte
|
|
* activity), not here — output of the transform stream may be silent
|
|
* for long periods while raw bytes still flow (e.g. Kiro EventStream
|
|
* binary frames buffering, Claude reasoning streams).
|
|
*/
|
|
export function createDisconnectAwareStream(transformStream, streamController) {
|
|
const reader = transformStream.readable.getReader();
|
|
const writer = transformStream.writable.getWriter();
|
|
|
|
return new ReadableStream({
|
|
async pull(controller) {
|
|
if (!streamController.isConnected()) {
|
|
controller.close();
|
|
return;
|
|
}
|
|
|
|
try {
|
|
const { done, value } = await reader.read();
|
|
|
|
if (done) {
|
|
streamController.handleComplete();
|
|
controller.close();
|
|
return;
|
|
}
|
|
controller.enqueue(value);
|
|
} catch (error) {
|
|
streamController.handleError(error);
|
|
reader.cancel().catch(() => {});
|
|
writer.abort().catch(() => {});
|
|
controller.error(error);
|
|
}
|
|
},
|
|
|
|
cancel(reason) {
|
|
streamController.handleDisconnect(reason || "cancelled");
|
|
reader.cancel();
|
|
writer.abort();
|
|
}
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Pipe provider response through transform with disconnect detection.
|
|
*
|
|
* Stall watchdog tracks raw upstream byte activity, not transform output.
|
|
* Reasoning models (Claude thinking via Kiro, etc.) can produce zero SSE
|
|
* output for long stretches while partial EventStream frames keep arriving.
|
|
* Measuring stall on the transform output caused false stalls and the
|
|
* "failed to pipe response" error in Next.
|
|
*
|
|
* Any upstream chunk resets the timer. If no bytes arrive for
|
|
* STREAM_STALL_TIMEOUT_MS, abort the underlying fetch via the controller.
|
|
*
|
|
* @param {Response} providerResponse - Response from provider
|
|
* @param {TransformStream} transformStream - Transform stream for SSE
|
|
* @param {object} streamController - Stream controller from createStreamController
|
|
*/
|
|
export function pipeWithDisconnect(providerResponse, transformStream, streamController) {
|
|
let stallTimer = null;
|
|
const clearStall = () => {
|
|
if (stallTimer) { clearTimeout(stallTimer); stallTimer = null; }
|
|
};
|
|
const armStall = () => {
|
|
clearStall();
|
|
stallTimer = setTimeout(() => {
|
|
stallTimer = null;
|
|
streamController.handleError?.(new Error("stream stall timeout"));
|
|
streamController.abort?.();
|
|
}, STREAM_STALL_TIMEOUT_MS);
|
|
};
|
|
|
|
// Wrap controller so every termination path clears the stall timer.
|
|
// Without this, abort/cancel/downstream-error paths leave the timer armed
|
|
// and a stale abort could fire after the request has already ended.
|
|
const wrappedController = {
|
|
signal: streamController.signal,
|
|
startTime: streamController.startTime,
|
|
isConnected: () => streamController.isConnected(),
|
|
handleComplete: () => { clearStall(); streamController.handleComplete(); },
|
|
handleError: (e) => { clearStall(); streamController.handleError(e); },
|
|
handleDisconnect: (r) => { clearStall(); streamController.handleDisconnect(r); },
|
|
abort: () => { clearStall(); streamController.abort(); }
|
|
};
|
|
|
|
armStall();
|
|
|
|
const upstreamTap = new TransformStream({
|
|
transform(chunk, controller) {
|
|
armStall();
|
|
controller.enqueue(chunk);
|
|
},
|
|
flush() { clearStall(); }
|
|
});
|
|
|
|
const transformedBody = providerResponse.body
|
|
.pipeThrough(upstreamTap)
|
|
.pipeThrough(transformStream);
|
|
|
|
return createDisconnectAwareStream(
|
|
{ readable: transformedBody, writable: { getWriter: () => ({ abort: () => Promise.resolve() }) } },
|
|
wrappedController
|
|
);
|
|
}
|
|
|