From 8e44de52714da3c25760829310c71faac570006a Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 13 May 2026 13:00:54 +0000 Subject: [PATCH] feat(agent-proxy): segment streaming responses at ~54s to bypass Netlify Edge's ~60s response cap Netlify Edge Functions empirically cap a single streaming response at ~60s wall-clock, regardless of activity. Confirmed against Netlify's own canonical SSE example (edge-functions-examples.netlify.app/sse) which also cuts at +60.1s. The Anthropic Managed Agent session is fine for several minutes; the cap is per-HTTP-response. This commit splits a long brief across multiple HTTP responses, while keeping the UX of one continuous stream: netlify/edge-functions/agent-proxy.ts - Accept either a new-session payload {stationName, stationLocation, stationWebsite} or a resume payload {sessionId, lastEventId, startedAt}. - On resume, skip session creation + user.message send. Just reopen the live SSE stream and backfill via GET /v1/sessions/{id}/events?after_id=lastEventId (deduped by event.id), then keep tailing. - Single AbortController per segment. A 54s timer aborts the upstream, the for-await loops exit, and we write one final NDJSON line: {type:'segment_end', sessionId, lastEventId, startedAt}. - The 20-min OVERALL_BUDGET_MS is enforced via Date.now() - startedAt so it spans across all segments. - Refactor main loop so every iteration is openStream + backfill + tail. Cleaner than the previous initial-stream + reconnect-only-on- drop pattern. src/App.jsx - readStream() now returns a {sessionId, lastEventId, startedAt} payload if it saw a segment_end, or null if the stream ended cleanly. - handleSubmit() loops, reopening /api/agent-proxy with the resume payload until readStream returns null. Spinner/status state stays on across segments so the UI shows one continuous stream. README.md - Document the segmented-streaming protocol and why it exists. Co-Authored-By: alex --- README.md | 42 +- netlify/edge-functions/agent-proxy.ts | 639 ++++++++++++++++---------- src/App.jsx | 107 +++-- 3 files changed, 491 insertions(+), 297 deletions(-) diff --git a/README.md b/README.md index f62f816..8cfc7e4 100644 --- a/README.md +++ b/README.md @@ -34,27 +34,49 @@ Open http://localhost:8888. 1. The browser POSTs the form to `/api/agent-proxy`. 2. A Netlify Edge Function (Deno runtime) creates an Anthropic Managed Agent session, opens the upstream SSE event stream, and sends the - user message. It then tails the stream and re-opens it (with an + user message. It tails the stream and re-opens it (with an `events.list` backfill, deduped by event id) whenever it drops mid session, until the session reports a terminal `session.status_*` event or a 20-minute wall-clock budget is hit. 3. Downstream to the browser the function emits newline-delimited JSON - (`application/x-ndjson`) — `text`, `status`, `heartbeat`, `done`, - `error` — one object per line. + (`application/x-ndjson`) — `text`, `status`, `heartbeat`, + `segment_end`, `done`, `error` — one object per line. 4. The React app reads `response.body.getReader()`, splits on `\n`, and parses each line. `text` lines append to the brief; `status` lines drive a separate “what the agent is doing now” banner. 5. A `Thinking` flag stays `true` until the first chunk arrives, then flips to a streaming state with a pulsing cursor. -Edge Functions are required here — the previous v2 Node Function ran on -AWS Lambda and got killed at ~27 s, well before any reconnect could -fire. Edge Functions run on Deno Deploy with no streaming-duration cap -as long as the function keeps writing to the response body. +### Why segmented streaming + +Netlify Edge Functions empirically cap a single streaming response at +~60 s wall-clock (Netlify's own canonical SSE example cuts at the same +mark, even though the docs imply "indefinite" streaming). The Anthropic +Managed Agent session itself is fine for several minutes; the limit is +per-HTTP-response. So the proxy segments the stream: + +- Just before ~54 s the function writes one final NDJSON line — + `{"type":"segment_end","sessionId":"sess_…","lastEventId":"evt_…","startedAt":…}` — + and closes the response. +- The React side sees `segment_end`, immediately POSTs to + `/api/agent-proxy` again with `{sessionId, lastEventId, startedAt}` + in the body. +- The function recognises this as a resume payload: it does **not** + create a new session or re-send the user message. It just reopens + the live SSE stream, backfills via + `GET /v1/sessions/{id}/events?after_id=lastEventId` (deduped by + `event.id`), and keeps tailing. + +The 20-minute overall budget is enforced via `Date.now() - startedAt` +so it spans across all segments. The previous v2 Node Function (on AWS +Lambda) was killed at ~27 s, well before any reconnect could fire — +moving to Edge Functions extended that to ~60 s per response, and the +segment loop extends it the rest of the way. ## Files -- [src/App.jsx](src/App.jsx) — form, NDJSON streaming reader, UI states. +- [src/App.jsx](src/App.jsx) — form, NDJSON streaming reader, + segment-end resume loop, UI states. - [netlify/edge-functions/agent-proxy.ts](netlify/edge-functions/agent-proxy.ts) - — Managed Agent proxy with reconnect, backfill, and NDJSON wire - protocol. + — Managed Agent proxy with reconnect, backfill, segmentation, and + NDJSON wire protocol. diff --git a/netlify/edge-functions/agent-proxy.ts b/netlify/edge-functions/agent-proxy.ts index 58aedf6..f191b5b 100644 --- a/netlify/edge-functions/agent-proxy.ts +++ b/netlify/edge-functions/agent-proxy.ts @@ -4,71 +4,80 @@ import type { Config, Context } from '@netlify/edge-functions' * Netlify Edge Function (Deno runtime). * * Proxies a request from the React frontend to a Claude Console-defined - * Managed Agent via the `/v1/sessions` endpoints. The agent's model, - * system prompt, tools, MCP servers, and skills are configured in the - * Console and referenced here by ID. + * Managed Agent. The agent's model, system prompt, tools, MCP servers + * and skills are configured in the Console and referenced here by ID. * - * This replaces the v2 Node Function at `netlify/functions/agent-proxy.js`. - * The Node Function process was killed at ~27 s by the platform — well - * before any SSE reconnect could fire — so the brief always stopped after - * the first MCP tool batch. Edge Functions run on Deno Deploy with a - * 40 s response-header timeout but no streaming-duration cap as long as - * we keep writing to the body, so the 20-minute reconnect budget below - * is actually reachable. + * ## Why this exists * - * Wire protocol (downstream to the browser): newline-delimited JSON - * (`application/x-ndjson`). One JSON object per line; the React side - * parses incrementally as bytes arrive. Object shapes: + * Two layered problems are in scope: * - * {"type":"text","text":"...markdown..."} // append to brief + * 1. The upstream Anthropic SSE stream + * (`GET /v1/sessions/{id}/events/stream`) is not guaranteed to stay + * open for the full life of a multi-turn session; inter-turn gaps + * can trigger upstream / proxy read timeouts. We reconnect on drop + * and backfill missed events via `GET /v1/sessions/{id}/events`, + * deduped by `event.id`. + * + * 2. Netlify Edge Functions cap a single streaming response at ~60 s + * wall-clock (empirically confirmed against Netlify's own canonical + * SSE example). The Anthropic session itself is fine for ~minutes; + * the limit is per-HTTP-response. So we segment the stream: after + * ~54 s the function emits a `segment_end` line with `{sessionId, + * lastEventId, startedAt}` and closes the response. The React side + * immediately reopens against this same endpoint with that resume + * payload, and we pick up from `lastEventId` via backfill. To the + * user this looks like one continuous stream. + * + * ## Wire protocol (downstream to the browser): NDJSON + * + * {"type":"text","text":"...markdown..."} // append to brief * {"type":"status","kind":"thinking"} * {"type":"status","kind":"tool_use","name":"...","label":"..."} * {"type":"status","kind":"tool_result","ok":true} * {"type":"status","kind":"tool_result","ok":false,"message":"..."} * {"type":"status","kind":"session_error","message":"..."} - * {"type":"heartbeat"} // keep-alive only - * {"type":"done"} // session terminated cleanly - * {"type":"error","message":"..."} // unrecoverable + * {"type":"heartbeat"} // keep-alive + * {"type":"segment_end","sessionId":"sess_...", + * "lastEventId":"evt_...","startedAt":1234567890} // reopen me + * {"type":"done"} // terminal + * {"type":"error","message":"..."} // unrecoverable * - * Reconnect strategy: the upstream Anthropic SSE stream - * (`GET /v1/sessions/{id}/events/stream`) is not guaranteed to stay open - * for the full life of a multi-turn session — long inter-turn gaps (e.g. - * while MCP tools or the next model request are running) can trigger - * upstream / proxy read timeouts. When the upstream stream ends without - * a terminal `session.status_*` event we treat it as a drop, fetch any - * events we missed via the events-list endpoint (deduped by event id), - * reopen the live stream, and keep going — up to a wall-clock budget. + * ## Request payloads * - * We deliberately use plain `fetch()` against the Anthropic REST API - * rather than the `@anthropic-ai/sdk` npm package because npm support in - * Netlify Edge Functions is still in beta, and the SSE / list endpoints - * we need are easy to call directly. + * - New session (first call): + * {stationName, stationLocation, stationWebsite} + * - Resume (subsequent calls after a segment_end): + * {sessionId, lastEventId?, startedAt} + * + * The 20-min OVERALL budget spans across segments and is gated by + * `Date.now() - startedAt`. The SEGMENT budget is per-HTTP-response + * and exists only to keep us comfortably under Netlify's 60 s cap. */ -// All Managed Agents endpoints require this beta header. const MANAGED_AGENTS_BETA = 'managed-agents-2026-04-01' const ANTHROPIC_VERSION = '2023-06-01' const ANTHROPIC_API_BASE = 'https://api.anthropic.com' -// Downstream keep-alive cadence. The browser's fetch reader will time -// out / appear stuck if no bytes flow for too long; we emit a -// `{"type":"heartbeat"}` line when the upstream is quiet. +// Downstream keep-alive cadence. The browser's fetch reader appears +// stuck if no bytes flow for too long. const HEARTBEAT_MS = 10_000 -// Hard ceiling on total wall-clock time the function will keep -// reconnecting upstream on behalf of a single browser request. Past this -// we emit an `error` line and close the stream so the user can retry -// rather than having the request hang indefinitely. -const RECONNECT_BUDGET_MS = 20 * 60 * 1000 // 20 minutes +// Close the current response just before Netlify's empirical ~60 s +// streaming cap. Leaves a few seconds for a final `segment_end` write +// + `controller.close()` to flush cleanly. +const SEGMENT_BUDGET_MS = 54_000 -// Short backoff between an upstream drop and the next reconnect attempt, -// so a tight error loop can't hammer the API. +// Total wall-clock allowance across ALL segments for a single user +// brief. Past this we emit a final `error` line and stop reopening. +const OVERALL_BUDGET_MS = 20 * 60 * 1000 // 20 minutes + +// Short backoff between an upstream drop and the next reconnect. const RECONNECT_BACKOFF_MS = 500 // --------------------------------------------------------------------------- -// Types (kept loose — Anthropic Managed Agents events are large and we -// only inspect a small subset of fields). +// Types // --------------------------------------------------------------------------- + type ContentBlock = { type?: string; text?: string } type AgentEvent = { @@ -88,6 +97,18 @@ type EventsListPage = { last_id?: string } +type NewSessionPayload = { + stationName?: string + stationLocation?: string + stationWebsite?: string +} + +type ResumePayload = { + sessionId: string + lastEventId?: string + startedAt?: number +} + // --------------------------------------------------------------------------- // Helpers // --------------------------------------------------------------------------- @@ -100,11 +121,11 @@ function authHeaders(apiKey: string): Record { } } -// A `session.status_idle` with `stop_reason: requires_action` means the -// agent is waiting on a client-side response (custom tool result, tool -// confirmation). For this app we don't expose custom tools, so it should -// never fire — but if it ever does we explicitly do NOT treat it as -// terminal and let the loop keep tailing. +// A `session.status_idle` with `stop_reason: requires_action` means +// the agent is waiting on a client-side response (custom tool result, +// tool confirmation). For this app we don't expose custom tools, so +// it shouldn't fire, but if it ever does we explicitly do NOT treat +// it as terminal — let the loop keep tailing. function isTerminal(event: AgentEvent | undefined): boolean { if (!event) return false if (event.type === 'session.status_terminated') return true @@ -151,8 +172,8 @@ async function anthropicFetch( return await fetch(`${ANTHROPIC_API_BASE}${path}`, { ...init, headers }) } -// Parse the SSE stream into an async iterable of decoded `AgentEvent` -// objects. Each SSE message is `event: \ndata: \n\n`. +// Parse the SSE stream into an async iterable of decoded events. +// Each SSE message is `event: \ndata: \n\n`. async function* parseSse( stream: ReadableStream, ): AsyncGenerator { @@ -160,58 +181,61 @@ async function* parseSse( const decoder = new TextDecoder('utf-8') let buffer = '' - while (true) { - const { value, done } = await reader.read() - if (done) break - buffer += decoder.decode(value, { stream: true }) + try { + while (true) { + const { value, done } = await reader.read() + if (done) break + buffer += decoder.decode(value, { stream: true }) - // SSE events are separated by a blank line (`\n\n`). Process each - // complete event. - let sep - while ((sep = buffer.indexOf('\n\n')) !== -1) { - const raw = buffer.slice(0, sep) - buffer = buffer.slice(sep + 2) + let sep + while ((sep = buffer.indexOf('\n\n')) !== -1) { + const raw = buffer.slice(0, sep) + buffer = buffer.slice(sep + 2) - let dataPayload = '' - let evType: string | undefined - for (const line of raw.split('\n')) { - if (line.startsWith('event:')) { - evType = line.slice(6).trim() - } else if (line.startsWith('data:')) { - dataPayload += line.slice(5).trim() + let dataPayload = '' + let evType: string | undefined + for (const line of raw.split('\n')) { + if (line.startsWith('event:')) { + evType = line.slice(6).trim() + } else if (line.startsWith('data:')) { + dataPayload += line.slice(5).trim() + } + } + if (!dataPayload) continue + try { + const parsed = JSON.parse(dataPayload) as AgentEvent + if (evType && !parsed.type) parsed.type = evType + yield parsed + } catch { + // Malformed event — skip. } - // Anthropic doesn't multi-line `data:`, but tolerate it just in - // case — multiple `data:` lines are concatenated per the SSE - // spec. - } - if (!dataPayload) continue - try { - const parsed = JSON.parse(dataPayload) as AgentEvent - // `event:` header takes precedence; fall back to the `type` - // field inside `data` (Anthropic sets both). - if (evType && !parsed.type) parsed.type = evType - yield parsed - } catch { - // Malformed event — skip. } } + } finally { + try { + reader.releaseLock() + } catch { + /* ignore */ + } } } -// Page through `GET /v1/sessions/{id}/events` to backfill anything we -// missed while disconnected. The endpoint returns `{data, has_more, -// last_id}`; we keep paging while `has_more` is true. +// Page through `GET /v1/sessions/{id}/events` to backfill anything +// we missed. Accepts an `after` id so we only fetch new events. async function* listAllEvents( sessionId: string, apiKey: string, + after: string | undefined, + signal?: AbortSignal, ): AsyncGenerator { - let after: string | undefined + let cursor = after for (;;) { const params = new URLSearchParams({ order: 'asc', limit: '100' }) - if (after) params.set('after_id', after) + if (cursor) params.set('after_id', cursor) const res = await anthropicFetch( `/v1/sessions/${sessionId}/events?${params.toString()}`, apiKey, + { signal }, ) if (!res.ok) { throw new Error( @@ -223,11 +247,49 @@ async function* listAllEvents( for (const ev of items) yield ev if (!page.has_more) break const lastId = page.last_id ?? items[items.length - 1]?.id - if (!lastId) break // can't paginate without an anchor - after = lastId + if (!lastId) break + cursor = lastId } } +async function openStream( + sessionId: string, + apiKey: string, + signal: AbortSignal, +): Promise> { + const res = await anthropicFetch( + `/v1/sessions/${sessionId}/events/stream`, + apiKey, + { + method: 'GET', + headers: { accept: 'text/event-stream' }, + signal, + }, + ) + if (!res.ok || !res.body) { + throw new Error( + `events.stream returned ${res.status} ${res.statusText}`, + ) + } + return res.body +} + +// `setTimeout` that resolves early on abort. +function abortableSleep(ms: number, signal: AbortSignal): Promise { + return new Promise((resolve) => { + const t = setTimeout(resolve, ms) + const onAbort = () => { + clearTimeout(t) + resolve() + } + if (signal.aborted) { + onAbort() + return + } + signal.addEventListener('abort', onAbort, { once: true }) + }) +} + // --------------------------------------------------------------------------- // Handler // --------------------------------------------------------------------------- @@ -240,48 +302,74 @@ export default async (req: Request, _context: Context) => { return new Response('Method Not Allowed', { status: 405 }) } - let payload: { - stationName?: string - stationLocation?: string - stationWebsite?: string - } + let payload: NewSessionPayload & Partial try { payload = await req.json() } catch { return new Response('Invalid JSON body', { status: 400 }) } - const { - stationName = '', - stationLocation = '', - stationWebsite = '', - } = payload || {} - - if (!URL_REGEX.test(stationWebsite)) { - return new Response('Invalid station website URL', { status: 400 }) - } - const apiKey = Netlify.env.get('ANTHROPIC_API_KEY') - const AGENT_ID = Netlify.env.get('ANTHROPIC_AGENT_ID') - const ENVIRONMENT_ID = Netlify.env.get('ANTHROPIC_ENVIRONMENT_ID') - const VAULT_IDS = (Netlify.env.get('ANTHROPIC_VAULT_IDS') || '') - .split(',') - .map((s) => s.trim()) - .filter(Boolean) - if (!apiKey) { - return new Response('Server is missing ANTHROPIC_API_KEY', { status: 500 }) - } - const missing: string[] = [] - if (!AGENT_ID) missing.push('ANTHROPIC_AGENT_ID') - if (!ENVIRONMENT_ID) missing.push('ANTHROPIC_ENVIRONMENT_ID') - if (missing.length) { - return new Response(`Server is missing env var(s): ${missing.join(', ')}`, { + return new Response('Server is missing ANTHROPIC_API_KEY', { status: 500, }) } - const userMessage = `Here is a new public media station intake: + const isResume = + typeof payload.sessionId === 'string' && payload.sessionId.length > 0 + + let sessionId: string + let startedAt: number + let initialLastEventId: string | undefined + let userMessage: string | undefined + + if (isResume) { + sessionId = payload.sessionId as string + startedAt = + typeof payload.startedAt === 'number' ? payload.startedAt : Date.now() + initialLastEventId = + typeof payload.lastEventId === 'string' && payload.lastEventId + ? payload.lastEventId + : undefined + + if (Date.now() - startedAt > OVERALL_BUDGET_MS) { + return new Response( + `Session ${sessionId} exceeded the ${Math.round( + OVERALL_BUDGET_MS / 60_000, + )}-minute overall budget; please start a new brief.`, + { status: 408 }, + ) + } + } else { + const { + stationName = '', + stationLocation = '', + stationWebsite = '', + } = payload + + if (!URL_REGEX.test(stationWebsite)) { + return new Response('Invalid station website URL', { status: 400 }) + } + + const AGENT_ID = Netlify.env.get('ANTHROPIC_AGENT_ID') + const ENVIRONMENT_ID = Netlify.env.get('ANTHROPIC_ENVIRONMENT_ID') + const VAULT_IDS = (Netlify.env.get('ANTHROPIC_VAULT_IDS') || '') + .split(',') + .map((s) => s.trim()) + .filter(Boolean) + + const missing: string[] = [] + if (!AGENT_ID) missing.push('ANTHROPIC_AGENT_ID') + if (!ENVIRONMENT_ID) missing.push('ANTHROPIC_ENVIRONMENT_ID') + if (missing.length) { + return new Response( + `Server is missing env var(s): ${missing.join(', ')}`, + { status: 500 }, + ) + } + + userMessage = `Here is a new public media station intake: - Station Name: ${stationName} - Station Location: ${stationLocation} @@ -289,106 +377,49 @@ export default async (req: Request, _context: Context) => { Please follow your instructions to produce the funding outlook brief.` - // ----- Create the session ----- - const createBody: Record = { - agent: AGENT_ID, - environment_id: ENVIRONMENT_ID, - } - if (VAULT_IDS.length) createBody.vault_ids = VAULT_IDS - - let sessionId: string - try { - const createRes = await anthropicFetch('/v1/sessions', apiKey, { - method: 'POST', - headers: { 'content-type': 'application/json' }, - body: JSON.stringify(createBody), - }) - if (!createRes.ok) { - const text = await createRes.text() - return new Response( - `Failed to create session: ${createRes.status} ${text}`, - { status: 500 }, - ) + const createBody: Record = { + agent: AGENT_ID, + environment_id: ENVIRONMENT_ID, } - const created = (await createRes.json()) as { id?: string } - if (!created.id) { - return new Response('Failed to create session: no id returned', { - status: 500, - }) - } - sessionId = created.id - } catch (err) { - return new Response( - `Failed to create session: ${(err as Error)?.message || err}`, - { status: 500 }, - ) - } + if (VAULT_IDS.length) createBody.vault_ids = VAULT_IDS - // ----- Open the event stream BEFORE sending the user message ----- - // (Stream-first ensures we don't miss any events the agent emits.) - const openStream = async (): Promise> => { - const res = await anthropicFetch( - `/v1/sessions/${sessionId}/events/stream`, - apiKey, - { method: 'GET', headers: { accept: 'text/event-stream' } }, - ) - if (!res.ok || !res.body) { - throw new Error( - `events.stream returned ${res.status} ${res.statusText}`, - ) - } - return res.body - } - - let upstream: ReadableStream - try { - upstream = await openStream() - } catch (err) { - return new Response( - `Failed to open session event stream: ${(err as Error)?.message || err}`, - { status: 500 }, - ) - } - - // ----- Send the kickoff user.message event ----- - try { - const sendRes = await anthropicFetch( - `/v1/sessions/${sessionId}/events`, - apiKey, - { + try { + const createRes = await anthropicFetch('/v1/sessions', apiKey, { method: 'POST', headers: { 'content-type': 'application/json' }, - body: JSON.stringify({ - events: [ - { - type: 'user.message', - content: [{ type: 'text', text: userMessage }], - }, - ], - }), - }, - ) - if (!sendRes.ok) { - const text = await sendRes.text() + body: JSON.stringify(createBody), + }) + if (!createRes.ok) { + const text = await createRes.text() + return new Response( + `Failed to create session: ${createRes.status} ${text}`, + { status: 500 }, + ) + } + const created = (await createRes.json()) as { id?: string } + if (!created.id) { + return new Response('Failed to create session: no id returned', { + status: 500, + }) + } + sessionId = created.id + startedAt = Date.now() + } catch (err) { return new Response( - `Failed to send user message: ${sendRes.status} ${text}`, + `Failed to create session: ${(err as Error)?.message || err}`, { status: 500 }, ) } - } catch (err) { - return new Response( - `Failed to send user message: ${(err as Error)?.message || err}`, - { status: 500 }, - ) } const encoder = new TextEncoder() const seenEventIds = new Set() - const deadline = Date.now() + RECONNECT_BUDGET_MS const body = new ReadableStream({ async start(controller) { let lastSendAt = Date.now() + let lastEventIdSeen = initialLastEventId + const writeJson = (obj: unknown) => { try { controller.enqueue(encoder.encode(JSON.stringify(obj) + '\n')) @@ -404,6 +435,20 @@ Please follow your instructions to produce the funding outlook brief.` } }, HEARTBEAT_MS / 2) + // Single AbortController for this segment. The segment timer + // calls .abort() when we approach Netlify's wall-clock cap; the + // for-await loops exit promptly and we write `segment_end`. + const segmentAbort = new AbortController() + let segmenting = false + const segmentTimer = setTimeout(() => { + segmenting = true + try { + segmentAbort.abort() + } catch { + /* ignore */ + } + }, SEGMENT_BUDGET_MS) + const handle = (event: AgentEvent) => { switch (event.type) { case 'agent.message': { @@ -416,12 +461,10 @@ Please follow your instructions to produce the funding outlook brief.` } break } - case 'agent.thinking': { writeJson({ type: 'status', kind: 'thinking' }) break } - case 'agent.tool_use': case 'agent.mcp_tool_use': { writeJson({ @@ -432,7 +475,6 @@ Please follow your instructions to produce the funding outlook brief.` }) break } - case 'agent.tool_result': case 'agent.mcp_tool_result': { if (event.is_error) { @@ -450,7 +492,6 @@ Please follow your instructions to produce the funding outlook brief.` } break } - case 'session.error': { const msg = event.error?.message || 'unknown session error' writeJson({ @@ -463,74 +504,160 @@ Please follow your instructions to produce the funding outlook brief.` } } + // Brand new session: send the kickoff user.message BEFORE we + // start tailing. (For resume, the user.message was sent on the + // first segment, so we never re-send it.) + if (!isResume && userMessage) { + try { + const sendRes = await anthropicFetch( + `/v1/sessions/${sessionId}/events`, + apiKey, + { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ + events: [ + { + type: 'user.message', + content: [{ type: 'text', text: userMessage }], + }, + ], + }), + }, + ) + if (!sendRes.ok) { + const text = await sendRes.text() + writeJson({ + type: 'error', + message: `Failed to send user message: ${sendRes.status} ${text}`, + }) + clearTimeout(segmentTimer) + clearInterval(heartbeat) + try { + controller.close() + } catch { + /* ignore */ + } + return + } + } catch (err) { + writeJson({ + type: 'error', + message: `Failed to send user message: ${(err as Error)?.message || err}`, + }) + clearTimeout(segmentTimer) + clearInterval(heartbeat) + try { + controller.close() + } catch { + /* ignore */ + } + return + } + } + let done = false - let currentStream: ReadableStream = upstream let iteration = 0 try { - while (!done) { + while (!done && !segmenting) { iteration++ - // On reconnect (every iteration after the first), reopen the - // live stream and backfill anything we missed during the gap. if (iteration > 1) { - if (Date.now() >= deadline) { - writeJson({ - type: 'error', - message: `Stream reconnect budget (${Math.round( - RECONNECT_BUDGET_MS / 60_000, - )} min) exhausted. The session may still be running — try again or check the Console.`, - }) - break - } + await abortableSleep(RECONNECT_BACKOFF_MS, segmentAbort.signal) + if (segmenting) break + } - await new Promise((r) => setTimeout(r, RECONNECT_BACKOFF_MS)) + if (Date.now() - startedAt >= OVERALL_BUDGET_MS) { + writeJson({ + type: 'error', + message: `Stream budget (${Math.round( + OVERALL_BUDGET_MS / 60_000, + )} min) exhausted. The session may still be running — try again or check the Console.`, + }) + break + } - try { - currentStream = await openStream() - } catch (err) { - writeJson({ - type: 'error', - message: `Failed to reopen event stream: ${(err as Error)?.message || err}`, - }) - break - } + // Open the live stream FIRST so any events emitted while + // we're backfilling are still captured on the wire. + let upstream: ReadableStream + try { + upstream = await openStream( + sessionId, + apiKey, + segmentAbort.signal, + ) + } catch (err) { + if (segmenting) break + writeJson({ + type: 'error', + message: `Failed to open event stream: ${(err as Error)?.message || err}`, + }) + break + } - // Backfill: pull everything the session has emitted so far - // and dedupe by event.id. - try { - for await (const event of listAllEvents(sessionId, apiKey)) { - if (event.id && !seenEventIds.has(event.id)) { + // Backfill anything we missed since `lastEventIdSeen`. + // Dedupes happen by event id; terminal events still flip + // `done` even if we've seen the id before. + try { + for await (const event of listAllEvents( + sessionId, + apiKey, + lastEventIdSeen, + segmentAbort.signal, + )) { + if (segmenting) break + if (event.id) { + if (!seenEventIds.has(event.id)) { seenEventIds.add(event.id) + lastEventIdSeen = event.id handle(event) - } - // Terminal checks must run even for already-seen events, - // or a terminal event that came in via the backfill gets - // skipped and the loop never exits. - if (isTerminal(event)) { - done = true - break + } else { + // Already handled in a previous iteration — but + // keep the cursor moving forward. + lastEventIdSeen = event.id } } - } catch (err) { + if (isTerminal(event)) { + done = true + break + } + } + } catch (err) { + if (!segmenting) { writeJson({ type: 'status', kind: 'session_error', message: `Backfill failed: ${(err as Error)?.message || err}`, }) - // Don't break — fall through and try the live stream. + // Fall through and try the live stream. } - - if (done) break } - // Tail the currently-open live stream until it ends, errors, - // or hits a terminal session event. + if (done || segmenting) { + // Drain the live-stream body we opened above so we don't + // leak the connection. + try { + await upstream.cancel() + } catch { + /* ignore */ + } + break + } + + // Tail the live stream until upstream EOFs (drop), the + // segment timer fires, or a terminal event arrives. try { - for await (const event of parseSse(currentStream)) { - if (event.id && !seenEventIds.has(event.id)) { - seenEventIds.add(event.id) - handle(event) + for await (const event of parseSse(upstream)) { + if (segmenting) break + if (event.id) { + if (!seenEventIds.has(event.id)) { + seenEventIds.add(event.id) + lastEventIdSeen = event.id + handle(event) + } else { + lastEventIdSeen = event.id + } } if (isTerminal(event)) { done = true @@ -538,14 +665,21 @@ Please follow your instructions to produce the funding outlook brief.` } } } catch { - // Treat as transient. If it's actually persistent, the - // reopen + backfill on the next iteration will surface it - // (or burn down the reconnect budget cleanly). + // Most commonly: AbortError from segmentAbort. Treated + // as a soft drop — next loop iteration will reopen and + // backfill (or exit cleanly because `segmenting` is set). } } if (done) { writeJson({ type: 'done' }) + } else if (segmenting) { + writeJson({ + type: 'segment_end', + sessionId, + lastEventId: lastEventIdSeen, + startedAt, + }) } } catch (err) { writeJson({ @@ -553,6 +687,7 @@ Please follow your instructions to produce the funding outlook brief.` message: (err as Error)?.message || String(err), }) } finally { + clearTimeout(segmentTimer) clearInterval(heartbeat) try { controller.close() diff --git a/src/App.jsx b/src/App.jsx index d05329a..86e38cc 100644 --- a/src/App.jsx +++ b/src/App.jsx @@ -48,33 +48,50 @@ export default function App() { return Object.keys(next).length === 0 } - const handleEvent = (msg) => { - switch (msg?.type) { - case 'text': - if (typeof msg.text === 'string' && msg.text) { - setResult((prev) => prev + msg.text) - } - break - case 'status': - setStatus(msg) - break - case 'error': - case 'session_error': - setStreamError(msg.message || 'streaming error') - break - case 'done': - case 'heartbeat': - default: - break - } - } - - // Parse the NDJSON stream from /agent-proxy: one JSON object per line. + // Parse the NDJSON stream from /api/agent-proxy: one JSON object per + // line. Returns a resume payload `{sessionId, lastEventId, startedAt}` + // if the server emitted a `segment_end` (Netlify Edge caps a single + // streaming response at ~60 s, so longer briefs span multiple + // segments). Returns null when the stream ends cleanly (`done`, + // `error`, or upstream EOF without a segment_end). const readStream = async (response) => { const reader = response.body.getReader() const decoder = new TextDecoder('utf-8') let buffer = '' let firstByte = true + let segmentEnd = null + let fatalError = null + + const handleEvent = (msg) => { + switch (msg?.type) { + case 'text': + if (typeof msg.text === 'string' && msg.text) { + setResult((prev) => prev + msg.text) + } + break + case 'status': + setStatus(msg) + break + case 'segment_end': + if (typeof msg.sessionId === 'string' && msg.sessionId) { + segmentEnd = { + sessionId: msg.sessionId, + lastEventId: msg.lastEventId, + startedAt: msg.startedAt, + } + } + break + case 'error': + case 'session_error': + fatalError = msg.message || 'streaming error' + setStreamError(fatalError) + break + case 'done': + case 'heartbeat': + default: + break + } + } const flushLines = () => { let nl @@ -112,8 +129,10 @@ export default function App() { /* ignore */ } } - setIsStreaming(false) - setStatus(null) + + // Only return a resume payload if we got a clean segment_end AND + // no fatal error fired in the same segment. + return fatalError ? null : segmentEnd } const handleSubmit = async (e) => { @@ -128,19 +147,37 @@ export default function App() { setIsStreaming(false) try { - const response = await fetch('/api/agent-proxy', { - method: 'POST', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify(form), - }) + let resume = null + // Loop across segments. Each iteration is one HTTP request; + // the server closes it at ~54 s and the next iteration picks + // up from `lastEventId` so the user sees one continuous brief. + // Hard cap on iterations as a belt-and-braces guard against a + // runaway segment loop. + for (let i = 0; i < 40; i++) { + const body = resume + ? { + sessionId: resume.sessionId, + lastEventId: resume.lastEventId, + startedAt: resume.startedAt, + } + : form - if (!response.ok || !response.body) { - throw new Error( - `Request failed: ${response.status} ${response.statusText}`, - ) + const response = await fetch('/api/agent-proxy', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(body), + }) + + if (!response.ok || !response.body) { + throw new Error( + `Request failed: ${response.status} ${response.statusText}`, + ) + } + + const next = await readStream(response) + if (!next) break + resume = next } - - await readStream(response) } catch (err) { console.error(err) setStreamError(err.message || 'Something went wrong while streaming.')