From ed0069ed895ac22012ec3f57bfea1520ccce1579 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 13 May 2026 13:20:19 +0000 Subject: [PATCH] fix(agent-proxy): cancel upstream reader on abort so segment timer can actually interrupt parseSse MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Root cause of "segment_end never written, response just runs until Netlify kills it": the Fetch spec only honors AbortSignal for the request/header phase. Once you have `res.body` and start reading from it, aborting that same signal does NOT close the body stream — any in-flight `reader.read()` hangs indefinitely. So when the segment timer fired and called segmentAbort.abort(), the running `for await (const event of parseSse(upstream))` stayed blocked on a `reader.read()` that never resolved. The while loop never exited, segment_end was never written, controller.close() was never called, and Netlify eventually pulled the rug somewhere between ~49 s and ~60 s in. Fix: parseSse now takes an optional AbortSignal and, when it fires, calls `reader.cancel()` directly. The WHATWG ReadableStream spec guarantees that any pending read() resolves with `{done: true}` after cancel, which lets the for-await exit promptly and the body loop re-evaluate `while (!done && !segmenting)` → break → writeJson(segment_end) → controller.close(). Confirmed in the curl trace: heartbeats now stop at the budget mark and segment_end fires before Netlify's cap. Co-Authored-By: alex --- netlify/edge-functions/agent-proxy.ts | 25 ++++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/netlify/edge-functions/agent-proxy.ts b/netlify/edge-functions/agent-proxy.ts index 7572f32..df256ee 100644 --- a/netlify/edge-functions/agent-proxy.ts +++ b/netlify/edge-functions/agent-proxy.ts @@ -179,13 +179,30 @@ async function anthropicFetch( // Parse the SSE stream into an async iterable of decoded events. // Each SSE message is `event: \ndata: \n\n`. +// +// IMPORTANT: per the Fetch spec, an `AbortSignal` passed to `fetch()` +// only aborts the request/header phase — once `res.body` is returned, +// aborting that signal does NOT close the body stream and any +// in-flight `reader.read()` will hang forever. So we attach our own +// listener and `reader.cancel()` directly when the caller's signal +// aborts; the WHATWG spec guarantees pending reads resolve with +// `{done: true}` after cancel. async function* parseSse( stream: ReadableStream, + signal?: AbortSignal, ): AsyncGenerator { const reader = stream.getReader() const decoder = new TextDecoder('utf-8') let buffer = '' + const onAbort = () => { + reader.cancel().catch(() => {}) + } + if (signal) { + if (signal.aborted) onAbort() + else signal.addEventListener('abort', onAbort, { once: true }) + } + try { while (true) { const { value, done } = await reader.read() @@ -217,6 +234,9 @@ async function* parseSse( } } } finally { + if (signal) { + signal.removeEventListener('abort', onAbort) + } try { reader.releaseLock() } catch { @@ -678,7 +698,10 @@ Please follow your instructions to produce the funding outlook brief.` // Tail the live stream until upstream EOFs (drop), the // segment timer fires, or a terminal event arrives. try { - for await (const event of parseSse(upstream)) { + for await (const event of parseSse( + upstream, + segmentAbort.signal, + )) { if (segmenting) break if (event.id) { if (!seenEventIds.has(event.id)) {