Files
pmc-funder-tool/netlify/functions/agent-proxy.js
T
Devin AI c283d884cf fix(agent-proxy): reconnect with events.list backfill so long sessions don't cut off
The Anthropic Managed-Agents SSE stream (`/v1/sessions/{id}/events/stream`)
is not guaranteed to stay open for the life of a multi-turn session.
Inter-turn gaps (e.g. while MCP tools or the next model request run) of
~25s+ are common and can trigger upstream / proxy read timeouts; when
that happens, the SDK's `for await` iterator simply ends, and the proxy
previously closed the downstream response silently, leaving the browser
with only the events from the first tool batch.

Implement the official 'consolidation' pattern: treat any upstream
end-of-stream that isn't a terminal `session.status_*` as a transient
drop, fetch what we missed via `sessions.events.list()` (deduped by
event.id), reopen the live stream, and keep going — up to a 20-minute
wall-clock budget, so a wedged session can't pin the function forever.

Also switch the wire format from inline-markdown-with-status-comments to
NDJSON, so the React side can render in-flight tool activity in a
separate status banner from the streaming brief. The existing zero-width
heartbeat is replaced by an explicit `{"type":"heartbeat"}` line.

Drop the `agent.tool_use` write-tool special case — the agent now
streams the brief directly via `agent.message` text blocks.

Co-Authored-By: alex <alex@semipublic.co>
2026-05-13 03:20:15 +00:00

436 lines
14 KiB
JavaScript

import Anthropic from '@anthropic-ai/sdk'
/**
* Netlify Functions v2 handler.
*
* Proxies a request from the React frontend to a Claude Console-defined
* Managed Agent via the /v1/sessions endpoints. The agent's model, system
* prompt, tools, MCP servers, and skills are configured in the Console and
* referenced here by ID.
*
* Wire protocol (downstream to the browser): newline-delimited JSON
* (`application/x-ndjson`). One JSON object per line; the React side
* parses incrementally as bytes arrive. Object shapes:
*
* {"type":"text","text":"...markdown..."} // append to brief
* {"type":"status","kind":"thinking"}
* {"type":"status","kind":"tool_use","name":"...","label":"..."}
* {"type":"status","kind":"tool_result","ok":true}
* {"type":"status","kind":"tool_result","ok":false,"message":"..."}
* {"type":"status","kind":"session_error","message":"..."}
* {"type":"heartbeat"} // keep-alive only
* {"type":"done"} // session terminated cleanly
* {"type":"error","message":"..."} // unrecoverable
*
* Reconnect strategy: the upstream Anthropic SSE stream
* (`GET /v1/sessions/{id}/events/stream`) is not guaranteed to stay open
* for the full life of a multi-turn session — long inter-turn gaps (e.g.
* while MCP tools or the next model request are running) can trigger
* upstream / proxy read timeouts. When the upstream iterator ends without
* a terminal `session.status_*` event we treat it as a drop, fetch any
* events we missed via `sessions.events.list()` (deduped by event id),
* reopen the live stream, and keep going — up to a wall-clock budget.
*/
// Beta header that every Managed Agents endpoint requires.
const MANAGED_AGENTS_BETA = 'managed-agents-2026-04-01'

// Downstream idle threshold. The browser's fetch reader can time out or look
// stuck when no bytes arrive, so a `{"type":"heartbeat"}` line is written once
// nothing has been sent downstream for at least this long.
const HEARTBEAT_MS = 10 * 1000

// Overall wall-clock ceiling on reconnecting upstream for one browser request.
// Once it is exceeded, an `error` line is written and the stream is closed, so
// the user can retry instead of waiting on a request that never finishes.
const RECONNECT_BUDGET_MS = 20 * 60_000 // 20 minutes

// Pause between an upstream drop and the next reconnect attempt, so a tight
// error loop can't hammer the API.
const RECONNECT_BACKOFF_MS = 500
// ---------------------------------------------------------------------------
// Anthropic client (reused across warm Lambda invocations)
// ---------------------------------------------------------------------------
let cachedClient

/**
 * Lazily construct the shared Anthropic client with the Managed Agents beta
 * header attached to every request. The instance is kept at module scope,
 * so later calls in the same process reuse it.
 *
 * @returns {Anthropic} the cached client instance
 * @throws {Error} when ANTHROPIC_API_KEY is unset or empty
 */
function getClient() {
  if (!cachedClient) {
    const apiKey = process.env.ANTHROPIC_API_KEY
    if (!apiKey) {
      throw new Error('Server is missing ANTHROPIC_API_KEY')
    }
    cachedClient = new Anthropic({
      apiKey,
      defaultHeaders: { 'anthropic-beta': MANAGED_AGENTS_BETA },
    })
  }
  return cachedClient
}
/**
 * Decide whether a session event means the session is finished, which ends
 * the reconnect loop.
 *
 * Terminal events:
 *   - `session.status_terminated`
 *   - `session.status_idle` whose `stop_reason.type` is `end_turn` or
 *     `retries_exhausted`
 *
 * A `session.status_idle` with `stop_reason: requires_action` means the agent
 * is waiting on a client-side response (custom tool result, tool
 * confirmation). This app exposes no custom tools, so that should never
 * fire. If it ever does, it is deliberately NOT treated as terminal and the
 * loop keeps tailing.
 *
 * @param {object|null|undefined} event - a session event (nullish tolerated)
 * @returns {boolean}
 */
function isTerminal(event) {
  switch (event?.type) {
    case 'session.status_terminated':
      return true
    case 'session.status_idle': {
      const reason = event.stop_reason?.type
      return reason === 'end_turn' || reason === 'retries_exhausted'
    }
    default:
      return false
  }
}
/**
 * Build a short, human-readable label for a tool-use event. The label is
 * sent downstream as the `label` field of a `tool_use` status line.
 *
 * - `web_search` with a query  -> "Searching the web: <query>"
 * - `web_fetch` with a url     -> "Fetching: <url>"
 * - `agent.mcp_tool_use`       -> "<name> (k=v, ...)". This shows at most
 *   three input entries, skips `limit`, prints object-valued entries as `k=…`,
 *   and truncates other values to 40 characters.
 * - anything else              -> the tool name (or "tool" when unnamed)
 *
 * @param {object} event - a tool-use event with optional `name`/`input`/`type`
 * @returns {string}
 */
function describeToolUse(event) {
  const toolName = event.name || 'tool'
  const args = event.input || {}

  if (toolName === 'web_search' && args.query) {
    return `Searching the web: ${args.query}`
  }
  if (toolName === 'web_fetch' && args.url) {
    return `Fetching: ${args.url}`
  }
  if (event.type !== 'agent.mcp_tool_use') {
    return toolName
  }

  const parts = []
  for (const [key, value] of Object.entries(args)) {
    if (key === 'limit') continue
    if (parts.length === 3) break
    parts.push(
      typeof value === 'object'
        ? `${key}=…`
        : `${key}=${String(value).slice(0, 40)}`,
    )
  }
  return parts.length > 0 ? `${toolName} (${parts.join(', ')})` : toolName
}
// ---------------------------------------------------------------------------
// Handler
// ---------------------------------------------------------------------------
/**
 * Netlify Functions v2 entry point (POST only).
 *
 * Steps:
 *   1. Validate the station intake.
 *   2. Create a Managed Agents session.
 *   3. Open the session's event stream.
 *   4. Send the kickoff user message.
 *   5. Stream the session's events back to the browser as NDJSON (see the
 *      wire-protocol notes at the top of this file).
 *
 * Failures before the stream starts (wrong method, bad JSON, bad URL, missing
 * env vars, session create/stream/send errors) are returned as plain-text
 * responses with a 4xx/5xx status. After the 200 NDJSON response starts,
 * problems are reported in-band as `{"type":"error"}` or
 * `{"type":"status",...}` lines.
 *
 * When the browser disconnects, the stream's `cancel()` sets a shared
 * `cancelled` flag and aborts whichever upstream stream is open at that
 * moment. The reconnect loop checks the flag after every await, so the
 * function stops instead of tailing (and reconnecting to) the session for up
 * to RECONNECT_BUDGET_MS with nobody listening.
 *
 * @param {Request} req
 * @returns {Promise<Response>}
 */
export default async (req /*, context */) => {
  if (req.method !== 'POST') {
    return new Response('Method Not Allowed', { status: 405 })
  }

  let payload
  try {
    payload = await req.json()
  } catch {
    return new Response('Invalid JSON body', { status: 400 })
  }

  const {
    stationName = '',
    stationLocation = '',
    stationWebsite = '',
  } = payload || {}

  // Server-side URL validation (mirrors the client-side regex).
  const URL_REGEX =
    /^(https?:\/\/)?([\w-]+\.)+[\w-]{2,}(\/[\w\-._~:/?#[\]@!$&'()*+,;=%]*)?$/i
  if (!URL_REGEX.test(stationWebsite)) {
    return new Response('Invalid station website URL', { status: 400 })
  }

  const AGENT_ID = process.env.ANTHROPIC_AGENT_ID
  const ENVIRONMENT_ID = process.env.ANTHROPIC_ENVIRONMENT_ID
  // Optional comma-separated list; whitespace is trimmed and blanks dropped.
  const VAULT_IDS = (process.env.ANTHROPIC_VAULT_IDS || '')
    .split(',')
    .map((s) => s.trim())
    .filter(Boolean)

  const missing = []
  if (!AGENT_ID) missing.push('ANTHROPIC_AGENT_ID')
  if (!ENVIRONMENT_ID) missing.push('ANTHROPIC_ENVIRONMENT_ID')
  if (missing.length) {
    return new Response(`Server is missing env var(s): ${missing.join(', ')}`, {
      status: 500,
    })
  }

  let client
  try {
    client = getClient()
  } catch (err) {
    return new Response(
      `Failed to initialize agent: ${err.message || err}`,
      { status: 500 },
    )
  }

  const userMessage = `Here is a new public media station intake:
- Station Name: ${stationName}
- Station Location: ${stationLocation}
- Station Website: ${stationWebsite}
Please follow your instructions to produce the funding outlook brief.`

  // Best-effort abort of an SDK event stream. Errors are ignored because the
  // stream may already have ended or been aborted.
  const abortUpstream = (stream) => {
    try {
      stream?.controller?.abort?.()
    } catch {
      /* ignore */
    }
  }

  // ----- Create the session -----
  let session
  try {
    const sessionParams = {
      agent: AGENT_ID,
      environment_id: ENVIRONMENT_ID,
    }
    if (VAULT_IDS.length) sessionParams.vault_ids = VAULT_IDS
    session = await client.beta.sessions.create(sessionParams)
  } catch (err) {
    return new Response(
      `Failed to create session: ${err.message || err}`,
      { status: 500 },
    )
  }

  // ----- Open the event stream BEFORE sending the user message -----
  // (Stream-first ensures we don't miss any events the agent emits.)
  let initialUpstream
  try {
    initialUpstream = await client.beta.sessions.events.stream(session.id)
  } catch (err) {
    return new Response(
      `Failed to open session event stream: ${err.message || err}`,
      { status: 500 },
    )
  }

  // ----- Send the kickoff user.message event -----
  try {
    await client.beta.sessions.events.send(session.id, {
      events: [
        {
          type: 'user.message',
          content: [{ type: 'text', text: userMessage }],
        },
      ],
    })
  } catch (err) {
    abortUpstream(initialUpstream)
    return new Response(
      `Failed to send user message: ${err.message || err}`,
      { status: 500 },
    )
  }

  const encoder = new TextEncoder()
  const seenEventIds = new Set()
  const deadline = Date.now() + RECONNECT_BUDGET_MS

  // State shared by start() and cancel(). It lives at handler scope, not
  // inside start(), so cancel() can abort the stream that is open *now*.
  // After a reconnect that is no longer `initialUpstream`. The same scope
  // lets cancel() stop the reconnect loop once the browser is gone.
  let currentStream = initialUpstream
  let cancelled = false

  // Records an event id and reports whether this is its first sighting, so an
  // event delivered by both the live stream and a backfill is forwarded once.
  // NOTE(review): events without an `id` are never forwarded. This assumes
  // every session event carries one; confirm against the API.
  const isFirstSighting = (event) => {
    if (!event.id || seenEventIds.has(event.id)) return false
    seenEventIds.add(event.id)
    return true
  }

  const body = new ReadableStream({
    async start(controller) {
      let lastSendAt = Date.now()
      const writeJson = (obj) => {
        try {
          controller.enqueue(encoder.encode(JSON.stringify(obj) + '\n'))
          lastSendAt = Date.now()
        } catch {
          /* controller closed */
        }
      }

      // Heartbeat to keep the downstream connection alive while we're
      // waiting on the next upstream event (e.g. during a long MCP tool
      // call or before the next model_request_start).
      const heartbeat = setInterval(() => {
        if (Date.now() - lastSendAt >= HEARTBEAT_MS) {
          writeJson({ type: 'heartbeat' })
        }
      }, HEARTBEAT_MS / 2)

      // Translate one upstream session event into zero or more NDJSON lines.
      const handle = (event) => {
        switch (event.type) {
          case 'agent.message': {
            if (Array.isArray(event.content)) {
              for (const block of event.content) {
                if (block?.type === 'text' && block.text) {
                  writeJson({ type: 'text', text: block.text + '\n\n' })
                }
              }
            }
            break
          }
          case 'agent.thinking': {
            writeJson({ type: 'status', kind: 'thinking' })
            break
          }
          case 'agent.tool_use':
          case 'agent.mcp_tool_use': {
            writeJson({
              type: 'status',
              kind: 'tool_use',
              name: event.name || 'tool',
              label: describeToolUse(event),
            })
            break
          }
          case 'agent.tool_result':
          case 'agent.mcp_tool_result': {
            if (event.is_error) {
              const msg =
                (Array.isArray(event.content) &&
                  event.content[0]?.text) ||
                'tool error'
              writeJson({
                type: 'status',
                kind: 'tool_result',
                ok: false,
                message: String(msg).slice(0, 300),
              })
            } else {
              writeJson({ type: 'status', kind: 'tool_result', ok: true })
            }
            break
          }
          case 'session.error': {
            const msg = event.error?.message || 'unknown session error'
            writeJson({
              type: 'status',
              kind: 'session_error',
              message: msg,
            })
            break
          }
          default:
            // Other event types (including session.status_*) are not
            // forwarded downstream.
            break
        }
      }

      let done = false
      let iteration = 0
      try {
        while (!done && !cancelled) {
          iteration++
          // On reconnect (every iteration after the first), reopen the
          // live stream and backfill anything we missed during the gap.
          if (iteration > 1) {
            if (Date.now() >= deadline) {
              writeJson({
                type: 'error',
                message: `Stream reconnect budget (${Math.round(
                  RECONNECT_BUDGET_MS / 60_000,
                )} min) exhausted. The session may still be running — try again or check the Console.`,
              })
              break
            }
            await new Promise((r) => setTimeout(r, RECONNECT_BACKOFF_MS))
            if (cancelled) break
            try {
              currentStream = await client.beta.sessions.events.stream(
                session.id,
              )
            } catch (err) {
              writeJson({
                type: 'error',
                message: `Failed to reopen event stream: ${err?.message || err}`,
              })
              break
            }
            // cancel() may have fired while the new stream was opening. It
            // could not see this stream yet; the finally block aborts it.
            if (cancelled) break
            // Backfill: pull everything the session has emitted so far
            // and dedupe by event.id. Auto-paginated by the SDK.
            try {
              for await (const event of client.beta.sessions.events.list(
                session.id,
                { order: 'asc' },
              )) {
                if (cancelled) break
                if (isFirstSighting(event)) handle(event)
                // Terminal checks must run even for already-seen events,
                // or a terminal event that came in via the backfill gets
                // skipped and the loop never exits.
                if (isTerminal(event)) {
                  done = true
                  break
                }
              }
            } catch (err) {
              writeJson({
                type: 'status',
                kind: 'session_error',
                message: `Backfill failed: ${err?.message || err}`,
              })
              // Don't break — fall through and try the live stream.
            }
            if (done || cancelled) break
          }
          // Tail the currently-open live stream until it ends, errors,
          // is aborted by cancel(), or hits a terminal session event.
          try {
            for await (const event of currentStream) {
              if (cancelled) break
              if (isFirstSighting(event)) handle(event)
              if (isTerminal(event)) {
                done = true
                break
              }
            }
          } catch {
            // Treat as transient. If it's actually persistent, the
            // reopen + backfill on the next iteration will surface it
            // (or burn down the reconnect budget cleanly). An abort from
            // cancel() also lands here; the loop condition then exits.
          }
        }
        if (done) {
          writeJson({ type: 'done' })
        }
      } catch (err) {
        writeJson({
          type: 'error',
          message: err?.message || String(err),
        })
      } finally {
        clearInterval(heartbeat)
        abortUpstream(currentStream)
        try {
          controller.close()
        } catch {
          /* ignore */
        }
      }
    },
    cancel() {
      // Browser disconnected: stop the reconnect loop and abort whichever
      // upstream stream is open right now, not just the initial one.
      cancelled = true
      abortUpstream(currentStream)
    },
  })

  return new Response(body, {
    status: 200,
    headers: {
      'Content-Type': 'application/x-ndjson; charset=utf-8',
      'Cache-Control': 'no-cache, no-transform',
      'X-Accel-Buffering': 'no',
    },
  })
}
// Netlify Functions v2 in-code configuration: route this function to the
// path given below.
export const config = { path: '/.netlify/functions/agent-proxy' }