diff --git a/netlify/edge-functions/agent-proxy.ts b/netlify/edge-functions/agent-proxy.ts index 7572f32..df256ee 100644 --- a/netlify/edge-functions/agent-proxy.ts +++ b/netlify/edge-functions/agent-proxy.ts @@ -179,13 +179,30 @@ async function anthropicFetch( // Parse the SSE stream into an async iterable of decoded events. // Each SSE message is `event: \ndata: \n\n`. +// +// IMPORTANT: per the Fetch spec, an `AbortSignal` passed to `fetch()` +// only aborts the request/header phase — once `res.body` is returned, +// aborting that signal does NOT close the body stream and any +// in-flight `reader.read()` will hang forever. So we attach our own +// listener and `reader.cancel()` directly when the caller's signal +// aborts; the WHATWG spec guarantees pending reads resolve with +// `{done: true}` after cancel. async function* parseSse( stream: ReadableStream, + signal?: AbortSignal, ): AsyncGenerator { const reader = stream.getReader() const decoder = new TextDecoder('utf-8') let buffer = '' + const onAbort = () => { + reader.cancel().catch(() => {}) + } + if (signal) { + if (signal.aborted) onAbort() + else signal.addEventListener('abort', onAbort, { once: true }) + } + try { while (true) { const { value, done } = await reader.read() @@ -217,6 +234,9 @@ async function* parseSse( } } } finally { + if (signal) { + signal.removeEventListener('abort', onAbort) + } try { reader.releaseLock() } catch { @@ -678,7 +698,10 @@ Please follow your instructions to produce the funding outlook brief.` // Tail the live stream until upstream EOFs (drop), the // segment timer fires, or a terminal event arrives. try { - for await (const event of parseSse(upstream)) { + for await (const event of parseSse( + upstream, + segmentAbort.signal, + )) { if (segmenting) break if (event.id) { if (!seenEventIds.has(event.id)) {