Files
pmc-funder-tool/netlify/edge-functions/agent-proxy.ts
T
Devin AI ed0069ed89 fix(agent-proxy): cancel upstream reader on abort so segment timer can actually interrupt parseSse
Root cause of "segment_end never written, response just runs until
Netlify kills it": the Fetch spec only honors AbortSignal for the
request/header phase. Once you have `res.body` and start reading
from it, aborting that same signal does NOT close the body stream —
any in-flight `reader.read()` hangs indefinitely.

So when the segment timer fired and called segmentAbort.abort(), the
running `for await (const event of parseSse(upstream))` stayed
blocked on a `reader.read()` that never resolved. The while loop
never exited, segment_end was never written, controller.close() was
never called, and Netlify eventually pulled the rug somewhere
between ~49 s and ~60 s in.

Fix: parseSse now takes an optional AbortSignal and, when it fires,
calls `reader.cancel()` directly. The WHATWG ReadableStream spec
guarantees that any pending read() resolves with `{done: true}`
after cancel, which lets the for-await exit promptly and the body
loop re-evaluate `while (!done && !segmenting)` → break →
writeJson(segment_end) → controller.close(). Confirmed in the curl
trace: heartbeats now stop at the budget mark and segment_end fires
before Netlify's cap.

Co-Authored-By: alex <alex@semipublic.co>
2026-05-13 13:20:19 +00:00

767 lines
24 KiB
TypeScript

import type { Config, Context } from '@netlify/edge-functions'
/**
* Netlify Edge Function (Deno runtime).
*
* Proxies a request from the React frontend to a Claude Console-defined
* Managed Agent. The agent's model, system prompt, tools, MCP servers
* and skills are configured in the Console and referenced here by ID.
*
* ## Why this exists
*
* Two layered problems are in scope:
*
* 1. The upstream Anthropic SSE stream
* (`GET /v1/sessions/{id}/events/stream`) is not guaranteed to stay
* open for the full life of a multi-turn session; inter-turn gaps
* can trigger upstream / proxy read timeouts. We reconnect on drop
* and backfill missed events via `GET /v1/sessions/{id}/events`,
* deduped by `event.id`.
*
* 2. Netlify Edge Functions cap a single streaming response at ~60 s
* wall-clock (empirically confirmed against Netlify's own canonical
* SSE example). The Anthropic session itself is fine for ~minutes;
* the limit is per-HTTP-response. So we segment the stream: after
* ~54 s the function emits a `segment_end` line with `{sessionId,
* lastEventId, startedAt}` and closes the response. The React side
* immediately reopens against this same endpoint with that resume
* payload, and we pick up from `lastEventId` via backfill. To the
* user this looks like one continuous stream.
*
* ## Wire protocol (downstream to the browser): NDJSON
*
* {"type":"text","text":"...markdown..."} // append to brief
* {"type":"status","kind":"thinking"}
* {"type":"status","kind":"tool_use","name":"...","label":"..."}
* {"type":"status","kind":"tool_result","ok":true}
* {"type":"status","kind":"tool_result","ok":false,"message":"..."}
* {"type":"status","kind":"session_error","message":"..."}
* {"type":"heartbeat"} // keep-alive
* {"type":"segment_end","sessionId":"sess_...",
* "lastEventId":"evt_...","startedAt":1234567890} // reopen me
* {"type":"done"} // terminal
* {"type":"error","message":"..."} // unrecoverable
*
* ## Request payloads
*
* - New session (first call):
* {stationName, stationLocation, stationWebsite}
* - Resume (subsequent calls after a segment_end):
* {sessionId, lastEventId?, startedAt}
*
* The 20-min OVERALL budget spans across segments and is gated by
* `Date.now() - startedAt`. The SEGMENT budget is per-HTTP-response
* and exists only to keep us comfortably under Netlify's 60 s cap.
*/
const MANAGED_AGENTS_BETA = 'managed-agents-2026-04-01'
const ANTHROPIC_VERSION = '2023-06-01'
const ANTHROPIC_API_BASE = 'https://api.anthropic.com'
// Downstream keep-alive cadence. The browser's fetch reader appears
// stuck if no bytes flow for too long. We keep this tighter than the
// segment budget so the connection stays warm even during long
// model-thinking gaps with no agent events.
const HEARTBEAT_MS = 5_000
// Close the current response well before Netlify's empirical streaming
// cap. We've observed responses cut anywhere between ~49 s and ~60 s,
// so a 54 s budget is too aggressive — sometimes the platform pulls
// the rug out before our final `segment_end` write + `controller.close()`
// can flush. 40 s gives ~10 s of headroom on the low end of the
// observed range while still amortising session setup (~1 segment per
// ~40 s of brief) across a typical 3-5 min brief.
const SEGMENT_BUDGET_MS = 40_000
// Total wall-clock allowance across ALL segments for a single user
// brief. Past this we emit a final `error` line and stop reopening.
const OVERALL_BUDGET_MS = 20 * 60 * 1000 // 20 minutes
// Short backoff between an upstream drop and the next reconnect.
const RECONNECT_BACKOFF_MS = 500
// ---------------------------------------------------------------------------
// Types
// ---------------------------------------------------------------------------
type ContentBlock = { type?: string; text?: string }
type AgentEvent = {
id?: string
type?: string
content?: ContentBlock[]
name?: string
input?: Record<string, unknown>
is_error?: boolean
stop_reason?: { type?: string }
error?: { message?: string }
}
type EventsListPage = {
data?: AgentEvent[]
next_page?: string
}
type NewSessionPayload = {
stationName?: string
stationLocation?: string
stationWebsite?: string
}
type ResumePayload = {
sessionId: string
lastEventId?: string
startedAt?: number
}
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
function authHeaders(apiKey: string): Record<string, string> {
return {
'x-api-key': apiKey,
'anthropic-version': ANTHROPIC_VERSION,
'anthropic-beta': MANAGED_AGENTS_BETA,
}
}
// A `session.status_idle` with `stop_reason: requires_action` means
// the agent is waiting on a client-side response (custom tool result,
// tool confirmation). For this app we don't expose custom tools, so
// it shouldn't fire, but if it ever does we explicitly do NOT treat
// it as terminal — let the loop keep tailing.
function isTerminal(event: AgentEvent | undefined): boolean {
if (!event) return false
if (event.type === 'session.status_terminated') return true
if (event.type === 'session.status_idle') {
const t = event.stop_reason?.type
return t === 'end_turn' || t === 'retries_exhausted'
}
return false
}
function describeToolUse(event: AgentEvent): string {
const name = event.name || 'tool'
const input = (event.input || {}) as Record<string, unknown>
if (name === 'web_search' && typeof input.query === 'string') {
return `Searching the web: ${input.query}`
}
if (name === 'web_fetch' && typeof input.url === 'string') {
return `Fetching: ${input.url}`
}
if (event.type === 'agent.mcp_tool_use') {
const args = Object.entries(input)
.filter(([k]) => k !== 'limit')
.slice(0, 3)
.map(([k, v]) =>
typeof v === 'object' && v !== null
? `${k}=…`
: `${k}=${String(v).slice(0, 40)}`,
)
.join(', ')
return `${name}${args ? ` (${args})` : ''}`
}
return name
}
async function anthropicFetch(
path: string,
apiKey: string,
init: RequestInit = {},
): Promise<Response> {
const headers = {
...authHeaders(apiKey),
...(init.headers as Record<string, string> | undefined),
}
return await fetch(`${ANTHROPIC_API_BASE}${path}`, { ...init, headers })
}
// Parse the SSE stream into an async iterable of decoded events.
// Each SSE message is `event: <type>\ndata: <json>\n\n`.
//
// IMPORTANT: per the Fetch spec, an `AbortSignal` passed to `fetch()`
// only aborts the request/header phase — once `res.body` is returned,
// aborting that signal does NOT close the body stream and any
// in-flight `reader.read()` will hang forever. So we attach our own
// listener and `reader.cancel()` directly when the caller's signal
// aborts; the WHATWG spec guarantees pending reads resolve with
// `{done: true}` after cancel.
async function* parseSse(
stream: ReadableStream<Uint8Array>,
signal?: AbortSignal,
): AsyncGenerator<AgentEvent> {
const reader = stream.getReader()
const decoder = new TextDecoder('utf-8')
let buffer = ''
const onAbort = () => {
reader.cancel().catch(() => {})
}
if (signal) {
if (signal.aborted) onAbort()
else signal.addEventListener('abort', onAbort, { once: true })
}
try {
while (true) {
const { value, done } = await reader.read()
if (done) break
buffer += decoder.decode(value, { stream: true })
let sep
while ((sep = buffer.indexOf('\n\n')) !== -1) {
const raw = buffer.slice(0, sep)
buffer = buffer.slice(sep + 2)
let dataPayload = ''
let evType: string | undefined
for (const line of raw.split('\n')) {
if (line.startsWith('event:')) {
evType = line.slice(6).trim()
} else if (line.startsWith('data:')) {
dataPayload += line.slice(5).trim()
}
}
if (!dataPayload) continue
try {
const parsed = JSON.parse(dataPayload) as AgentEvent
if (evType && !parsed.type) parsed.type = evType
yield parsed
} catch {
// Malformed event — skip.
}
}
}
} finally {
if (signal) {
signal.removeEventListener('abort', onAbort)
}
try {
reader.releaseLock()
} catch {
/* ignore */
}
}
}
// Page through `GET /v1/sessions/{id}/events` to backfill anything
// we missed. The Managed Agents endpoint uses an opaque `page` cursor
// returned as `next_page` on each response; there is no `after_id`
// filter, so we always pull the full history and the caller is
// responsible for skipping events it has already handled (via
// `seenEventIds` and the resume-payload `lastEventId`).
async function* listAllEvents(
sessionId: string,
apiKey: string,
signal?: AbortSignal,
): AsyncGenerator<AgentEvent> {
let pageCursor: string | undefined
for (;;) {
const params = new URLSearchParams({ order: 'asc', limit: '1000' })
if (pageCursor) params.set('page', pageCursor)
const res = await anthropicFetch(
`/v1/sessions/${sessionId}/events?${params.toString()}`,
apiKey,
{ signal },
)
if (!res.ok) {
throw new Error(
`events.list returned ${res.status} ${res.statusText}`,
)
}
const page = (await res.json()) as EventsListPage
const items = page.data || []
for (const ev of items) yield ev
if (!page.next_page) break
pageCursor = page.next_page
}
}
async function openStream(
sessionId: string,
apiKey: string,
signal: AbortSignal,
): Promise<ReadableStream<Uint8Array>> {
const res = await anthropicFetch(
`/v1/sessions/${sessionId}/events/stream`,
apiKey,
{
method: 'GET',
headers: { accept: 'text/event-stream' },
signal,
},
)
if (!res.ok || !res.body) {
throw new Error(
`events.stream returned ${res.status} ${res.statusText}`,
)
}
return res.body
}
// `setTimeout` that resolves early on abort.
function abortableSleep(ms: number, signal: AbortSignal): Promise<void> {
return new Promise<void>((resolve) => {
const t = setTimeout(resolve, ms)
const onAbort = () => {
clearTimeout(t)
resolve()
}
if (signal.aborted) {
onAbort()
return
}
signal.addEventListener('abort', onAbort, { once: true })
})
}
// ---------------------------------------------------------------------------
// Handler
// ---------------------------------------------------------------------------
const URL_REGEX =
/^(https?:\/\/)?([\w-]+\.)+[\w-]{2,}(\/[\w\-._~:/?#[\]@!$&'()*+,;=%]*)?$/i
export default async (req: Request, _context: Context) => {
if (req.method !== 'POST') {
return new Response('Method Not Allowed', { status: 405 })
}
let payload: NewSessionPayload & Partial<ResumePayload>
try {
payload = await req.json()
} catch {
return new Response('Invalid JSON body', { status: 400 })
}
const apiKey = Netlify.env.get('ANTHROPIC_API_KEY')
if (!apiKey) {
return new Response('Server is missing ANTHROPIC_API_KEY', {
status: 500,
})
}
const isResume =
typeof payload.sessionId === 'string' && payload.sessionId.length > 0
let sessionId: string
let startedAt: number
let initialLastEventId: string | undefined
let userMessage: string | undefined
if (isResume) {
sessionId = payload.sessionId as string
startedAt =
typeof payload.startedAt === 'number' ? payload.startedAt : Date.now()
initialLastEventId =
typeof payload.lastEventId === 'string' && payload.lastEventId
? payload.lastEventId
: undefined
if (Date.now() - startedAt > OVERALL_BUDGET_MS) {
return new Response(
`Session ${sessionId} exceeded the ${Math.round(
OVERALL_BUDGET_MS / 60_000,
)}-minute overall budget; please start a new brief.`,
{ status: 408 },
)
}
} else {
const {
stationName = '',
stationLocation = '',
stationWebsite = '',
} = payload
if (!URL_REGEX.test(stationWebsite)) {
return new Response('Invalid station website URL', { status: 400 })
}
const AGENT_ID = Netlify.env.get('ANTHROPIC_AGENT_ID')
const ENVIRONMENT_ID = Netlify.env.get('ANTHROPIC_ENVIRONMENT_ID')
const VAULT_IDS = (Netlify.env.get('ANTHROPIC_VAULT_IDS') || '')
.split(',')
.map((s) => s.trim())
.filter(Boolean)
const missing: string[] = []
if (!AGENT_ID) missing.push('ANTHROPIC_AGENT_ID')
if (!ENVIRONMENT_ID) missing.push('ANTHROPIC_ENVIRONMENT_ID')
if (missing.length) {
return new Response(
`Server is missing env var(s): ${missing.join(', ')}`,
{ status: 500 },
)
}
userMessage = `Here is a new public media station intake:
- Station Name: ${stationName}
- Station Location: ${stationLocation}
- Station Website: ${stationWebsite}
Please follow your instructions to produce the funding outlook brief.`
const createBody: Record<string, unknown> = {
agent: AGENT_ID,
environment_id: ENVIRONMENT_ID,
}
if (VAULT_IDS.length) createBody.vault_ids = VAULT_IDS
try {
const createRes = await anthropicFetch('/v1/sessions', apiKey, {
method: 'POST',
headers: { 'content-type': 'application/json' },
body: JSON.stringify(createBody),
})
if (!createRes.ok) {
const text = await createRes.text()
return new Response(
`Failed to create session: ${createRes.status} ${text}`,
{ status: 500 },
)
}
const created = (await createRes.json()) as { id?: string }
if (!created.id) {
return new Response('Failed to create session: no id returned', {
status: 500,
})
}
sessionId = created.id
startedAt = Date.now()
} catch (err) {
return new Response(
`Failed to create session: ${(err as Error)?.message || err}`,
{ status: 500 },
)
}
}
const encoder = new TextEncoder()
const seenEventIds = new Set<string>()
const body = new ReadableStream<Uint8Array>({
async start(controller) {
let lastSendAt = Date.now()
let lastEventIdSeen = initialLastEventId
// For resumes we replay the full session history via
// `listAllEvents` but mute every event up to and including
// `initialLastEventId` (the user has already seen those). Once
// we've cleared that cursor (or on a brand-new session where it
// is undefined), we start actually delivering events.
let pastInitialId = !initialLastEventId
const writeJson = (obj: unknown) => {
try {
controller.enqueue(encoder.encode(JSON.stringify(obj) + '\n'))
lastSendAt = Date.now()
} catch {
/* controller closed */
}
}
const heartbeat = setInterval(() => {
if (Date.now() - lastSendAt >= HEARTBEAT_MS) {
writeJson({ type: 'heartbeat' })
}
}, HEARTBEAT_MS / 2)
// Single AbortController for this segment. The segment timer
// calls .abort() when we approach Netlify's wall-clock cap; the
// for-await loops exit promptly and we write `segment_end`.
const segmentAbort = new AbortController()
let segmenting = false
const segmentTimer = setTimeout(() => {
segmenting = true
try {
segmentAbort.abort()
} catch {
/* ignore */
}
}, SEGMENT_BUDGET_MS)
const handle = (event: AgentEvent) => {
switch (event.type) {
case 'agent.message': {
if (Array.isArray(event.content)) {
for (const block of event.content) {
if (block?.type === 'text' && block.text) {
writeJson({ type: 'text', text: block.text + '\n\n' })
}
}
}
break
}
case 'agent.thinking': {
writeJson({ type: 'status', kind: 'thinking' })
break
}
case 'agent.tool_use':
case 'agent.mcp_tool_use': {
writeJson({
type: 'status',
kind: 'tool_use',
name: event.name || 'tool',
label: describeToolUse(event),
})
break
}
case 'agent.tool_result':
case 'agent.mcp_tool_result': {
if (event.is_error) {
const msg =
(Array.isArray(event.content) && event.content[0]?.text) ||
'tool error'
writeJson({
type: 'status',
kind: 'tool_result',
ok: false,
message: String(msg).slice(0, 300),
})
} else {
writeJson({ type: 'status', kind: 'tool_result', ok: true })
}
break
}
case 'session.error': {
const msg = event.error?.message || 'unknown session error'
writeJson({
type: 'status',
kind: 'session_error',
message: msg,
})
break
}
}
}
// Brand new session: send the kickoff user.message BEFORE we
// start tailing. (For resume, the user.message was sent on the
// first segment, so we never re-send it.)
if (!isResume && userMessage) {
try {
const sendRes = await anthropicFetch(
`/v1/sessions/${sessionId}/events`,
apiKey,
{
method: 'POST',
headers: { 'content-type': 'application/json' },
body: JSON.stringify({
events: [
{
type: 'user.message',
content: [{ type: 'text', text: userMessage }],
},
],
}),
},
)
if (!sendRes.ok) {
const text = await sendRes.text()
writeJson({
type: 'error',
message: `Failed to send user message: ${sendRes.status} ${text}`,
})
clearTimeout(segmentTimer)
clearInterval(heartbeat)
try {
controller.close()
} catch {
/* ignore */
}
return
}
} catch (err) {
writeJson({
type: 'error',
message: `Failed to send user message: ${(err as Error)?.message || err}`,
})
clearTimeout(segmentTimer)
clearInterval(heartbeat)
try {
controller.close()
} catch {
/* ignore */
}
return
}
}
let done = false
let iteration = 0
try {
while (!done && !segmenting) {
iteration++
if (iteration > 1) {
await abortableSleep(RECONNECT_BACKOFF_MS, segmentAbort.signal)
if (segmenting) break
}
if (Date.now() - startedAt >= OVERALL_BUDGET_MS) {
writeJson({
type: 'error',
message: `Stream budget (${Math.round(
OVERALL_BUDGET_MS / 60_000,
)} min) exhausted. The session may still be running — try again or check the Console.`,
})
break
}
// Open the live stream FIRST so any events emitted while
// we're backfilling are still captured on the wire.
let upstream: ReadableStream<Uint8Array>
try {
upstream = await openStream(
sessionId,
apiKey,
segmentAbort.signal,
)
} catch (err) {
if (segmenting) break
writeJson({
type: 'error',
message: `Failed to open event stream: ${(err as Error)?.message || err}`,
})
break
}
// Backfill anything we missed. The events endpoint does
// NOT support `after_id`, so we always pull the full session
// history and skip everything up to and including
// `initialLastEventId` (the cursor handed to us by the
// previous segment via the resume payload). Within this
// segment, `seenEventIds` dedupes against the live stream.
try {
for await (const event of listAllEvents(
sessionId,
apiKey,
segmentAbort.signal,
)) {
if (segmenting) break
if (event.id) {
if (!pastInitialId) {
// Already delivered to the user in a previous
// segment — mark seen so the live stream doesn't
// re-emit it, but do NOT call handle().
seenEventIds.add(event.id)
lastEventIdSeen = event.id
if (event.id === initialLastEventId) {
pastInitialId = true
}
} else if (!seenEventIds.has(event.id)) {
seenEventIds.add(event.id)
lastEventIdSeen = event.id
handle(event)
} else {
// Already handled earlier in this segment — keep
// the cursor moving forward.
lastEventIdSeen = event.id
}
}
if (isTerminal(event) && pastInitialId) {
done = true
break
}
}
} catch (err) {
if (!segmenting) {
writeJson({
type: 'status',
kind: 'session_error',
message: `Backfill failed: ${(err as Error)?.message || err}`,
})
// Fall through and try the live stream.
}
}
// If the cursor handed to us by the previous segment didn't
// appear in the session's history (stale / corrupted /
// history truncated), don't get stuck muting events forever
// — start delivering whatever shows up next.
if (!pastInitialId) {
pastInitialId = true
}
if (done || segmenting) {
// The upstream fetch is already cleaned up by
// `segmentAbort.abort()` (or will be by the runtime once
// we drop our reference). Don't `await upstream.cancel()`
// here — on an already-aborted body that call can hang
// on Deno Edge, which would prevent us from writing the
// final `segment_end` line.
break
}
// Tail the live stream until upstream EOFs (drop), the
// segment timer fires, or a terminal event arrives.
try {
for await (const event of parseSse(
upstream,
segmentAbort.signal,
)) {
if (segmenting) break
if (event.id) {
if (!seenEventIds.has(event.id)) {
seenEventIds.add(event.id)
lastEventIdSeen = event.id
handle(event)
} else {
lastEventIdSeen = event.id
}
}
if (isTerminal(event)) {
done = true
break
}
}
} catch {
// Most commonly: AbortError from segmentAbort. Treated
// as a soft drop — next loop iteration will reopen and
// backfill (or exit cleanly because `segmenting` is set).
}
}
if (done) {
writeJson({ type: 'done' })
} else if (segmenting) {
writeJson({
type: 'segment_end',
sessionId,
lastEventId: lastEventIdSeen,
startedAt,
})
}
} catch (err) {
writeJson({
type: 'error',
message: (err as Error)?.message || String(err),
})
} finally {
clearTimeout(segmentTimer)
clearInterval(heartbeat)
try {
controller.close()
} catch {
/* ignore */
}
}
},
})
return new Response(body, {
status: 200,
headers: {
'Content-Type': 'application/x-ndjson; charset=utf-8',
'Cache-Control': 'no-cache, no-transform',
'X-Accel-Buffering': 'no',
},
})
}
export const config: Config = {
path: '/api/agent-proxy',
}