migrate agent-proxy to Netlify Edge Function so long sessions stream end-to-end

The reconnect + events.list backfill in c283d88 is correct but never ran:
the previous v2 Node Function was killed at ~27 s (well before the 20 min
reconnect budget could matter), so streams always died after the first MCP
tool batch.

Move the proxy to a Netlify Edge Function (Deno runtime) which has no
streaming-duration cap as long as we keep writing to the response body.
Same reconnect / backfill / dedupe-by-event-id pattern; same NDJSON wire
protocol to the browser. Implemented with plain fetch() against the
Anthropic REST API (npm packages on Edge are beta) so we have no SDK
runtime dependency.

Frontend now POSTs to /api/agent-proxy. The Anthropic SDK is removed
from the package; @netlify/edge-functions is added for ambient types.

Co-Authored-By: alex <alex@semipublic.co>
This commit is contained in:
Devin AI
2026-05-13 12:40:31 +00:00
parent c283d884cf
commit d1c5be112e
6 changed files with 1984 additions and 196 deletions
+25 -12
View File
@@ -2,7 +2,7 @@
A React + Vite frontend (Tailwind, Noto Serif) that collects public media
station details and streams an Anthropic Managed Agent response back to the
browser through a Netlify Functions v2 proxy.
browser through a Netlify Edge Function proxy.
## Setup
@@ -31,17 +31,30 @@ Open http://localhost:8888.
## How streaming works
1. The browser POSTs the form to `/.netlify/functions/agent-proxy`.
2. The Netlify v2 function calls `anthropic.messages.stream(...)` and wraps
the upstream iterator in a `ReadableStream`. Each
`content_block_delta` text chunk is enqueued as plain UTF8 bytes.
3. The React app reads `response.body.getReader()` and decodes chunks with
`TextDecoder`, appending them to the result state.
4. A `Thinking` flag stays `true` until the first chunk arrives, then flips
to a streaming state with a pulsing cursor.
1. The browser POSTs the form to `/api/agent-proxy`.
2. A Netlify Edge Function (Deno runtime) creates an Anthropic Managed
Agent session, opens the upstream SSE event stream, and sends the
user message. It then tails the stream and re-opens it (with an
`events.list` backfill, deduped by event id) whenever it drops mid
session, until the session reports a terminal `session.status_*`
event or a 20-minute wall-clock budget is hit.
3. Downstream to the browser the function emits newline-delimited JSON
(`application/x-ndjson`) — `text`, `status`, `heartbeat`, `done`,
`error` — one object per line.
4. The React app reads `response.body.getReader()`, splits on `\n`, and
parses each line. `text` lines append to the brief; `status` lines
drive a separate “what the agent is doing now” banner.
5. A `Thinking` flag stays `true` until the first chunk arrives, then
flips to a streaming state with a pulsing cursor.
Edge Functions are required here — the previous v2 Node Function ran on
AWS Lambda and got killed at ~27s, well before any reconnect could
fire. Edge Functions run on Deno Deploy with no streaming-duration cap
as long as the function keeps writing to the response body.
## Files
- [src/App.jsx](src/App.jsx) — form, streaming reader, UI states.
- [netlify/functions/agent-proxy.js](netlify/functions/agent-proxy.js)
Managed Agent proxy with `ReadableStream`.
- [src/App.jsx](src/App.jsx) — form, NDJSON streaming reader, UI states.
- [netlify/edge-functions/agent-proxy.ts](netlify/edge-functions/agent-proxy.ts)
Managed Agent proxy with reconnect, backfill, and NDJSON wire
protocol.
-4
View File
@@ -1,10 +1,6 @@
[build]
command = "npm run build"
publish = "dist"
functions = "netlify/functions"
[functions]
node_bundler = "esbuild"
[dev]
framework = "vite"
@@ -1,12 +1,20 @@
import Anthropic from '@anthropic-ai/sdk'
import type { Config, Context } from '@netlify/edge-functions'
/**
* Netlify Functions v2 handler.
* Netlify Edge Function (Deno runtime).
*
* Proxies a request from the React frontend to a Claude Console-defined
* Managed Agent via the /v1/sessions endpoints. The agent's model, system
* prompt, tools, MCP servers, and skills are configured in the Console and
* referenced here by ID.
* Managed Agent via the `/v1/sessions` endpoints. The agent's model,
* system prompt, tools, MCP servers, and skills are configured in the
* Console and referenced here by ID.
*
* This replaces the v2 Node Function at `netlify/functions/agent-proxy.js`.
* The Node Function process was killed at ~27 s by the platform well
* before any SSE reconnect could fire so the brief always stopped after
* the first MCP tool batch. Edge Functions run on Deno Deploy with a
* 40 s response-header timeout but no streaming-duration cap as long as
* we keep writing to the body, so the 20-minute reconnect budget below
* is actually reachable.
*
* Wire protocol (downstream to the browser): newline-delimited JSON
* (`application/x-ndjson`). One JSON object per line; the React side
@@ -26,14 +34,21 @@ import Anthropic from '@anthropic-ai/sdk'
* (`GET /v1/sessions/{id}/events/stream`) is not guaranteed to stay open
* for the full life of a multi-turn session long inter-turn gaps (e.g.
* while MCP tools or the next model request are running) can trigger
* upstream / proxy read timeouts. When the upstream iterator ends without
* upstream / proxy read timeouts. When the upstream stream ends without
* a terminal `session.status_*` event we treat it as a drop, fetch any
* events we missed via `sessions.events.list()` (deduped by event id),
* events we missed via the events-list endpoint (deduped by event id),
* reopen the live stream, and keep going up to a wall-clock budget.
*
* We deliberately use plain `fetch()` against the Anthropic REST API
* rather than the `@anthropic-ai/sdk` npm package because npm support in
* Netlify Edge Functions is still in beta, and the SSE / list endpoints
* we need are easy to call directly.
*/
// All Managed Agents endpoints require this beta header.
const MANAGED_AGENTS_BETA = 'managed-agents-2026-04-01'
const ANTHROPIC_VERSION = '2023-06-01'
const ANTHROPIC_API_BASE = 'https://api.anthropic.com'
// Downstream keep-alive cadence. The browser's fetch reader will time
// out / appear stuck if no bytes flow for too long; we emit a
@@ -51,19 +66,38 @@ const RECONNECT_BUDGET_MS = 20 * 60 * 1000 // 20 minutes
const RECONNECT_BACKOFF_MS = 500
// ---------------------------------------------------------------------------
// Anthropic client (reused across warm Lambda invocations)
// Types (kept loose — Anthropic Managed Agents events are large and we
// only inspect a small subset of fields).
// ---------------------------------------------------------------------------
let _client
function getClient() {
if (_client) return _client
if (!process.env.ANTHROPIC_API_KEY) {
throw new Error('Server is missing ANTHROPIC_API_KEY')
type ContentBlock = { type?: string; text?: string }
type AgentEvent = {
id?: string
type?: string
content?: ContentBlock[]
name?: string
input?: Record<string, unknown>
is_error?: boolean
stop_reason?: { type?: string }
error?: { message?: string }
}
type EventsListPage = {
data?: AgentEvent[]
has_more?: boolean
last_id?: string
}
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
function authHeaders(apiKey: string): Record<string, string> {
return {
'x-api-key': apiKey,
'anthropic-version': ANTHROPIC_VERSION,
'anthropic-beta': MANAGED_AGENTS_BETA,
}
_client = new Anthropic({
apiKey: process.env.ANTHROPIC_API_KEY,
defaultHeaders: { 'anthropic-beta': MANAGED_AGENTS_BETA },
})
return _client
}
// A `session.status_idle` with `stop_reason: requires_action` means the
@@ -71,22 +105,23 @@ function getClient() {
// confirmation). For this app we don't expose custom tools, so it should
// never fire — but if it ever does we explicitly do NOT treat it as
// terminal and let the loop keep tailing.
function isTerminal(event) {
if (event?.type === 'session.status_terminated') return true
if (event?.type === 'session.status_idle') {
function isTerminal(event: AgentEvent | undefined): boolean {
if (!event) return false
if (event.type === 'session.status_terminated') return true
if (event.type === 'session.status_idle') {
const t = event.stop_reason?.type
return t === 'end_turn' || t === 'retries_exhausted'
}
return false
}
function describeToolUse(event) {
function describeToolUse(event: AgentEvent): string {
const name = event.name || 'tool'
const input = event.input || {}
if (name === 'web_search' && input.query) {
const input = (event.input || {}) as Record<string, unknown>
if (name === 'web_search' && typeof input.query === 'string') {
return `Searching the web: ${input.query}`
}
if (name === 'web_fetch' && input.url) {
if (name === 'web_fetch' && typeof input.url === 'string') {
return `Fetching: ${input.url}`
}
if (event.type === 'agent.mcp_tool_use') {
@@ -94,7 +129,9 @@ function describeToolUse(event) {
.filter(([k]) => k !== 'limit')
.slice(0, 3)
.map(([k, v]) =>
typeof v === 'object' ? `${k}=…` : `${k}=${String(v).slice(0, 40)}`,
typeof v === 'object' && v !== null
? `${k}=…`
: `${k}=${String(v).slice(0, 40)}`,
)
.join(', ')
return `${name}${args ? ` (${args})` : ''}`
@@ -102,15 +139,112 @@ function describeToolUse(event) {
return name
}
async function anthropicFetch(
path: string,
apiKey: string,
init: RequestInit = {},
): Promise<Response> {
const headers = {
...authHeaders(apiKey),
...(init.headers as Record<string, string> | undefined),
}
return await fetch(`${ANTHROPIC_API_BASE}${path}`, { ...init, headers })
}
// Parse the SSE stream into an async iterable of decoded `AgentEvent`
// objects. Each SSE message is `event: <type>\ndata: <json>\n\n`.
async function* parseSse(
stream: ReadableStream<Uint8Array>,
): AsyncGenerator<AgentEvent> {
const reader = stream.getReader()
const decoder = new TextDecoder('utf-8')
let buffer = ''
while (true) {
const { value, done } = await reader.read()
if (done) break
buffer += decoder.decode(value, { stream: true })
// SSE events are separated by a blank line (`\n\n`). Process each
// complete event.
let sep
while ((sep = buffer.indexOf('\n\n')) !== -1) {
const raw = buffer.slice(0, sep)
buffer = buffer.slice(sep + 2)
let dataPayload = ''
let evType: string | undefined
for (const line of raw.split('\n')) {
if (line.startsWith('event:')) {
evType = line.slice(6).trim()
} else if (line.startsWith('data:')) {
dataPayload += line.slice(5).trim()
}
// Anthropic doesn't multi-line `data:`, but tolerate it just in
// case — multiple `data:` lines are concatenated per the SSE
// spec.
}
if (!dataPayload) continue
try {
const parsed = JSON.parse(dataPayload) as AgentEvent
// `event:` header takes precedence; fall back to the `type`
// field inside `data` (Anthropic sets both).
if (evType && !parsed.type) parsed.type = evType
yield parsed
} catch {
// Malformed event — skip.
}
}
}
}
// Page through `GET /v1/sessions/{id}/events` to backfill anything we
// missed while disconnected. The endpoint returns `{data, has_more,
// last_id}`; we keep paging while `has_more` is true.
async function* listAllEvents(
sessionId: string,
apiKey: string,
): AsyncGenerator<AgentEvent> {
let after: string | undefined
for (;;) {
const params = new URLSearchParams({ order: 'asc', limit: '100' })
if (after) params.set('after_id', after)
const res = await anthropicFetch(
`/v1/sessions/${sessionId}/events?${params.toString()}`,
apiKey,
)
if (!res.ok) {
throw new Error(
`events.list returned ${res.status} ${res.statusText}`,
)
}
const page = (await res.json()) as EventsListPage
const items = page.data || []
for (const ev of items) yield ev
if (!page.has_more) break
const lastId = page.last_id ?? items[items.length - 1]?.id
if (!lastId) break // can't paginate without an anchor
after = lastId
}
}
// ---------------------------------------------------------------------------
// Handler
// ---------------------------------------------------------------------------
export default async (req /*, context */) => {
const URL_REGEX =
/^(https?:\/\/)?([\w-]+\.)+[\w-]{2,}(\/[\w\-._~:/?#[\]@!$&'()*+,;=%]*)?$/i
export default async (req: Request, _context: Context) => {
if (req.method !== 'POST') {
return new Response('Method Not Allowed', { status: 405 })
}
let payload
let payload: {
stationName?: string
stationLocation?: string
stationWebsite?: string
}
try {
payload = await req.json()
} catch {
@@ -123,21 +257,22 @@ export default async (req /*, context */) => {
stationWebsite = '',
} = payload || {}
// Server-side URL validation (mirrors the client-side regex).
const URL_REGEX =
/^(https?:\/\/)?([\w-]+\.)+[\w-]{2,}(\/[\w\-._~:/?#[\]@!$&'()*+,;=%]*)?$/i
if (!URL_REGEX.test(stationWebsite)) {
return new Response('Invalid station website URL', { status: 400 })
}
const AGENT_ID = process.env.ANTHROPIC_AGENT_ID
const ENVIRONMENT_ID = process.env.ANTHROPIC_ENVIRONMENT_ID
const VAULT_IDS = (process.env.ANTHROPIC_VAULT_IDS || '')
const apiKey = Netlify.env.get('ANTHROPIC_API_KEY')
const AGENT_ID = Netlify.env.get('ANTHROPIC_AGENT_ID')
const ENVIRONMENT_ID = Netlify.env.get('ANTHROPIC_ENVIRONMENT_ID')
const VAULT_IDS = (Netlify.env.get('ANTHROPIC_VAULT_IDS') || '')
.split(',')
.map((s) => s.trim())
.filter(Boolean)
const missing = []
if (!apiKey) {
return new Response('Server is missing ANTHROPIC_API_KEY', { status: 500 })
}
const missing: string[] = []
if (!AGENT_ID) missing.push('ANTHROPIC_AGENT_ID')
if (!ENVIRONMENT_ID) missing.push('ANTHROPIC_ENVIRONMENT_ID')
if (missing.length) {
@@ -146,16 +281,6 @@ export default async (req /*, context */) => {
})
}
let client
try {
client = getClient()
} catch (err) {
return new Response(
`Failed to initialize agent: ${err.message || err}`,
{ status: 500 },
)
}
const userMessage = `Here is a new public media station intake:
- Station Name: ${stationName}
@@ -165,63 +290,106 @@ export default async (req /*, context */) => {
Please follow your instructions to produce the funding outlook brief.`
// ----- Create the session -----
let session
const createBody: Record<string, unknown> = {
agent: AGENT_ID,
environment_id: ENVIRONMENT_ID,
}
if (VAULT_IDS.length) createBody.vault_ids = VAULT_IDS
let sessionId: string
try {
const sessionParams = {
agent: AGENT_ID,
environment_id: ENVIRONMENT_ID,
const createRes = await anthropicFetch('/v1/sessions', apiKey, {
method: 'POST',
headers: { 'content-type': 'application/json' },
body: JSON.stringify(createBody),
})
if (!createRes.ok) {
const text = await createRes.text()
return new Response(
`Failed to create session: ${createRes.status} ${text}`,
{ status: 500 },
)
}
if (VAULT_IDS.length) sessionParams.vault_ids = VAULT_IDS
session = await client.beta.sessions.create(sessionParams)
const created = (await createRes.json()) as { id?: string }
if (!created.id) {
return new Response('Failed to create session: no id returned', {
status: 500,
})
}
sessionId = created.id
} catch (err) {
return new Response(
`Failed to create session: ${err.message || err}`,
`Failed to create session: ${(err as Error)?.message || err}`,
{ status: 500 },
)
}
// ----- Open the event stream BEFORE sending the user message -----
// (Stream-first ensures we don't miss any events the agent emits.)
let initialUpstream
const openStream = async (): Promise<ReadableStream<Uint8Array>> => {
const res = await anthropicFetch(
`/v1/sessions/${sessionId}/events/stream`,
apiKey,
{ method: 'GET', headers: { accept: 'text/event-stream' } },
)
if (!res.ok || !res.body) {
throw new Error(
`events.stream returned ${res.status} ${res.statusText}`,
)
}
return res.body
}
let upstream: ReadableStream<Uint8Array>
try {
initialUpstream = await client.beta.sessions.events.stream(session.id)
upstream = await openStream()
} catch (err) {
return new Response(
`Failed to open session event stream: ${err.message || err}`,
`Failed to open session event stream: ${(err as Error)?.message || err}`,
{ status: 500 },
)
}
// ----- Send the kickoff user.message event -----
try {
await client.beta.sessions.events.send(session.id, {
events: [
{
type: 'user.message',
content: [{ type: 'text', text: userMessage }],
},
],
})
} catch (err) {
try {
initialUpstream.controller?.abort?.()
} catch {
/* ignore */
const sendRes = await anthropicFetch(
`/v1/sessions/${sessionId}/events`,
apiKey,
{
method: 'POST',
headers: { 'content-type': 'application/json' },
body: JSON.stringify({
events: [
{
type: 'user.message',
content: [{ type: 'text', text: userMessage }],
},
],
}),
},
)
if (!sendRes.ok) {
const text = await sendRes.text()
return new Response(
`Failed to send user message: ${sendRes.status} ${text}`,
{ status: 500 },
)
}
} catch (err) {
return new Response(
`Failed to send user message: ${err.message || err}`,
`Failed to send user message: ${(err as Error)?.message || err}`,
{ status: 500 },
)
}
const encoder = new TextEncoder()
const seenEventIds = new Set()
const seenEventIds = new Set<string>()
const deadline = Date.now() + RECONNECT_BUDGET_MS
const body = new ReadableStream({
const body = new ReadableStream<Uint8Array>({
async start(controller) {
let lastSendAt = Date.now()
const writeJson = (obj) => {
const writeJson = (obj: unknown) => {
try {
controller.enqueue(encoder.encode(JSON.stringify(obj) + '\n'))
lastSendAt = Date.now()
@@ -230,16 +398,13 @@ Please follow your instructions to produce the funding outlook brief.`
}
}
// Heartbeat to keep the downstream connection alive while we're
// waiting on the next upstream event (e.g. during a long MCP tool
// call or before the next model_request_start).
const heartbeat = setInterval(() => {
if (Date.now() - lastSendAt >= HEARTBEAT_MS) {
writeJson({ type: 'heartbeat' })
}
}, HEARTBEAT_MS / 2)
const handle = (event) => {
const handle = (event: AgentEvent) => {
switch (event.type) {
case 'agent.message': {
if (Array.isArray(event.content)) {
@@ -272,8 +437,7 @@ Please follow your instructions to produce the funding outlook brief.`
case 'agent.mcp_tool_result': {
if (event.is_error) {
const msg =
(Array.isArray(event.content) &&
event.content[0]?.text) ||
(Array.isArray(event.content) && event.content[0]?.text) ||
'tool error'
writeJson({
type: 'status',
@@ -300,7 +464,7 @@ Please follow your instructions to produce the funding outlook brief.`
}
let done = false
let currentStream = initialUpstream
let currentStream: ReadableStream<Uint8Array> = upstream
let iteration = 0
try {
@@ -323,24 +487,19 @@ Please follow your instructions to produce the funding outlook brief.`
await new Promise((r) => setTimeout(r, RECONNECT_BACKOFF_MS))
try {
currentStream = await client.beta.sessions.events.stream(
session.id,
)
currentStream = await openStream()
} catch (err) {
writeJson({
type: 'error',
message: `Failed to reopen event stream: ${err?.message || err}`,
message: `Failed to reopen event stream: ${(err as Error)?.message || err}`,
})
break
}
// Backfill: pull everything the session has emitted so far
// and dedupe by event.id. Auto-paginated by the SDK.
// and dedupe by event.id.
try {
for await (const event of client.beta.sessions.events.list(
session.id,
{ order: 'asc' },
)) {
for await (const event of listAllEvents(sessionId, apiKey)) {
if (event.id && !seenEventIds.has(event.id)) {
seenEventIds.add(event.id)
handle(event)
@@ -357,7 +516,7 @@ Please follow your instructions to produce the funding outlook brief.`
writeJson({
type: 'status',
kind: 'session_error',
message: `Backfill failed: ${err?.message || err}`,
message: `Backfill failed: ${(err as Error)?.message || err}`,
})
// Don't break — fall through and try the live stream.
}
@@ -368,7 +527,7 @@ Please follow your instructions to produce the funding outlook brief.`
// Tail the currently-open live stream until it ends, errors,
// or hits a terminal session event.
try {
for await (const event of currentStream) {
for await (const event of parseSse(currentStream)) {
if (event.id && !seenEventIds.has(event.id)) {
seenEventIds.add(event.id)
handle(event)
@@ -391,15 +550,10 @@ Please follow your instructions to produce the funding outlook brief.`
} catch (err) {
writeJson({
type: 'error',
message: err?.message || String(err),
message: (err as Error)?.message || String(err),
})
} finally {
clearInterval(heartbeat)
try {
currentStream?.controller?.abort?.()
} catch {
/* ignore */
}
try {
controller.close()
} catch {
@@ -407,17 +561,6 @@ Please follow your instructions to produce the funding outlook brief.`
}
}
},
cancel() {
// Browser disconnected. Best-effort: abort whichever upstream is
// currently open. (`currentStream` lives inside `start()` so we
// only have the initial one in scope here — that's enough to make
// sure we don't keep a socket open after the very first turn.)
try {
initialUpstream.controller?.abort?.()
} catch {
/* ignore */
}
},
})
return new Response(body, {
@@ -430,6 +573,6 @@ Please follow your instructions to produce the funding outlook brief.`
})
}
export const config = {
path: '/.netlify/functions/agent-proxy',
export const config: Config = {
path: '/api/agent-proxy',
}
+1705 -69
View File
File diff suppressed because it is too large Load Diff
+1 -1
View File
@@ -10,13 +10,13 @@
"preview": "vite preview"
},
"dependencies": {
"@anthropic-ai/sdk": "^0.95.2",
"react": "^18.3.1",
"react-dom": "^18.3.1",
"react-markdown": "^9.0.1",
"remark-gfm": "^4.0.0"
},
"devDependencies": {
"@netlify/edge-functions": "^2.11.1",
"@vitejs/plugin-react": "^4.3.4",
"autoprefixer": "^10.4.20",
"postcss": "^8.4.49",
+1 -1
View File
@@ -128,7 +128,7 @@ export default function App() {
setIsStreaming(false)
try {
const response = await fetch('/.netlify/functions/agent-proxy', {
const response = await fetch('/api/agent-proxy', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(form),