fix(agent-proxy): cancel upstream reader on abort so segment timer can actually interrupt parseSse
Root cause of "segment_end never written, response just runs until
Netlify kills it": the Fetch spec only honors AbortSignal for the
request/header phase. Once you have `res.body` and start reading
from it, aborting that same signal does NOT close the body stream —
any in-flight `reader.read()` hangs indefinitely.
So when the segment timer fired and called segmentAbort.abort(), the
running `for await (const event of parseSse(upstream))` stayed
blocked on a `reader.read()` that never resolved. The while loop
never exited, segment_end was never written, controller.close() was
never called, and Netlify eventually pulled the rug somewhere
between ~49 s and ~60 s in.
Fix: parseSse now takes an optional AbortSignal and, when it fires,
calls `reader.cancel()` directly. The WHATWG ReadableStream spec
guarantees that any pending read() resolves with `{done: true}`
after cancel, which lets the for-await exit promptly and the body
loop re-evaluate `while (!done && !segmenting)` → break →
writeJson(segment_end) → controller.close(). Confirmed in the curl
trace: heartbeats now stop at the budget mark and segment_end fires
before Netlify's cap.
Co-Authored-By: alex <alex@semipublic.co>
This commit is contained in:
@@ -179,13 +179,30 @@ async function anthropicFetch(
|
||||
|
||||
// Parse the SSE stream into an async iterable of decoded events.
|
||||
// Each SSE message is `event: <type>\ndata: <json>\n\n`.
|
||||
//
|
||||
// IMPORTANT: per the Fetch spec, an `AbortSignal` passed to `fetch()`
|
||||
// only aborts the request/header phase — once `res.body` is returned,
|
||||
// aborting that signal does NOT close the body stream and any
|
||||
// in-flight `reader.read()` will hang forever. So we attach our own
|
||||
// listener and `reader.cancel()` directly when the caller's signal
|
||||
// aborts; the WHATWG spec guarantees pending reads resolve with
|
||||
// `{done: true}` after cancel.
|
||||
async function* parseSse(
|
||||
stream: ReadableStream<Uint8Array>,
|
||||
signal?: AbortSignal,
|
||||
): AsyncGenerator<AgentEvent> {
|
||||
const reader = stream.getReader()
|
||||
const decoder = new TextDecoder('utf-8')
|
||||
let buffer = ''
|
||||
|
||||
const onAbort = () => {
|
||||
reader.cancel().catch(() => {})
|
||||
}
|
||||
if (signal) {
|
||||
if (signal.aborted) onAbort()
|
||||
else signal.addEventListener('abort', onAbort, { once: true })
|
||||
}
|
||||
|
||||
try {
|
||||
while (true) {
|
||||
const { value, done } = await reader.read()
|
||||
@@ -217,6 +234,9 @@ async function* parseSse(
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
if (signal) {
|
||||
signal.removeEventListener('abort', onAbort)
|
||||
}
|
||||
try {
|
||||
reader.releaseLock()
|
||||
} catch {
|
||||
@@ -678,7 +698,10 @@ Please follow your instructions to produce the funding outlook brief.`
|
||||
// Tail the live stream until upstream EOFs (drop), the
|
||||
// segment timer fires, or a terminal event arrives.
|
||||
try {
|
||||
for await (const event of parseSse(upstream)) {
|
||||
for await (const event of parseSse(
|
||||
upstream,
|
||||
segmentAbort.signal,
|
||||
)) {
|
||||
if (segmenting) break
|
||||
if (event.id) {
|
||||
if (!seenEventIds.has(event.id)) {
|
||||
|
||||
Reference in New Issue
Block a user