import type { Config, Context } from '@netlify/edge-functions'

/**
 * Netlify Edge Function (Deno runtime).
 *
 * Proxies a request from the React frontend to a Claude Console-defined
 * Managed Agent. The agent's model, system prompt, tools, MCP servers
 * and skills are configured in the Console and referenced here by ID.
 *
 * ## Why this exists
 *
 * Two layered problems are in scope:
 *
 * 1. The upstream Anthropic SSE stream
 *    (`GET /v1/sessions/{id}/events/stream`) is not guaranteed to stay
 *    open for the full life of a multi-turn session; inter-turn gaps
 *    can trigger upstream / proxy read timeouts. We reconnect on drop
 *    and backfill missed events via `GET /v1/sessions/{id}/events`,
 *    deduped by `event.id`.
 *
 * 2. Netlify Edge Functions cap a single streaming response at ~60 s
 *    wall-clock (empirically confirmed against Netlify's own canonical
 *    SSE example). The Anthropic session itself can run for many
 *    minutes; the limit is per HTTP response. So we segment the stream:
 *    after `SEGMENT_BUDGET_MS` (currently 40 s) the function emits a
 *    `segment_end` line with `{sessionId, lastEventId, startedAt}` and
 *    closes the response. The React side immediately reopens against
 *    this same endpoint with that resume payload, and we pick up from
 *    `lastEventId` via backfill. To the user this looks like one
 *    continuous stream.
 *
 * ## Wire protocol (downstream to the browser): NDJSON
 *
 *   {"type":"text","text":"...markdown..."}                  // append to brief
 *   {"type":"status","kind":"thinking"}
 *   {"type":"status","kind":"tool_use","name":"...","label":"..."}
 *   {"type":"status","kind":"tool_result","ok":true}
 *   {"type":"status","kind":"tool_result","ok":false,"message":"..."}
 *   {"type":"status","kind":"session_error","message":"..."}
 *   {"type":"heartbeat"}                                      // keep-alive
 *   {"type":"segment_end","sessionId":"sess_...",
 *    "lastEventId":"evt_...","startedAt":1234567890}          // reopen me
 *   {"type":"done"}                                           // terminal
 *   {"type":"error","message":"..."}                          // unrecoverable
 *
 * ## Request payloads
 *
 *   - New session (first call):
 *       {stationName, stationLocation, stationWebsite}
 *   - Resume (subsequent calls after a segment_end):
 *       {sessionId, lastEventId?, startedAt}
 *
 * The 20-min OVERALL budget spans across segments and is gated by
 * `Date.now() - startedAt`. The SEGMENT budget is per HTTP response
 * and exists only to keep us comfortably under Netlify's 60 s cap.
 */

const MANAGED_AGENTS_BETA = 'managed-agents-2026-04-01'
const ANTHROPIC_VERSION = '2023-06-01'
const ANTHROPIC_API_BASE = 'https://api.anthropic.com'

// Downstream keep-alive cadence. The browser's fetch reader appears
// stuck if no bytes flow for too long. We keep this tighter than the
// segment budget so the connection stays warm even during long
// model-thinking gaps with no agent events.
const HEARTBEAT_MS = 5_000

// Close the current response well before Netlify's empirical streaming
// cap. We've observed responses cut anywhere between ~49 s and ~60 s,
// so a 54 s budget is too aggressive — sometimes the platform pulls
// the rug out before our final `segment_end` write + `controller.close()`
// can flush. 40 s gives ~9-10 s of headroom on the low end of the
// observed range while still amortising session setup (~1 segment per
// ~40 s of brief) across a typical 3-5 min brief.
const SEGMENT_BUDGET_MS = 40_000

// Total wall-clock allowance across ALL segments for a single user
// brief. Past this we emit a final `error` line and stop reopening.
const OVERALL_BUDGET_MS = 20 * 60 * 1000 // 20 minutes

// Short backoff between an upstream drop and the next reconnect.
const RECONNECT_BACKOFF_MS = 500

// ---------------------------------------------------------------------------
// Types
// ---------------------------------------------------------------------------

type ContentBlock = { type?: string; text?: string }

// Subset of the upstream agent/session event shape that this proxy reads.
type AgentEvent = {
  id?: string
  type?: string
  content?: ContentBlock[]
  name?: string
  input?: Record<string, unknown>
  is_error?: boolean
  stop_reason?: { type?: string }
  error?: { message?: string }
}

// One page of `GET /v1/sessions/{id}/events`; `next_page` is an opaque cursor.
type EventsListPage = {
  data?: AgentEvent[]
  next_page?: string
}

// Body of the first call for a brief (creates a new session).
type NewSessionPayload = {
  stationName?: string
  stationLocation?: string
  stationWebsite?: string
}

// Body of a follow-up call after a `segment_end` line.
type ResumePayload = {
  sessionId: string
  lastEventId?: string
  startedAt?: number
}

// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------

/**
 * Headers required on every Anthropic Managed Agents request: the API
 * key, the API version, and the managed-agents beta flag.
 */
function authHeaders(apiKey: string): Record<string, string> {
  return {
    'x-api-key': apiKey,
    'anthropic-version': ANTHROPIC_VERSION,
    'anthropic-beta': MANAGED_AGENTS_BETA,
  }
}

// A `session.status_idle` with `stop_reason: requires_action` means
// the agent is waiting on a client-side response (custom tool result,
// tool confirmation). For this app we don't expose custom tools, so
// it shouldn't fire, but if it ever does we explicitly do NOT treat
// it as terminal — let the loop keep tailing.
function isTerminal(event: AgentEvent | undefined): boolean {
  if (!event) return false
  if (event.type === 'session.status_terminated') return true
  if (event.type === 'session.status_idle') {
    const t = event.stop_reason?.type
    return t === 'end_turn' || t === 'retries_exhausted'
  }
  return false
}

// Short human-readable label for a tool-use event, sent downstream as the
// `label` of a `tool_use` status line. Built-in web tools get a tailored
// phrase; MCP tool calls list up to three arguments (skipping `limit`),
// eliding object values and truncating scalars to 40 characters.
function describeToolUse(event: AgentEvent): string {
  const name = event.name || 'tool'
  const input: Record<string, unknown> = event.input || {}
  if (name === 'web_search' && typeof input.query === 'string') {
    return `Searching the web: ${input.query}`
  }
  if (name === 'web_fetch' && typeof input.url === 'string') {
    return `Fetching: ${input.url}`
  }
  if (event.type === 'agent.mcp_tool_use') {
    const args = Object.entries(input)
      .filter(([k]) => k !== 'limit')
      .slice(0, 3)
      .map(([k, v]) =>
        typeof v === 'object' && v !== null
          ? `${k}=…`
          : `${k}=${String(v).slice(0, 40)}`,
      )
      .join(', ')
    return `${name}${args ? ` (${args})` : ''}`
  }
  return name
}

// `fetch` against the Anthropic API with the auth / version / beta headers
// applied. Headers passed in `init.headers` take precedence over those
// defaults and may be given in any `HeadersInit` form (plain object,
// `Headers` instance, or `[name, value]` pairs).
async function anthropicFetch(
  path: string,
  apiKey: string,
  init: RequestInit = {},
): Promise<Response> {
  const headers = new Headers(authHeaders(apiKey))
  // Spreading `init.headers` as a plain object would silently drop the
  // entries of a `Headers` instance or tuple array; go through `Headers`.
  new Headers(init.headers).forEach((value, key) => headers.set(key, value))
  return await fetch(`${ANTHROPIC_API_BASE}${path}`, { ...init, headers })
}

// Parse the SSE stream into an async iterable of decoded events.
// Each SSE message is `event: <type>\ndata: <json>\n\n`.
//
// IMPORTANT: we don't rely on the `AbortSignal` passed to `fetch()` to
// unblock an in-flight `reader.read()` on a body that has already been
// returned (NOTE(review): the Fetch spec says abort should error the
// body, but runtime behaviour here has been unreliable — confirm on
// Deno Edge). So we attach our own listener and `reader.cancel()`
// directly when the caller's signal aborts; the WHATWG Streams spec
// guarantees pending reads resolve with `{done: true}` after cancel.
async function* parseSse(
  stream: ReadableStream<Uint8Array>,
  signal?: AbortSignal,
): AsyncGenerator<AgentEvent> {
  const reader = stream.getReader()
  const decoder = new TextDecoder('utf-8')
  let buffer = ''
  // True once upstream reports EOF. Any other exit (the consumer `break`s
  // on a terminal event, or an exception) cancels the body in `finally`,
  // so the upstream HTTP connection isn't left open behind a released lock.
  let exhausted = false

  const onAbort = () => {
    reader.cancel().catch(() => {})
  }
  if (signal) {
    if (signal.aborted) onAbort()
    else signal.addEventListener('abort', onAbort, { once: true })
  }

  try {
    for (;;) {
      const { value, done } = await reader.read()
      if (done) {
        exhausted = true
        break
      }
      // SSE permits CRLF line endings; normalise so the blank-line event
      // separator is always `\n\n`. A `\r` left at the end of one chunk is
      // paired with its `\n` (and normalised) when the next chunk arrives.
      buffer = (buffer + decoder.decode(value, { stream: true })).replace(
        /\r\n/g,
        '\n',
      )

      let sep: number
      while ((sep = buffer.indexOf('\n\n')) !== -1) {
        const raw = buffer.slice(0, sep)
        buffer = buffer.slice(sep + 2)

        let evType: string | undefined
        const dataLines: string[] = []
        for (const line of raw.split('\n')) {
          if (line.startsWith('event:')) {
            evType = line.slice(6).trim()
          } else if (line.startsWith('data:')) {
            // Per the SSE spec a single leading space after the colon is
            // dropped and multiple `data:` lines are joined with `\n`.
            const v = line.slice(5)
            dataLines.push(v.startsWith(' ') ? v.slice(1) : v)
          }
          // Comment lines (`: keep-alive`), `id:` and `retry:` are ignored.
        }
        const dataPayload = dataLines.join('\n')
        if (!dataPayload.trim()) continue

        let parsed: unknown
        try {
          parsed = JSON.parse(dataPayload)
        } catch {
          continue // Malformed event — skip.
        }
        // `null` / primitives are valid JSON but not events.
        if (typeof parsed !== 'object' || parsed === null) continue
        const event = parsed as AgentEvent
        if (evType && !event.type) event.type = evType
        yield event
      }
    }
  } finally {
    signal?.removeEventListener('abort', onAbort)
    if (!exhausted) reader.cancel().catch(() => {})
    try {
      reader.releaseLock()
    } catch {
      /* ignore */
    }
  }
}

// Page through `GET /v1/sessions/{id}/events` to backfill anything
// we missed. The Managed Agents endpoint uses an opaque `page` cursor
// returned as `next_page` on each response; there is no `after_id`
// filter, so we always pull the full history and the caller is
// responsible for skipping events it has already handled (via
// `seenEventIds` and the resume-payload `lastEventId`).
async function* listAllEvents(
  sessionId: string,
  apiKey: string,
  signal?: AbortSignal,
): AsyncGenerator<AgentEvent> {
  // Encode defensively: on resume, `sessionId` comes from the browser.
  const path = `/v1/sessions/${encodeURIComponent(sessionId)}/events`
  let pageCursor: string | undefined
  for (;;) {
    const params = new URLSearchParams({ order: 'asc', limit: '1000' })
    if (pageCursor) params.set('page', pageCursor)
    const res = await anthropicFetch(`${path}?${params.toString()}`, apiKey, {
      signal,
    })
    if (!res.ok) {
      // Release the connection before bailing out (best effort, not awaited).
      res.body?.cancel().catch(() => {})
      throw new Error(`events.list returned ${res.status} ${res.statusText}`)
    }
    const page = (await res.json()) as EventsListPage
    for (const ev of page.data ?? []) yield ev
    // Stop on the last page — and also if the server hands back the cursor
    // we just used, which would otherwise loop forever.
    if (!page.next_page || page.next_page === pageCursor) break
    pageCursor = page.next_page
  }
}

// Open the live SSE tail for a session and return its body stream.
// Throws on a non-2xx response or a missing body, after releasing the
// response body so the connection isn't held open.
async function openStream(
  sessionId: string,
  apiKey: string,
  signal: AbortSignal,
): Promise<ReadableStream<Uint8Array>> {
  const res = await anthropicFetch(
    `/v1/sessions/${encodeURIComponent(sessionId)}/events/stream`,
    apiKey,
    {
      method: 'GET',
      headers: { accept: 'text/event-stream' },
      signal,
    },
  )
  if (!res.ok || !res.body) {
    res.body?.cancel().catch(() => {})
    throw new Error(`events.stream returned ${res.status} ${res.statusText}`)
  }
  return res.body
}

// `setTimeout` that resolves early on abort.
function abortableSleep(ms: number, signal: AbortSignal): Promise<void> {
  return new Promise<void>((resolve) => {
    // Already aborted: resolve immediately without arming a timer.
    if (signal.aborted) {
      resolve()
      return
    }
    const onAbort = () => {
      clearTimeout(timer)
      resolve()
    }
    // When the timer wins, detach the abort listener so repeated sleeps on
    // one long-lived signal (one per reconnect) don't accumulate listeners.
    const timer = setTimeout(() => {
      signal.removeEventListener('abort', onAbort)
      resolve()
    }, ms)
    signal.addEventListener('abort', onAbort, { once: true })
  })
}

// ---------------------------------------------------------------------------
// Handler
// ---------------------------------------------------------------------------

// Loose sanity check on the intake form's website field (scheme optional).
const URL_REGEX =
  /^(https?:\/\/)?([\w-]+\.)+[\w-]{2,}(\/[\w\-._~:/?#[\]@!$&'()*+,;=%]*)?$/i

// On resume the session ID comes from the browser and is spliced into
// upstream URL paths that are called with the server's API key, so only
// accept a conservative opaque-ID alphabet (no `/`, `?`, `#`, `.`, `%`).
// NOTE(review): assumes IDs look like `sess_<alphanumerics>`; widen if the
// API ever issues IDs outside [A-Za-z0-9_-].
const SESSION_ID_REGEX = /^[A-Za-z0-9_-]{1,256}$/

/**
 * POST /api/agent-proxy
 *
 * Without `sessionId`: validates the intake form, creates a session,
 * sends the kickoff `user.message`, then streams NDJSON. With
 * `sessionId`: resumes an existing session from `lastEventId`. Either way
 * the response ends with `done`, `segment_end` (reopen with that payload),
 * or `error`. Pre-stream failures return plain-text 4xx/5xx responses.
 */
export default async (req: Request, _context: Context) => {
  if (req.method !== 'POST') {
    return new Response('Method Not Allowed', { status: 405 })
  }

  let payload: NewSessionPayload & Partial<ResumePayload>
  try {
    const parsed: unknown = await req.json()
    // `null`, arrays and primitives are valid JSON but not a payload;
    // reject them here rather than crash on property access below.
    if (typeof parsed !== 'object' || parsed === null || Array.isArray(parsed)) {
      return new Response('Invalid JSON body', { status: 400 })
    }
    payload = parsed as NewSessionPayload & Partial<ResumePayload>
  } catch {
    return new Response('Invalid JSON body', { status: 400 })
  }

  const apiKey = Netlify.env.get('ANTHROPIC_API_KEY')
  if (!apiKey) {
    return new Response('Server is missing ANTHROPIC_API_KEY', {
      status: 500,
    })
  }

  const isResume =
    typeof payload.sessionId === 'string' && payload.sessionId.length > 0

  let sessionId: string
  let startedAt: number
  let initialLastEventId: string | undefined
  let userMessage: string | undefined

  if (isResume) {
    sessionId = payload.sessionId as string
    if (!SESSION_ID_REGEX.test(sessionId)) {
      return new Response('Invalid sessionId', { status: 400 })
    }
    // `startedAt` is echoed back by the browser, so it's advisory only.
    // Clamp future values so the budget check never sees negative elapsed.
    const now = Date.now()
    startedAt =
      typeof payload.startedAt === 'number' && Number.isFinite(payload.startedAt)
        ? Math.min(payload.startedAt, now)
        : now
    initialLastEventId =
      typeof payload.lastEventId === 'string' && payload.lastEventId
        ? payload.lastEventId
        : undefined
    if (Date.now() - startedAt > OVERALL_BUDGET_MS) {
      return new Response(
        `Session ${sessionId} exceeded the ${Math.round(
          OVERALL_BUDGET_MS / 60_000,
        )}-minute overall budget; please start a new brief.`,
        { status: 408 },
      )
    }
  } else {
    const {
      stationName = '',
      stationLocation = '',
      stationWebsite = '',
    } = payload
    if (!URL_REGEX.test(stationWebsite)) {
      return new Response('Invalid station website URL', { status: 400 })
    }

    const AGENT_ID = Netlify.env.get('ANTHROPIC_AGENT_ID')
    const ENVIRONMENT_ID = Netlify.env.get('ANTHROPIC_ENVIRONMENT_ID')
    const VAULT_IDS = (Netlify.env.get('ANTHROPIC_VAULT_IDS') || '')
      .split(',')
      .map((s) => s.trim())
      .filter(Boolean)

    const missing: string[] = []
    if (!AGENT_ID) missing.push('ANTHROPIC_AGENT_ID')
    if (!ENVIRONMENT_ID) missing.push('ANTHROPIC_ENVIRONMENT_ID')
    if (missing.length) {
      return new Response(
        `Server is missing env var(s): ${missing.join(', ')}`,
        { status: 500 },
      )
    }

    userMessage = `Here is a new public media station intake:
- Station Name: ${stationName}
- Station Location: ${stationLocation}
- Station Website: ${stationWebsite}

Please follow your instructions to produce the funding outlook brief.`

    const createBody: Record<string, unknown> = {
      agent: AGENT_ID,
      environment_id: ENVIRONMENT_ID,
    }
    if (VAULT_IDS.length) createBody.vault_ids = VAULT_IDS

    try {
      const createRes = await anthropicFetch('/v1/sessions', apiKey, {
        method: 'POST',
        headers: { 'content-type': 'application/json' },
        body: JSON.stringify(createBody),
      })
      if (!createRes.ok) {
        const text = await createRes.text()
        return new Response(
          `Failed to create session: ${createRes.status} ${text}`,
          { status: 500 },
        )
      }
      const created = (await createRes.json()) as { id?: string }
      if (!created.id) {
        return new Response('Failed to create session: no id returned', {
          status: 500,
        })
      }
      sessionId = created.id
      startedAt = Date.now()
    } catch (err) {
      return new Response(
        `Failed to create session: ${(err as Error)?.message || err}`,
        { status: 500 },
      )
    }
  }

  const encoder = new TextEncoder()
  const seenEventIds = new Set<string>()

  // One AbortController per HTTP response ("segment"). It is aborted by the
  // segment timer, by the browser disconnecting (`cancel()` below), and
  // unconditionally when `start()` finishes, so no upstream fetch outlives
  // this response.
  const segmentAbort = new AbortController()
  const stopped = () => segmentAbort.signal.aborted

  const body = new ReadableStream<Uint8Array>({
    async start(controller) {
      let lastSendAt = Date.now()
      // Cursor handed back in `segment_end`. It only ever moves forward
      // from `initialLastEventId`.
      let lastEventIdSeen = initialLastEventId
      // For resumes we replay the full session history via
      // `listAllEvents` but mute every event up to and including
      // `initialLastEventId` (the user has already seen those). Once
      // we've cleared that cursor (or on a brand-new session where it
      // is undefined), we start actually delivering events.
      let pastInitialId = !initialLastEventId
      // Last event id seen while muting; becomes the cursor if a complete
      // history pass never finds `initialLastEventId`.
      let lastMutedId: string | undefined

      const writeJson = (obj: unknown) => {
        try {
          controller.enqueue(encoder.encode(JSON.stringify(obj) + '\n'))
          lastSendAt = Date.now()
        } catch {
          /* controller closed */
        }
      }

      const heartbeat = setInterval(() => {
        if (Date.now() - lastSendAt >= HEARTBEAT_MS) {
          writeJson({ type: 'heartbeat' })
        }
      }, HEARTBEAT_MS / 2)

      // The segment timer aborts when we approach Netlify's wall-clock cap;
      // the for-await loops exit promptly and we write `segment_end`.
      let segmenting = false
      const segmentTimer = setTimeout(() => {
        segmenting = true
        segmentAbort.abort()
      }, SEGMENT_BUDGET_MS)

      const handle = (event: AgentEvent) => {
        switch (event.type) {
          case 'agent.message': {
            if (Array.isArray(event.content)) {
              for (const block of event.content) {
                if (block?.type === 'text' && block.text) {
                  writeJson({ type: 'text', text: block.text + '\n\n' })
                }
              }
            }
            break
          }
          case 'agent.thinking': {
            writeJson({ type: 'status', kind: 'thinking' })
            break
          }
          case 'agent.tool_use':
          case 'agent.mcp_tool_use': {
            writeJson({
              type: 'status',
              kind: 'tool_use',
              name: event.name || 'tool',
              label: describeToolUse(event),
            })
            break
          }
          case 'agent.tool_result':
          case 'agent.mcp_tool_result': {
            if (event.is_error) {
              const msg =
                (Array.isArray(event.content) && event.content[0]?.text) ||
                'tool error'
              writeJson({
                type: 'status',
                kind: 'tool_result',
                ok: false,
                message: String(msg).slice(0, 300),
              })
            } else {
              writeJson({ type: 'status', kind: 'tool_result', ok: true })
            }
            break
          }
          case 'session.error': {
            const msg = event.error?.message || 'unknown session error'
            writeJson({
              type: 'status',
              kind: 'session_error',
              message: msg,
            })
            break
          }
        }
      }

      let done = false
      try {
        // Brand new session: send the kickoff user.message BEFORE we
        // start tailing. (For resume, the user.message was sent on the
        // first segment, so we never re-send it.) On failure we write an
        // `error` line and return; `finally` does the cleanup.
        if (!isResume && userMessage) {
          let sendError: string | undefined
          try {
            const sendRes = await anthropicFetch(
              `/v1/sessions/${sessionId}/events`,
              apiKey,
              {
                method: 'POST',
                headers: { 'content-type': 'application/json' },
                body: JSON.stringify({
                  events: [
                    {
                      type: 'user.message',
                      content: [{ type: 'text', text: userMessage }],
                    },
                  ],
                }),
              },
            )
            if (!sendRes.ok) {
              const text = await sendRes.text()
              sendError = `${sendRes.status} ${text}`
            }
          } catch (err) {
            sendError = `${(err as Error)?.message || err}`
          }
          if (sendError !== undefined) {
            writeJson({
              type: 'error',
              message: `Failed to send user message: ${sendError}`,
            })
            return
          }
        }

        let iteration = 0
        while (!done && !stopped()) {
          iteration++
          if (iteration > 1) {
            await abortableSleep(RECONNECT_BACKOFF_MS, segmentAbort.signal)
            if (stopped()) break
          }

          if (Date.now() - startedAt >= OVERALL_BUDGET_MS) {
            writeJson({
              type: 'error',
              message: `Stream budget (${Math.round(
                OVERALL_BUDGET_MS / 60_000,
              )} min) exhausted. The session may still be running — try again or check the Console.`,
            })
            break
          }

          // Open the live stream FIRST so any events emitted while
          // we're backfilling are still captured on the wire.
          let upstream: ReadableStream<Uint8Array>
          try {
            upstream = await openStream(sessionId, apiKey, segmentAbort.signal)
          } catch (err) {
            if (stopped()) break
            writeJson({
              type: 'error',
              message: `Failed to open event stream: ${(err as Error)?.message || err}`,
            })
            break
          }

          // Backfill anything we missed. The events endpoint does
          // NOT support `after_id`, so we always pull the full session
          // history and skip everything up to and including
          // `initialLastEventId` (the cursor handed to us by the
          // previous segment via the resume payload). Within this
          // segment, `seenEventIds` dedupes against the live stream.
          let backfillComplete = false
          try {
            for await (const event of listAllEvents(
              sessionId,
              apiKey,
              segmentAbort.signal,
            )) {
              if (stopped()) break
              if (event.id) {
                if (!pastInitialId) {
                  // Already delivered to the user in a previous
                  // segment — mark seen so the live stream doesn't
                  // re-emit it, but do NOT call handle(). The cursor
                  // stays at `initialLastEventId`, so a segment cut
                  // mid-replay never hands back an OLDER cursor (which
                  // would re-deliver text on the next segment).
                  seenEventIds.add(event.id)
                  lastMutedId = event.id
                  if (event.id === initialLastEventId) {
                    pastInitialId = true
                  }
                } else if (!seenEventIds.has(event.id)) {
                  seenEventIds.add(event.id)
                  lastEventIdSeen = event.id
                  handle(event)
                } else {
                  // Already handled earlier in this segment — keep
                  // the cursor moving forward.
                  lastEventIdSeen = event.id
                }
              }
              if (isTerminal(event) && pastInitialId) {
                done = true
                break
              }
            }
            backfillComplete = !stopped()
          } catch (err) {
            if (!stopped()) {
              writeJson({
                type: 'status',
                kind: 'session_error',
                message: `Backfill failed: ${(err as Error)?.message || err}`,
              })
              // Fall through and try the live stream.
            }
          }

          // If a COMPLETE pass over the history never found the cursor
          // handed to us by the previous segment (stale / corrupted /
          // truncated history), don't get stuck muting events forever —
          // start delivering whatever shows up next. A failed or cut-short
          // pass leaves the mute on: forcing it off there would make the
          // next reconnect re-deliver every earlier event that pass hadn't
          // reached yet.
          if (backfillComplete && !pastInitialId) {
            pastInitialId = true
            if (lastMutedId) lastEventIdSeen = lastMutedId
          }

          if (done || stopped()) {
            // Don't `await upstream.cancel()` here — on an already-aborted
            // body that call can hang on Deno Edge, which would prevent us
            // from writing the final `segment_end` line. The `finally`
            // below aborts `segmentAbort`, which tears the fetch down.
            break
          }

          // Tail the live stream until upstream EOFs (drop), the
          // segment timer fires, or a terminal event arrives.
          try {
            for await (const event of parseSse(upstream, segmentAbort.signal)) {
              if (stopped()) break
              if (event.id) {
                if (!seenEventIds.has(event.id)) {
                  seenEventIds.add(event.id)
                  lastEventIdSeen = event.id
                  handle(event)
                } else {
                  lastEventIdSeen = event.id
                }
              }
              if (isTerminal(event)) {
                done = true
                break
              }
            }
          } catch {
            // Most commonly: AbortError from segmentAbort. Treated
            // as a soft drop — next loop iteration will reopen and
            // backfill (or exit cleanly because we've been stopped).
          }
        }

        // A browser disconnect stops us with neither flag set; nothing is
        // written because nobody is listening.
        if (done) {
          writeJson({ type: 'done' })
        } else if (segmenting) {
          writeJson({
            type: 'segment_end',
            sessionId,
            lastEventId: lastEventIdSeen,
            startedAt,
          })
        }
      } catch (err) {
        writeJson({
          type: 'error',
          message: (err as Error)?.message || String(err),
        })
      } finally {
        clearTimeout(segmentTimer)
        clearInterval(heartbeat)
        // Release any upstream fetch still open, e.g. the live stream on
        // the `done`-from-backfill path, which is never read.
        if (!segmentAbort.signal.aborted) segmentAbort.abort()
        try {
          controller.close()
        } catch {
          /* ignore */
        }
      }
    },
    cancel() {
      // The browser went away: stop tailing upstream now instead of
      // running until the segment timer fires.
      segmentAbort.abort()
    },
  })

  return new Response(body, {
    status: 200,
    headers: {
      'Content-Type': 'application/x-ndjson; charset=utf-8',
      'Cache-Control': 'no-cache, no-transform',
      'X-Accel-Buffering': 'no',
    },
  })
}

export const config: Config = {
  path: '/api/agent-proxy',
}