feat(agent-proxy): segment streaming responses at ~54s to bypass Netlify Edge's ~60s response cap

Netlify Edge Functions empirically cap a single streaming response at
~60s wall-clock, regardless of activity. Confirmed against Netlify's
own canonical SSE example (edge-functions-examples.netlify.app/sse)
which also cuts at +60.1s. The Anthropic Managed Agent session is
fine for several minutes; the cap is per-HTTP-response.

This commit splits a long brief across multiple HTTP responses, while
keeping the UX of one continuous stream:

netlify/edge-functions/agent-proxy.ts
- Accept either a new-session payload {stationName, stationLocation,
  stationWebsite} or a resume payload {sessionId, lastEventId,
  startedAt}.
- On resume, skip session creation + user.message send. Just reopen
  the live SSE stream and backfill via
  GET /v1/sessions/{id}/events?after_id=lastEventId (deduped by
  event.id), then keep tailing.
- Single AbortController per segment. A 54s timer aborts the upstream,
  the for-await loops exit, and we write one final NDJSON line:
  {type:'segment_end', sessionId, lastEventId, startedAt}.
- The 20-min OVERALL_BUDGET_MS is enforced via Date.now() - startedAt
  so it spans across all segments.
- Refactor main loop so every iteration is openStream + backfill +
  tail. Cleaner than the previous initial-stream + reconnect-only-on-
  drop pattern.

src/App.jsx
- readStream() now returns a {sessionId, lastEventId, startedAt}
  payload if it saw a segment_end, or null if the stream ended
  cleanly.
- handleSubmit() loops, reopening /api/agent-proxy with the resume
  payload until readStream returns null. Spinner/status state stays
  on across segments so the UI shows one continuous stream.

README.md
- Document the segmented-streaming protocol and why it exists.

Co-Authored-By: alex <alex@semipublic.co>
This commit is contained in:
Devin AI
2026-05-13 13:00:54 +00:00
parent d1c5be112e
commit 8e44de5271
3 changed files with 491 additions and 297 deletions
+32 -10
View File
@@ -34,27 +34,49 @@ Open http://localhost:8888.
1. The browser POSTs the form to `/api/agent-proxy`. 1. The browser POSTs the form to `/api/agent-proxy`.
2. A Netlify Edge Function (Deno runtime) creates an Anthropic Managed 2. A Netlify Edge Function (Deno runtime) creates an Anthropic Managed
Agent session, opens the upstream SSE event stream, and sends the Agent session, opens the upstream SSE event stream, and sends the
user message. It then tails the stream and re-opens it (with an user message. It tails the stream and re-opens it (with an
`events.list` backfill, deduped by event id) whenever it drops mid `events.list` backfill, deduped by event id) whenever it drops mid
session, until the session reports a terminal `session.status_*` session, until the session reports a terminal `session.status_*`
event or a 20-minute wall-clock budget is hit. event or a 20-minute wall-clock budget is hit.
3. Downstream to the browser the function emits newline-delimited JSON 3. Downstream to the browser the function emits newline-delimited JSON
(`application/x-ndjson`) — `text`, `status`, `heartbeat`, `done`, (`application/x-ndjson`) — `text`, `status`, `heartbeat`,
`error` — one object per line. `segment_end`, `done`, `error` — one object per line.
4. The React app reads `response.body.getReader()`, splits on `\n`, and 4. The React app reads `response.body.getReader()`, splits on `\n`, and
parses each line. `text` lines append to the brief; `status` lines parses each line. `text` lines append to the brief; `status` lines
drive a separate “what the agent is doing now” banner. drive a separate “what the agent is doing now” banner.
5. A `Thinking` flag stays `true` until the first chunk arrives, then 5. A `Thinking` flag stays `true` until the first chunk arrives, then
flips to a streaming state with a pulsing cursor. flips to a streaming state with a pulsing cursor.
Edge Functions are required here — the previous v2 Node Function ran on ### Why segmented streaming
AWS Lambda and got killed at ~27s, well before any reconnect could
fire. Edge Functions run on Deno Deploy with no streaming-duration cap Netlify Edge Functions empirically cap a single streaming response at
as long as the function keeps writing to the response body. ~60 s wall-clock (Netlify's own canonical SSE example cuts at the same
mark, even though the docs imply "indefinite" streaming). The Anthropic
Managed Agent session itself is fine for several minutes; the limit is
per-HTTP-response. So the proxy segments the stream:
- Just before ~54 s the function writes one final NDJSON line —
`{"type":"segment_end","sessionId":"sess_…","lastEventId":"evt_…","startedAt":…}`
and closes the response.
- The React side sees `segment_end`, immediately POSTs to
`/api/agent-proxy` again with `{sessionId, lastEventId, startedAt}`
in the body.
- The function recognises this as a resume payload: it does **not**
create a new session or re-send the user message. It just reopens
the live SSE stream, backfills via
`GET /v1/sessions/{id}/events?after_id=lastEventId` (deduped by
`event.id`), and keeps tailing.
The 20-minute overall budget is enforced via `Date.now() - startedAt`
so it spans across all segments. The previous v2 Node Function (on AWS
Lambda) was killed at ~27 s, well before any reconnect could fire —
moving to Edge Functions extended that to ~60 s per response, and the
segment loop extends it the rest of the way.
## Files ## Files
- [src/App.jsx](src/App.jsx) — form, NDJSON streaming reader, UI states. - [src/App.jsx](src/App.jsx) — form, NDJSON streaming reader,
segment-end resume loop, UI states.
- [netlify/edge-functions/agent-proxy.ts](netlify/edge-functions/agent-proxy.ts) - [netlify/edge-functions/agent-proxy.ts](netlify/edge-functions/agent-proxy.ts)
— Managed Agent proxy with reconnect, backfill, and NDJSON wire — Managed Agent proxy with reconnect, backfill, segmentation, and
protocol. NDJSON wire protocol.
+349 -214
View File
@@ -4,21 +4,31 @@ import type { Config, Context } from '@netlify/edge-functions'
* Netlify Edge Function (Deno runtime). * Netlify Edge Function (Deno runtime).
* *
* Proxies a request from the React frontend to a Claude Console-defined * Proxies a request from the React frontend to a Claude Console-defined
* Managed Agent via the `/v1/sessions` endpoints. The agent's model, * Managed Agent. The agent's model, system prompt, tools, MCP servers
* system prompt, tools, MCP servers, and skills are configured in the * and skills are configured in the Console and referenced here by ID.
* Console and referenced here by ID.
* *
* This replaces the v2 Node Function at `netlify/functions/agent-proxy.js`. * ## Why this exists
* The Node Function process was killed at ~27 s by the platform — well
* before any SSE reconnect could fire — so the brief always stopped after
* the first MCP tool batch. Edge Functions run on Deno Deploy with a
* 40 s response-header timeout but no streaming-duration cap as long as
* we keep writing to the body, so the 20-minute reconnect budget below
* is actually reachable.
* *
* Wire protocol (downstream to the browser): newline-delimited JSON * Two layered problems are in scope:
* (`application/x-ndjson`). One JSON object per line; the React side *
* parses incrementally as bytes arrive. Object shapes: * 1. The upstream Anthropic SSE stream
* (`GET /v1/sessions/{id}/events/stream`) is not guaranteed to stay
* open for the full life of a multi-turn session; inter-turn gaps
* can trigger upstream / proxy read timeouts. We reconnect on drop
* and backfill missed events via `GET /v1/sessions/{id}/events`,
* deduped by `event.id`.
*
* 2. Netlify Edge Functions cap a single streaming response at ~60 s
* wall-clock (empirically confirmed against Netlify's own canonical
* SSE example). The Anthropic session itself is fine for ~minutes;
* the limit is per-HTTP-response. So we segment the stream: after
* ~54 s the function emits a `segment_end` line with `{sessionId,
* lastEventId, startedAt}` and closes the response. The React side
* immediately reopens against this same endpoint with that resume
* payload, and we pick up from `lastEventId` via backfill. To the
* user this looks like one continuous stream.
*
* ## Wire protocol (downstream to the browser): NDJSON
* *
* {"type":"text","text":"...markdown..."} // append to brief * {"type":"text","text":"...markdown..."} // append to brief
* {"type":"status","kind":"thinking"} * {"type":"status","kind":"thinking"}
@@ -26,49 +36,48 @@ import type { Config, Context } from '@netlify/edge-functions'
* {"type":"status","kind":"tool_result","ok":true} * {"type":"status","kind":"tool_result","ok":true}
* {"type":"status","kind":"tool_result","ok":false,"message":"..."} * {"type":"status","kind":"tool_result","ok":false,"message":"..."}
* {"type":"status","kind":"session_error","message":"..."} * {"type":"status","kind":"session_error","message":"..."}
* {"type":"heartbeat"} // keep-alive only * {"type":"heartbeat"} // keep-alive
* {"type":"done"} // session terminated cleanly * {"type":"segment_end","sessionId":"sess_...",
* "lastEventId":"evt_...","startedAt":1234567890} // reopen me
* {"type":"done"} // terminal
* {"type":"error","message":"..."} // unrecoverable * {"type":"error","message":"..."} // unrecoverable
* *
* Reconnect strategy: the upstream Anthropic SSE stream * ## Request payloads
* (`GET /v1/sessions/{id}/events/stream`) is not guaranteed to stay open
* for the full life of a multi-turn session — long inter-turn gaps (e.g.
* while MCP tools or the next model request are running) can trigger
* upstream / proxy read timeouts. When the upstream stream ends without
* a terminal `session.status_*` event we treat it as a drop, fetch any
* events we missed via the events-list endpoint (deduped by event id),
* reopen the live stream, and keep going — up to a wall-clock budget.
* *
* We deliberately use plain `fetch()` against the Anthropic REST API * - New session (first call):
* rather than the `@anthropic-ai/sdk` npm package because npm support in * {stationName, stationLocation, stationWebsite}
* Netlify Edge Functions is still in beta, and the SSE / list endpoints * - Resume (subsequent calls after a segment_end):
* we need are easy to call directly. * {sessionId, lastEventId?, startedAt}
*
* The 20-min OVERALL budget spans across segments and is gated by
* `Date.now() - startedAt`. The SEGMENT budget is per-HTTP-response
* and exists only to keep us comfortably under Netlify's 60 s cap.
*/ */
// All Managed Agents endpoints require this beta header.
const MANAGED_AGENTS_BETA = 'managed-agents-2026-04-01' const MANAGED_AGENTS_BETA = 'managed-agents-2026-04-01'
const ANTHROPIC_VERSION = '2023-06-01' const ANTHROPIC_VERSION = '2023-06-01'
const ANTHROPIC_API_BASE = 'https://api.anthropic.com' const ANTHROPIC_API_BASE = 'https://api.anthropic.com'
// Downstream keep-alive cadence. The browser's fetch reader will time // Downstream keep-alive cadence. The browser's fetch reader appears
// out / appear stuck if no bytes flow for too long; we emit a // stuck if no bytes flow for too long.
// `{"type":"heartbeat"}` line when the upstream is quiet.
const HEARTBEAT_MS = 10_000 const HEARTBEAT_MS = 10_000
// Hard ceiling on total wall-clock time the function will keep // Close the current response just before Netlify's empirical ~60 s
// reconnecting upstream on behalf of a single browser request. Past this // streaming cap. Leaves a few seconds for a final `segment_end` write
// we emit an `error` line and close the stream so the user can retry // + `controller.close()` to flush cleanly.
// rather than having the request hang indefinitely. const SEGMENT_BUDGET_MS = 54_000
const RECONNECT_BUDGET_MS = 20 * 60 * 1000 // 20 minutes
// Short backoff between an upstream drop and the next reconnect attempt, // Total wall-clock allowance across ALL segments for a single user
// so a tight error loop can't hammer the API. // brief. Past this we emit a final `error` line and stop reopening.
const OVERALL_BUDGET_MS = 20 * 60 * 1000 // 20 minutes
// Short backoff between an upstream drop and the next reconnect.
const RECONNECT_BACKOFF_MS = 500 const RECONNECT_BACKOFF_MS = 500
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Types (kept loose — Anthropic Managed Agents events are large and we // Types
// only inspect a small subset of fields).
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
type ContentBlock = { type?: string; text?: string } type ContentBlock = { type?: string; text?: string }
type AgentEvent = { type AgentEvent = {
@@ -88,6 +97,18 @@ type EventsListPage = {
last_id?: string last_id?: string
} }
type NewSessionPayload = {
stationName?: string
stationLocation?: string
stationWebsite?: string
}
type ResumePayload = {
sessionId: string
lastEventId?: string
startedAt?: number
}
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Helpers // Helpers
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@@ -100,11 +121,11 @@ function authHeaders(apiKey: string): Record<string, string> {
} }
} }
// A `session.status_idle` with `stop_reason: requires_action` means the // A `session.status_idle` with `stop_reason: requires_action` means
// agent is waiting on a client-side response (custom tool result, tool // the agent is waiting on a client-side response (custom tool result,
// confirmation). For this app we don't expose custom tools, so it should // tool confirmation). For this app we don't expose custom tools, so
// never fire but if it ever does we explicitly do NOT treat it as // it shouldn't fire, but if it ever does we explicitly do NOT treat
// terminal and let the loop keep tailing. // it as terminal let the loop keep tailing.
function isTerminal(event: AgentEvent | undefined): boolean { function isTerminal(event: AgentEvent | undefined): boolean {
if (!event) return false if (!event) return false
if (event.type === 'session.status_terminated') return true if (event.type === 'session.status_terminated') return true
@@ -151,8 +172,8 @@ async function anthropicFetch(
return await fetch(`${ANTHROPIC_API_BASE}${path}`, { ...init, headers }) return await fetch(`${ANTHROPIC_API_BASE}${path}`, { ...init, headers })
} }
// Parse the SSE stream into an async iterable of decoded `AgentEvent` // Parse the SSE stream into an async iterable of decoded events.
// objects. Each SSE message is `event: <type>\ndata: <json>\n\n`. // Each SSE message is `event: <type>\ndata: <json>\n\n`.
async function* parseSse( async function* parseSse(
stream: ReadableStream<Uint8Array>, stream: ReadableStream<Uint8Array>,
): AsyncGenerator<AgentEvent> { ): AsyncGenerator<AgentEvent> {
@@ -160,13 +181,12 @@ async function* parseSse(
const decoder = new TextDecoder('utf-8') const decoder = new TextDecoder('utf-8')
let buffer = '' let buffer = ''
try {
while (true) { while (true) {
const { value, done } = await reader.read() const { value, done } = await reader.read()
if (done) break if (done) break
buffer += decoder.decode(value, { stream: true }) buffer += decoder.decode(value, { stream: true })
// SSE events are separated by a blank line (`\n\n`). Process each
// complete event.
let sep let sep
while ((sep = buffer.indexOf('\n\n')) !== -1) { while ((sep = buffer.indexOf('\n\n')) !== -1) {
const raw = buffer.slice(0, sep) const raw = buffer.slice(0, sep)
@@ -180,15 +200,10 @@ async function* parseSse(
} else if (line.startsWith('data:')) { } else if (line.startsWith('data:')) {
dataPayload += line.slice(5).trim() dataPayload += line.slice(5).trim()
} }
// Anthropic doesn't multi-line `data:`, but tolerate it just in
// case — multiple `data:` lines are concatenated per the SSE
// spec.
} }
if (!dataPayload) continue if (!dataPayload) continue
try { try {
const parsed = JSON.parse(dataPayload) as AgentEvent const parsed = JSON.parse(dataPayload) as AgentEvent
// `event:` header takes precedence; fall back to the `type`
// field inside `data` (Anthropic sets both).
if (evType && !parsed.type) parsed.type = evType if (evType && !parsed.type) parsed.type = evType
yield parsed yield parsed
} catch { } catch {
@@ -196,22 +211,31 @@ async function* parseSse(
} }
} }
} }
} finally {
try {
reader.releaseLock()
} catch {
/* ignore */
}
}
} }
// Page through `GET /v1/sessions/{id}/events` to backfill anything we // Page through `GET /v1/sessions/{id}/events` to backfill anything
// missed while disconnected. The endpoint returns `{data, has_more, // we missed. Accepts an `after` id so we only fetch new events.
// last_id}`; we keep paging while `has_more` is true.
async function* listAllEvents( async function* listAllEvents(
sessionId: string, sessionId: string,
apiKey: string, apiKey: string,
after: string | undefined,
signal?: AbortSignal,
): AsyncGenerator<AgentEvent> { ): AsyncGenerator<AgentEvent> {
let after: string | undefined let cursor = after
for (;;) { for (;;) {
const params = new URLSearchParams({ order: 'asc', limit: '100' }) const params = new URLSearchParams({ order: 'asc', limit: '100' })
if (after) params.set('after_id', after) if (cursor) params.set('after_id', cursor)
const res = await anthropicFetch( const res = await anthropicFetch(
`/v1/sessions/${sessionId}/events?${params.toString()}`, `/v1/sessions/${sessionId}/events?${params.toString()}`,
apiKey, apiKey,
{ signal },
) )
if (!res.ok) { if (!res.ok) {
throw new Error( throw new Error(
@@ -223,11 +247,49 @@ async function* listAllEvents(
for (const ev of items) yield ev for (const ev of items) yield ev
if (!page.has_more) break if (!page.has_more) break
const lastId = page.last_id ?? items[items.length - 1]?.id const lastId = page.last_id ?? items[items.length - 1]?.id
if (!lastId) break // can't paginate without an anchor if (!lastId) break
after = lastId cursor = lastId
} }
} }
async function openStream(
sessionId: string,
apiKey: string,
signal: AbortSignal,
): Promise<ReadableStream<Uint8Array>> {
const res = await anthropicFetch(
`/v1/sessions/${sessionId}/events/stream`,
apiKey,
{
method: 'GET',
headers: { accept: 'text/event-stream' },
signal,
},
)
if (!res.ok || !res.body) {
throw new Error(
`events.stream returned ${res.status} ${res.statusText}`,
)
}
return res.body
}
// `setTimeout` that resolves early on abort.
function abortableSleep(ms: number, signal: AbortSignal): Promise<void> {
return new Promise<void>((resolve) => {
const t = setTimeout(resolve, ms)
const onAbort = () => {
clearTimeout(t)
resolve()
}
if (signal.aborted) {
onAbort()
return
}
signal.addEventListener('abort', onAbort, { once: true })
})
}
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Handler // Handler
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@@ -240,28 +302,56 @@ export default async (req: Request, _context: Context) => {
return new Response('Method Not Allowed', { status: 405 }) return new Response('Method Not Allowed', { status: 405 })
} }
let payload: { let payload: NewSessionPayload & Partial<ResumePayload>
stationName?: string
stationLocation?: string
stationWebsite?: string
}
try { try {
payload = await req.json() payload = await req.json()
} catch { } catch {
return new Response('Invalid JSON body', { status: 400 }) return new Response('Invalid JSON body', { status: 400 })
} }
const apiKey = Netlify.env.get('ANTHROPIC_API_KEY')
if (!apiKey) {
return new Response('Server is missing ANTHROPIC_API_KEY', {
status: 500,
})
}
const isResume =
typeof payload.sessionId === 'string' && payload.sessionId.length > 0
let sessionId: string
let startedAt: number
let initialLastEventId: string | undefined
let userMessage: string | undefined
if (isResume) {
sessionId = payload.sessionId as string
startedAt =
typeof payload.startedAt === 'number' ? payload.startedAt : Date.now()
initialLastEventId =
typeof payload.lastEventId === 'string' && payload.lastEventId
? payload.lastEventId
: undefined
if (Date.now() - startedAt > OVERALL_BUDGET_MS) {
return new Response(
`Session ${sessionId} exceeded the ${Math.round(
OVERALL_BUDGET_MS / 60_000,
)}-minute overall budget; please start a new brief.`,
{ status: 408 },
)
}
} else {
const { const {
stationName = '', stationName = '',
stationLocation = '', stationLocation = '',
stationWebsite = '', stationWebsite = '',
} = payload || {} } = payload
if (!URL_REGEX.test(stationWebsite)) { if (!URL_REGEX.test(stationWebsite)) {
return new Response('Invalid station website URL', { status: 400 }) return new Response('Invalid station website URL', { status: 400 })
} }
const apiKey = Netlify.env.get('ANTHROPIC_API_KEY')
const AGENT_ID = Netlify.env.get('ANTHROPIC_AGENT_ID') const AGENT_ID = Netlify.env.get('ANTHROPIC_AGENT_ID')
const ENVIRONMENT_ID = Netlify.env.get('ANTHROPIC_ENVIRONMENT_ID') const ENVIRONMENT_ID = Netlify.env.get('ANTHROPIC_ENVIRONMENT_ID')
const VAULT_IDS = (Netlify.env.get('ANTHROPIC_VAULT_IDS') || '') const VAULT_IDS = (Netlify.env.get('ANTHROPIC_VAULT_IDS') || '')
@@ -269,19 +359,17 @@ export default async (req: Request, _context: Context) => {
.map((s) => s.trim()) .map((s) => s.trim())
.filter(Boolean) .filter(Boolean)
if (!apiKey) {
return new Response('Server is missing ANTHROPIC_API_KEY', { status: 500 })
}
const missing: string[] = [] const missing: string[] = []
if (!AGENT_ID) missing.push('ANTHROPIC_AGENT_ID') if (!AGENT_ID) missing.push('ANTHROPIC_AGENT_ID')
if (!ENVIRONMENT_ID) missing.push('ANTHROPIC_ENVIRONMENT_ID') if (!ENVIRONMENT_ID) missing.push('ANTHROPIC_ENVIRONMENT_ID')
if (missing.length) { if (missing.length) {
return new Response(`Server is missing env var(s): ${missing.join(', ')}`, { return new Response(
status: 500, `Server is missing env var(s): ${missing.join(', ')}`,
}) { status: 500 },
)
} }
const userMessage = `Here is a new public media station intake: userMessage = `Here is a new public media station intake:
- Station Name: ${stationName} - Station Name: ${stationName}
- Station Location: ${stationLocation} - Station Location: ${stationLocation}
@@ -289,14 +377,12 @@ export default async (req: Request, _context: Context) => {
Please follow your instructions to produce the funding outlook brief.` Please follow your instructions to produce the funding outlook brief.`
// ----- Create the session -----
const createBody: Record<string, unknown> = { const createBody: Record<string, unknown> = {
agent: AGENT_ID, agent: AGENT_ID,
environment_id: ENVIRONMENT_ID, environment_id: ENVIRONMENT_ID,
} }
if (VAULT_IDS.length) createBody.vault_ids = VAULT_IDS if (VAULT_IDS.length) createBody.vault_ids = VAULT_IDS
let sessionId: string
try { try {
const createRes = await anthropicFetch('/v1/sessions', apiKey, { const createRes = await anthropicFetch('/v1/sessions', apiKey, {
method: 'POST', method: 'POST',
@@ -317,40 +403,111 @@ Please follow your instructions to produce the funding outlook brief.`
}) })
} }
sessionId = created.id sessionId = created.id
startedAt = Date.now()
} catch (err) { } catch (err) {
return new Response( return new Response(
`Failed to create session: ${(err as Error)?.message || err}`, `Failed to create session: ${(err as Error)?.message || err}`,
{ status: 500 }, { status: 500 },
) )
} }
// ----- Open the event stream BEFORE sending the user message -----
// (Stream-first ensures we don't miss any events the agent emits.)
const openStream = async (): Promise<ReadableStream<Uint8Array>> => {
const res = await anthropicFetch(
`/v1/sessions/${sessionId}/events/stream`,
apiKey,
{ method: 'GET', headers: { accept: 'text/event-stream' } },
)
if (!res.ok || !res.body) {
throw new Error(
`events.stream returned ${res.status} ${res.statusText}`,
)
}
return res.body
} }
let upstream: ReadableStream<Uint8Array> const encoder = new TextEncoder()
const seenEventIds = new Set<string>()
const body = new ReadableStream<Uint8Array>({
async start(controller) {
let lastSendAt = Date.now()
let lastEventIdSeen = initialLastEventId
const writeJson = (obj: unknown) => {
try { try {
upstream = await openStream() controller.enqueue(encoder.encode(JSON.stringify(obj) + '\n'))
} catch (err) { lastSendAt = Date.now()
return new Response( } catch {
`Failed to open session event stream: ${(err as Error)?.message || err}`, /* controller closed */
{ status: 500 }, }
)
} }
// ----- Send the kickoff user.message event ----- const heartbeat = setInterval(() => {
if (Date.now() - lastSendAt >= HEARTBEAT_MS) {
writeJson({ type: 'heartbeat' })
}
}, HEARTBEAT_MS / 2)
// Single AbortController for this segment. The segment timer
// calls .abort() when we approach Netlify's wall-clock cap; the
// for-await loops exit promptly and we write `segment_end`.
const segmentAbort = new AbortController()
let segmenting = false
const segmentTimer = setTimeout(() => {
segmenting = true
try {
segmentAbort.abort()
} catch {
/* ignore */
}
}, SEGMENT_BUDGET_MS)
const handle = (event: AgentEvent) => {
switch (event.type) {
case 'agent.message': {
if (Array.isArray(event.content)) {
for (const block of event.content) {
if (block?.type === 'text' && block.text) {
writeJson({ type: 'text', text: block.text + '\n\n' })
}
}
}
break
}
case 'agent.thinking': {
writeJson({ type: 'status', kind: 'thinking' })
break
}
case 'agent.tool_use':
case 'agent.mcp_tool_use': {
writeJson({
type: 'status',
kind: 'tool_use',
name: event.name || 'tool',
label: describeToolUse(event),
})
break
}
case 'agent.tool_result':
case 'agent.mcp_tool_result': {
if (event.is_error) {
const msg =
(Array.isArray(event.content) && event.content[0]?.text) ||
'tool error'
writeJson({
type: 'status',
kind: 'tool_result',
ok: false,
message: String(msg).slice(0, 300),
})
} else {
writeJson({ type: 'status', kind: 'tool_result', ok: true })
}
break
}
case 'session.error': {
const msg = event.error?.message || 'unknown session error'
writeJson({
type: 'status',
kind: 'session_error',
message: msg,
})
break
}
}
}
// Brand new session: send the kickoff user.message BEFORE we
// start tailing. (For resume, the user.message was sent on the
// first segment, so we never re-send it.)
if (!isResume && userMessage) {
try { try {
const sendRes = await anthropicFetch( const sendRes = await anthropicFetch(
`/v1/sessions/${sessionId}/events`, `/v1/sessions/${sessionId}/events`,
@@ -370,167 +527,137 @@ Please follow your instructions to produce the funding outlook brief.`
) )
if (!sendRes.ok) { if (!sendRes.ok) {
const text = await sendRes.text() const text = await sendRes.text()
return new Response( writeJson({
`Failed to send user message: ${sendRes.status} ${text}`, type: 'error',
{ status: 500 }, message: `Failed to send user message: ${sendRes.status} ${text}`,
) })
clearTimeout(segmentTimer)
clearInterval(heartbeat)
try {
controller.close()
} catch {
/* ignore */
}
return
} }
} catch (err) { } catch (err) {
return new Response( writeJson({
`Failed to send user message: ${(err as Error)?.message || err}`, type: 'error',
{ status: 500 }, message: `Failed to send user message: ${(err as Error)?.message || err}`,
) })
} clearTimeout(segmentTimer)
clearInterval(heartbeat)
const encoder = new TextEncoder()
const seenEventIds = new Set<string>()
const deadline = Date.now() + RECONNECT_BUDGET_MS
const body = new ReadableStream<Uint8Array>({
async start(controller) {
let lastSendAt = Date.now()
const writeJson = (obj: unknown) => {
try { try {
controller.enqueue(encoder.encode(JSON.stringify(obj) + '\n')) controller.close()
lastSendAt = Date.now()
} catch { } catch {
/* controller closed */ /* ignore */
}
}
const heartbeat = setInterval(() => {
if (Date.now() - lastSendAt >= HEARTBEAT_MS) {
writeJson({ type: 'heartbeat' })
}
}, HEARTBEAT_MS / 2)
const handle = (event: AgentEvent) => {
switch (event.type) {
case 'agent.message': {
if (Array.isArray(event.content)) {
for (const block of event.content) {
if (block?.type === 'text' && block.text) {
writeJson({ type: 'text', text: block.text + '\n\n' })
}
}
}
break
}
case 'agent.thinking': {
writeJson({ type: 'status', kind: 'thinking' })
break
}
case 'agent.tool_use':
case 'agent.mcp_tool_use': {
writeJson({
type: 'status',
kind: 'tool_use',
name: event.name || 'tool',
label: describeToolUse(event),
})
break
}
case 'agent.tool_result':
case 'agent.mcp_tool_result': {
if (event.is_error) {
const msg =
(Array.isArray(event.content) && event.content[0]?.text) ||
'tool error'
writeJson({
type: 'status',
kind: 'tool_result',
ok: false,
message: String(msg).slice(0, 300),
})
} else {
writeJson({ type: 'status', kind: 'tool_result', ok: true })
}
break
}
case 'session.error': {
const msg = event.error?.message || 'unknown session error'
writeJson({
type: 'status',
kind: 'session_error',
message: msg,
})
break
} }
return
} }
} }
let done = false let done = false
let currentStream: ReadableStream<Uint8Array> = upstream
let iteration = 0 let iteration = 0
try { try {
while (!done) { while (!done && !segmenting) {
iteration++ iteration++
// On reconnect (every iteration after the first), reopen the
// live stream and backfill anything we missed during the gap.
if (iteration > 1) { if (iteration > 1) {
if (Date.now() >= deadline) { await abortableSleep(RECONNECT_BACKOFF_MS, segmentAbort.signal)
if (segmenting) break
}
if (Date.now() - startedAt >= OVERALL_BUDGET_MS) {
writeJson({ writeJson({
type: 'error', type: 'error',
message: `Stream reconnect budget (${Math.round( message: `Stream budget (${Math.round(
RECONNECT_BUDGET_MS / 60_000, OVERALL_BUDGET_MS / 60_000,
)} min) exhausted. The session may still be running — try again or check the Console.`, )} min) exhausted. The session may still be running — try again or check the Console.`,
}) })
break break
} }
await new Promise((r) => setTimeout(r, RECONNECT_BACKOFF_MS)) // Open the live stream FIRST so any events emitted while
// we're backfilling are still captured on the wire.
let upstream: ReadableStream<Uint8Array>
try { try {
currentStream = await openStream() upstream = await openStream(
sessionId,
apiKey,
segmentAbort.signal,
)
} catch (err) { } catch (err) {
if (segmenting) break
writeJson({ writeJson({
type: 'error', type: 'error',
message: `Failed to reopen event stream: ${(err as Error)?.message || err}`, message: `Failed to open event stream: ${(err as Error)?.message || err}`,
}) })
break break
} }
// Backfill: pull everything the session has emitted so far // Backfill anything we missed since `lastEventIdSeen`.
// and dedupe by event.id. // Dedupes happen by event id; terminal events still flip
// `done` even if we've seen the id before.
try { try {
for await (const event of listAllEvents(sessionId, apiKey)) { for await (const event of listAllEvents(
if (event.id && !seenEventIds.has(event.id)) { sessionId,
apiKey,
lastEventIdSeen,
segmentAbort.signal,
)) {
if (segmenting) break
if (event.id) {
if (!seenEventIds.has(event.id)) {
seenEventIds.add(event.id) seenEventIds.add(event.id)
lastEventIdSeen = event.id
handle(event) handle(event)
} else {
// Already handled in a previous iteration — but
// keep the cursor moving forward.
lastEventIdSeen = event.id
}
} }
// Terminal checks must run even for already-seen events,
// or a terminal event that came in via the backfill gets
// skipped and the loop never exits.
if (isTerminal(event)) { if (isTerminal(event)) {
done = true done = true
break break
} }
} }
} catch (err) { } catch (err) {
if (!segmenting) {
writeJson({ writeJson({
type: 'status', type: 'status',
kind: 'session_error', kind: 'session_error',
message: `Backfill failed: ${(err as Error)?.message || err}`, message: `Backfill failed: ${(err as Error)?.message || err}`,
}) })
// Don't break — fall through and try the live stream. // Fall through and try the live stream.
}
} }
if (done) break if (done || segmenting) {
} // Drain the live-stream body we opened above so we don't
// leak the connection.
// Tail the currently-open live stream until it ends, errors,
// or hits a terminal session event.
try { try {
for await (const event of parseSse(currentStream)) { await upstream.cancel()
if (event.id && !seenEventIds.has(event.id)) { } catch {
/* ignore */
}
break
}
// Tail the live stream until upstream EOFs (drop), the
// segment timer fires, or a terminal event arrives.
try {
for await (const event of parseSse(upstream)) {
if (segmenting) break
if (event.id) {
if (!seenEventIds.has(event.id)) {
seenEventIds.add(event.id) seenEventIds.add(event.id)
lastEventIdSeen = event.id
handle(event) handle(event)
} else {
lastEventIdSeen = event.id
}
} }
if (isTerminal(event)) { if (isTerminal(event)) {
done = true done = true
@@ -538,14 +665,21 @@ Please follow your instructions to produce the funding outlook brief.`
} }
} }
} catch { } catch {
// Treat as transient. If it's actually persistent, the // Most commonly: AbortError from segmentAbort. Treated
// reopen + backfill on the next iteration will surface it // as a soft drop — next loop iteration will reopen and
// (or burn down the reconnect budget cleanly). // backfill (or exit cleanly because `segmenting` is set).
} }
} }
if (done) { if (done) {
writeJson({ type: 'done' }) writeJson({ type: 'done' })
} else if (segmenting) {
writeJson({
type: 'segment_end',
sessionId,
lastEventId: lastEventIdSeen,
startedAt,
})
} }
} catch (err) { } catch (err) {
writeJson({ writeJson({
@@ -553,6 +687,7 @@ Please follow your instructions to produce the funding outlook brief.`
message: (err as Error)?.message || String(err), message: (err as Error)?.message || String(err),
}) })
} finally { } finally {
clearTimeout(segmentTimer)
clearInterval(heartbeat) clearInterval(heartbeat)
try { try {
controller.close() controller.close()
+49 -12
View File
@@ -48,6 +48,20 @@ export default function App() {
return Object.keys(next).length === 0 return Object.keys(next).length === 0
} }
// Parse the NDJSON stream from /api/agent-proxy: one JSON object per
// line. Returns a resume payload `{sessionId, lastEventId, startedAt}`
// if the server emitted a `segment_end` (Netlify Edge caps a single
// streaming response at ~60 s, so longer briefs span multiple
// segments). Returns null when the stream ends cleanly (`done`,
// `error`, or upstream EOF without a segment_end).
const readStream = async (response) => {
const reader = response.body.getReader()
const decoder = new TextDecoder('utf-8')
let buffer = ''
let firstByte = true
let segmentEnd = null
let fatalError = null
const handleEvent = (msg) => { const handleEvent = (msg) => {
switch (msg?.type) { switch (msg?.type) {
case 'text': case 'text':
@@ -58,9 +72,19 @@ export default function App() {
case 'status': case 'status':
setStatus(msg) setStatus(msg)
break break
case 'segment_end':
if (typeof msg.sessionId === 'string' && msg.sessionId) {
segmentEnd = {
sessionId: msg.sessionId,
lastEventId: msg.lastEventId,
startedAt: msg.startedAt,
}
}
break
case 'error': case 'error':
case 'session_error': case 'session_error':
setStreamError(msg.message || 'streaming error') fatalError = msg.message || 'streaming error'
setStreamError(fatalError)
break break
case 'done': case 'done':
case 'heartbeat': case 'heartbeat':
@@ -69,13 +93,6 @@ export default function App() {
} }
} }
// Parse the NDJSON stream from /agent-proxy: one JSON object per line.
const readStream = async (response) => {
const reader = response.body.getReader()
const decoder = new TextDecoder('utf-8')
let buffer = ''
let firstByte = true
const flushLines = () => { const flushLines = () => {
let nl let nl
while ((nl = buffer.indexOf('\n')) !== -1) { while ((nl = buffer.indexOf('\n')) !== -1) {
@@ -112,8 +129,10 @@ export default function App() {
/* ignore */ /* ignore */
} }
} }
setIsStreaming(false)
setStatus(null) // Only return a resume payload if we got a clean segment_end AND
// no fatal error fired in the same segment.
return fatalError ? null : segmentEnd
} }
const handleSubmit = async (e) => { const handleSubmit = async (e) => {
@@ -128,10 +147,25 @@ export default function App() {
setIsStreaming(false) setIsStreaming(false)
try { try {
let resume = null
// Loop across segments. Each iteration is one HTTP request;
// the server closes it at ~54 s and the next iteration picks
// up from `lastEventId` so the user sees one continuous brief.
// Hard cap on iterations as a belt-and-braces guard against a
// runaway segment loop.
for (let i = 0; i < 40; i++) {
const body = resume
? {
sessionId: resume.sessionId,
lastEventId: resume.lastEventId,
startedAt: resume.startedAt,
}
: form
const response = await fetch('/api/agent-proxy', { const response = await fetch('/api/agent-proxy', {
method: 'POST', method: 'POST',
headers: { 'Content-Type': 'application/json' }, headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(form), body: JSON.stringify(body),
}) })
if (!response.ok || !response.body) { if (!response.ok || !response.body) {
@@ -140,7 +174,10 @@ export default function App() {
) )
} }
await readStream(response) const next = await readStream(response)
if (!next) break
resume = next
}
} catch (err) { } catch (err) {
console.error(err) console.error(err)
setStreamError(err.message || 'Something went wrong while streaming.') setStreamError(err.message || 'Something went wrong while streaming.')