Merge pull request #1 from Semipublic/devin/1778642010-fix-stream-reconnect

fix(agent-proxy): Edge Function + reconnect/backfill + segmented streaming so long sessions stream end-to-end
This commit is contained in:
2026-05-13 09:34:52 -04:00
committed by GitHub
7 changed files with 2648 additions and 405 deletions
+47 -12
View File
@@ -2,7 +2,7 @@
A React + Vite frontend (Tailwind, Noto Serif) that collects public media A React + Vite frontend (Tailwind, Noto Serif) that collects public media
station details and streams an Anthropic Managed Agent response back to the station details and streams an Anthropic Managed Agent response back to the
browser through a Netlify Functions v2 proxy. browser through a Netlify Edge Function proxy.
## Setup ## Setup
@@ -31,17 +31,52 @@ Open http://localhost:8888.
## How streaming works ## How streaming works
1. The browser POSTs the form to `/.netlify/functions/agent-proxy`. 1. The browser POSTs the form to `/api/agent-proxy`.
2. The Netlify v2 function calls `anthropic.messages.stream(...)` and wraps 2. A Netlify Edge Function (Deno runtime) creates an Anthropic Managed
the upstream iterator in a `ReadableStream`. Each Agent session, opens the upstream SSE event stream, and sends the
`content_block_delta` text chunk is enqueued as plain UTF8 bytes. user message. It tails the stream and re-opens it (with an
3. The React app reads `response.body.getReader()` and decodes chunks with `events.list` backfill, deduped by event id) whenever it drops mid
`TextDecoder`, appending them to the result state. session, until the session reports a terminal `session.status_*`
4. A `Thinking` flag stays `true` until the first chunk arrives, then flips event or a 20-minute wall-clock budget is hit.
to a streaming state with a pulsing cursor. 3. Downstream to the browser the function emits newline-delimited JSON
(`application/x-ndjson`) — `text`, `status`, `heartbeat`,
`segment_end`, `done`, `error` — one object per line.
4. The React app reads `response.body.getReader()`, splits on `\n`, and
parses each line. `text` lines append to the brief; `status` lines
drive a separate “what the agent is doing now” banner.
5. A `Thinking` flag stays `true` until the first chunk arrives, then
flips to a streaming state with a pulsing cursor.
### Why segmented streaming
Netlify Edge Functions empirically cap a single streaming response at
~60 s wall-clock (Netlify's own canonical SSE example cuts at the same
mark, even though the docs imply "indefinite" streaming). The Anthropic
Managed Agent session itself is fine for several minutes; the limit is
per-HTTP-response. So the proxy segments the stream:
- Just before ~54 s the function writes one final NDJSON line —
`{"type":"segment_end","sessionId":"sess_…","lastEventId":"evt_…","startedAt":…}`
and closes the response.
- The React side sees `segment_end`, immediately POSTs to
`/api/agent-proxy` again with `{sessionId, lastEventId, startedAt}`
in the body.
- The function recognises this as a resume payload: it does **not**
create a new session or re-send the user message. It just reopens
the live SSE stream, backfills via
`GET /v1/sessions/{id}/events?after_id=lastEventId` (deduped by
`event.id`), and keeps tailing.
The 20-minute overall budget is enforced via `Date.now() - startedAt`
so it spans across all segments. The previous v2 Node Function (on AWS
Lambda) was killed at ~27 s, well before any reconnect could fire —
moving to Edge Functions extended that to ~60 s per response, and the
segment loop extends it the rest of the way.
## Files ## Files
- [src/App.jsx](src/App.jsx) — form, streaming reader, UI states. - [src/App.jsx](src/App.jsx) — form, NDJSON streaming reader,
- [netlify/functions/agent-proxy.js](netlify/functions/agent-proxy.js) — segment-end resume loop, UI states.
Managed Agent proxy with `ReadableStream`. - [netlify/edge-functions/agent-proxy.ts](netlify/edge-functions/agent-proxy.ts)
— Managed Agent proxy with reconnect, backfill, segmentation, and
NDJSON wire protocol.
-4
View File
@@ -1,10 +1,6 @@
[build] [build]
command = "npm run build" command = "npm run build"
publish = "dist" publish = "dist"
functions = "netlify/functions"
[functions]
node_bundler = "esbuild"
[dev] [dev]
framework = "vite" framework = "vite"
+766
View File
@@ -0,0 +1,766 @@
import type { Config, Context } from '@netlify/edge-functions'
/**
* Netlify Edge Function (Deno runtime).
*
* Proxies a request from the React frontend to a Claude Console-defined
* Managed Agent. The agent's model, system prompt, tools, MCP servers
* and skills are configured in the Console and referenced here by ID.
*
* ## Why this exists
*
* Two layered problems are in scope:
*
* 1. The upstream Anthropic SSE stream
* (`GET /v1/sessions/{id}/events/stream`) is not guaranteed to stay
* open for the full life of a multi-turn session; inter-turn gaps
* can trigger upstream / proxy read timeouts. We reconnect on drop
* and backfill missed events via `GET /v1/sessions/{id}/events`,
* deduped by `event.id`.
*
* 2. Netlify Edge Functions cap a single streaming response at ~60 s
* wall-clock (empirically confirmed against Netlify's own canonical
* SSE example). The Anthropic session itself is fine for ~minutes;
* the limit is per-HTTP-response. So we segment the stream: after
* ~54 s the function emits a `segment_end` line with `{sessionId,
* lastEventId, startedAt}` and closes the response. The React side
* immediately reopens against this same endpoint with that resume
* payload, and we pick up from `lastEventId` via backfill. To the
* user this looks like one continuous stream.
*
* ## Wire protocol (downstream to the browser): NDJSON
*
* {"type":"text","text":"...markdown..."} // append to brief
* {"type":"status","kind":"thinking"}
* {"type":"status","kind":"tool_use","name":"...","label":"..."}
* {"type":"status","kind":"tool_result","ok":true}
* {"type":"status","kind":"tool_result","ok":false,"message":"..."}
* {"type":"status","kind":"session_error","message":"..."}
* {"type":"heartbeat"} // keep-alive
* {"type":"segment_end","sessionId":"sess_...",
* "lastEventId":"evt_...","startedAt":1234567890} // reopen me
* {"type":"done"} // terminal
* {"type":"error","message":"..."} // unrecoverable
*
* ## Request payloads
*
* - New session (first call):
* {stationName, stationLocation, stationWebsite}
* - Resume (subsequent calls after a segment_end):
* {sessionId, lastEventId?, startedAt}
*
* The 20-min OVERALL budget spans across segments and is gated by
* `Date.now() - startedAt`. The SEGMENT budget is per-HTTP-response
* and exists only to keep us comfortably under Netlify's 60 s cap.
*/
const MANAGED_AGENTS_BETA = 'managed-agents-2026-04-01'
const ANTHROPIC_VERSION = '2023-06-01'
const ANTHROPIC_API_BASE = 'https://api.anthropic.com'
// Downstream keep-alive cadence. The browser's fetch reader appears
// stuck if no bytes flow for too long. We keep this tighter than the
// segment budget so the connection stays warm even during long
// model-thinking gaps with no agent events.
const HEARTBEAT_MS = 5_000
// Close the current response well before Netlify's empirical streaming
// cap. We've observed responses cut anywhere between ~49 s and ~60 s,
// so a 54 s budget is too aggressive — sometimes the platform pulls
// the rug out before our final `segment_end` write + `controller.close()`
// can flush. 40 s gives ~10 s of headroom on the low end of the
// observed range while still amortising session setup (~1 segment per
// ~40 s of brief) across a typical 3-5 min brief.
const SEGMENT_BUDGET_MS = 40_000
// Total wall-clock allowance across ALL segments for a single user
// brief. Past this we emit a final `error` line and stop reopening.
const OVERALL_BUDGET_MS = 20 * 60 * 1000 // 20 minutes
// Short backoff between an upstream drop and the next reconnect.
const RECONNECT_BACKOFF_MS = 500
// ---------------------------------------------------------------------------
// Types
// ---------------------------------------------------------------------------
type ContentBlock = { type?: string; text?: string }
type AgentEvent = {
id?: string
type?: string
content?: ContentBlock[]
name?: string
input?: Record<string, unknown>
is_error?: boolean
stop_reason?: { type?: string }
error?: { message?: string }
}
type EventsListPage = {
data?: AgentEvent[]
next_page?: string
}
type NewSessionPayload = {
stationName?: string
stationLocation?: string
stationWebsite?: string
}
type ResumePayload = {
sessionId: string
lastEventId?: string
startedAt?: number
}
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
function authHeaders(apiKey: string): Record<string, string> {
return {
'x-api-key': apiKey,
'anthropic-version': ANTHROPIC_VERSION,
'anthropic-beta': MANAGED_AGENTS_BETA,
}
}
// A `session.status_idle` with `stop_reason: requires_action` means
// the agent is waiting on a client-side response (custom tool result,
// tool confirmation). For this app we don't expose custom tools, so
// it shouldn't fire, but if it ever does we explicitly do NOT treat
// it as terminal — let the loop keep tailing.
function isTerminal(event: AgentEvent | undefined): boolean {
if (!event) return false
if (event.type === 'session.status_terminated') return true
if (event.type === 'session.status_idle') {
const t = event.stop_reason?.type
return t === 'end_turn' || t === 'retries_exhausted'
}
return false
}
function describeToolUse(event: AgentEvent): string {
const name = event.name || 'tool'
const input = (event.input || {}) as Record<string, unknown>
if (name === 'web_search' && typeof input.query === 'string') {
return `Searching the web: ${input.query}`
}
if (name === 'web_fetch' && typeof input.url === 'string') {
return `Fetching: ${input.url}`
}
if (event.type === 'agent.mcp_tool_use') {
const args = Object.entries(input)
.filter(([k]) => k !== 'limit')
.slice(0, 3)
.map(([k, v]) =>
typeof v === 'object' && v !== null
? `${k}=…`
: `${k}=${String(v).slice(0, 40)}`,
)
.join(', ')
return `${name}${args ? ` (${args})` : ''}`
}
return name
}
async function anthropicFetch(
path: string,
apiKey: string,
init: RequestInit = {},
): Promise<Response> {
const headers = {
...authHeaders(apiKey),
...(init.headers as Record<string, string> | undefined),
}
return await fetch(`${ANTHROPIC_API_BASE}${path}`, { ...init, headers })
}
// Parse the SSE stream into an async iterable of decoded events.
// Each SSE message is `event: <type>\ndata: <json>\n\n`.
//
// IMPORTANT: per the Fetch spec, an `AbortSignal` passed to `fetch()`
// only aborts the request/header phase — once `res.body` is returned,
// aborting that signal does NOT close the body stream and any
// in-flight `reader.read()` will hang forever. So we attach our own
// listener and `reader.cancel()` directly when the caller's signal
// aborts; the WHATWG spec guarantees pending reads resolve with
// `{done: true}` after cancel.
async function* parseSse(
stream: ReadableStream<Uint8Array>,
signal?: AbortSignal,
): AsyncGenerator<AgentEvent> {
const reader = stream.getReader()
const decoder = new TextDecoder('utf-8')
let buffer = ''
const onAbort = () => {
reader.cancel().catch(() => {})
}
if (signal) {
if (signal.aborted) onAbort()
else signal.addEventListener('abort', onAbort, { once: true })
}
try {
while (true) {
const { value, done } = await reader.read()
if (done) break
buffer += decoder.decode(value, { stream: true })
let sep
while ((sep = buffer.indexOf('\n\n')) !== -1) {
const raw = buffer.slice(0, sep)
buffer = buffer.slice(sep + 2)
let dataPayload = ''
let evType: string | undefined
for (const line of raw.split('\n')) {
if (line.startsWith('event:')) {
evType = line.slice(6).trim()
} else if (line.startsWith('data:')) {
dataPayload += line.slice(5).trim()
}
}
if (!dataPayload) continue
try {
const parsed = JSON.parse(dataPayload) as AgentEvent
if (evType && !parsed.type) parsed.type = evType
yield parsed
} catch {
// Malformed event — skip.
}
}
}
} finally {
if (signal) {
signal.removeEventListener('abort', onAbort)
}
try {
reader.releaseLock()
} catch {
/* ignore */
}
}
}
// Page through `GET /v1/sessions/{id}/events` to backfill anything
// we missed. The Managed Agents endpoint uses an opaque `page` cursor
// returned as `next_page` on each response; there is no `after_id`
// filter, so we always pull the full history and the caller is
// responsible for skipping events it has already handled (via
// `seenEventIds` and the resume-payload `lastEventId`).
async function* listAllEvents(
sessionId: string,
apiKey: string,
signal?: AbortSignal,
): AsyncGenerator<AgentEvent> {
let pageCursor: string | undefined
for (;;) {
const params = new URLSearchParams({ order: 'asc', limit: '1000' })
if (pageCursor) params.set('page', pageCursor)
const res = await anthropicFetch(
`/v1/sessions/${sessionId}/events?${params.toString()}`,
apiKey,
{ signal },
)
if (!res.ok) {
throw new Error(
`events.list returned ${res.status} ${res.statusText}`,
)
}
const page = (await res.json()) as EventsListPage
const items = page.data || []
for (const ev of items) yield ev
if (!page.next_page) break
pageCursor = page.next_page
}
}
async function openStream(
sessionId: string,
apiKey: string,
signal: AbortSignal,
): Promise<ReadableStream<Uint8Array>> {
const res = await anthropicFetch(
`/v1/sessions/${sessionId}/events/stream`,
apiKey,
{
method: 'GET',
headers: { accept: 'text/event-stream' },
signal,
},
)
if (!res.ok || !res.body) {
throw new Error(
`events.stream returned ${res.status} ${res.statusText}`,
)
}
return res.body
}
// `setTimeout` that resolves early on abort.
function abortableSleep(ms: number, signal: AbortSignal): Promise<void> {
return new Promise<void>((resolve) => {
const t = setTimeout(resolve, ms)
const onAbort = () => {
clearTimeout(t)
resolve()
}
if (signal.aborted) {
onAbort()
return
}
signal.addEventListener('abort', onAbort, { once: true })
})
}
// ---------------------------------------------------------------------------
// Handler
// ---------------------------------------------------------------------------
const URL_REGEX =
/^(https?:\/\/)?([\w-]+\.)+[\w-]{2,}(\/[\w\-._~:/?#[\]@!$&'()*+,;=%]*)?$/i
export default async (req: Request, _context: Context) => {
if (req.method !== 'POST') {
return new Response('Method Not Allowed', { status: 405 })
}
let payload: NewSessionPayload & Partial<ResumePayload>
try {
payload = await req.json()
} catch {
return new Response('Invalid JSON body', { status: 400 })
}
const apiKey = Netlify.env.get('ANTHROPIC_API_KEY')
if (!apiKey) {
return new Response('Server is missing ANTHROPIC_API_KEY', {
status: 500,
})
}
const isResume =
typeof payload.sessionId === 'string' && payload.sessionId.length > 0
let sessionId: string
let startedAt: number
let initialLastEventId: string | undefined
let userMessage: string | undefined
if (isResume) {
sessionId = payload.sessionId as string
startedAt =
typeof payload.startedAt === 'number' ? payload.startedAt : Date.now()
initialLastEventId =
typeof payload.lastEventId === 'string' && payload.lastEventId
? payload.lastEventId
: undefined
if (Date.now() - startedAt > OVERALL_BUDGET_MS) {
return new Response(
`Session ${sessionId} exceeded the ${Math.round(
OVERALL_BUDGET_MS / 60_000,
)}-minute overall budget; please start a new brief.`,
{ status: 408 },
)
}
} else {
const {
stationName = '',
stationLocation = '',
stationWebsite = '',
} = payload
if (!URL_REGEX.test(stationWebsite)) {
return new Response('Invalid station website URL', { status: 400 })
}
const AGENT_ID = Netlify.env.get('ANTHROPIC_AGENT_ID')
const ENVIRONMENT_ID = Netlify.env.get('ANTHROPIC_ENVIRONMENT_ID')
const VAULT_IDS = (Netlify.env.get('ANTHROPIC_VAULT_IDS') || '')
.split(',')
.map((s) => s.trim())
.filter(Boolean)
const missing: string[] = []
if (!AGENT_ID) missing.push('ANTHROPIC_AGENT_ID')
if (!ENVIRONMENT_ID) missing.push('ANTHROPIC_ENVIRONMENT_ID')
if (missing.length) {
return new Response(
`Server is missing env var(s): ${missing.join(', ')}`,
{ status: 500 },
)
}
userMessage = `Here is a new public media station intake:
- Station Name: ${stationName}
- Station Location: ${stationLocation}
- Station Website: ${stationWebsite}
Please follow your instructions to produce the funding outlook brief.`
const createBody: Record<string, unknown> = {
agent: AGENT_ID,
environment_id: ENVIRONMENT_ID,
}
if (VAULT_IDS.length) createBody.vault_ids = VAULT_IDS
try {
const createRes = await anthropicFetch('/v1/sessions', apiKey, {
method: 'POST',
headers: { 'content-type': 'application/json' },
body: JSON.stringify(createBody),
})
if (!createRes.ok) {
const text = await createRes.text()
return new Response(
`Failed to create session: ${createRes.status} ${text}`,
{ status: 500 },
)
}
const created = (await createRes.json()) as { id?: string }
if (!created.id) {
return new Response('Failed to create session: no id returned', {
status: 500,
})
}
sessionId = created.id
startedAt = Date.now()
} catch (err) {
return new Response(
`Failed to create session: ${(err as Error)?.message || err}`,
{ status: 500 },
)
}
}
const encoder = new TextEncoder()
const seenEventIds = new Set<string>()
const body = new ReadableStream<Uint8Array>({
async start(controller) {
let lastSendAt = Date.now()
let lastEventIdSeen = initialLastEventId
// For resumes we replay the full session history via
// `listAllEvents` but mute every event up to and including
// `initialLastEventId` (the user has already seen those). Once
// we've cleared that cursor (or on a brand-new session where it
// is undefined), we start actually delivering events.
let pastInitialId = !initialLastEventId
const writeJson = (obj: unknown) => {
try {
controller.enqueue(encoder.encode(JSON.stringify(obj) + '\n'))
lastSendAt = Date.now()
} catch {
/* controller closed */
}
}
const heartbeat = setInterval(() => {
if (Date.now() - lastSendAt >= HEARTBEAT_MS) {
writeJson({ type: 'heartbeat' })
}
}, HEARTBEAT_MS / 2)
// Single AbortController for this segment. The segment timer
// calls .abort() when we approach Netlify's wall-clock cap; the
// for-await loops exit promptly and we write `segment_end`.
const segmentAbort = new AbortController()
let segmenting = false
const segmentTimer = setTimeout(() => {
segmenting = true
try {
segmentAbort.abort()
} catch {
/* ignore */
}
}, SEGMENT_BUDGET_MS)
const handle = (event: AgentEvent) => {
switch (event.type) {
case 'agent.message': {
if (Array.isArray(event.content)) {
for (const block of event.content) {
if (block?.type === 'text' && block.text) {
writeJson({ type: 'text', text: block.text + '\n\n' })
}
}
}
break
}
case 'agent.thinking': {
writeJson({ type: 'status', kind: 'thinking' })
break
}
case 'agent.tool_use':
case 'agent.mcp_tool_use': {
writeJson({
type: 'status',
kind: 'tool_use',
name: event.name || 'tool',
label: describeToolUse(event),
})
break
}
case 'agent.tool_result':
case 'agent.mcp_tool_result': {
if (event.is_error) {
const msg =
(Array.isArray(event.content) && event.content[0]?.text) ||
'tool error'
writeJson({
type: 'status',
kind: 'tool_result',
ok: false,
message: String(msg).slice(0, 300),
})
} else {
writeJson({ type: 'status', kind: 'tool_result', ok: true })
}
break
}
case 'session.error': {
const msg = event.error?.message || 'unknown session error'
writeJson({
type: 'status',
kind: 'session_error',
message: msg,
})
break
}
}
}
// Brand new session: send the kickoff user.message BEFORE we
// start tailing. (For resume, the user.message was sent on the
// first segment, so we never re-send it.)
if (!isResume && userMessage) {
try {
const sendRes = await anthropicFetch(
`/v1/sessions/${sessionId}/events`,
apiKey,
{
method: 'POST',
headers: { 'content-type': 'application/json' },
body: JSON.stringify({
events: [
{
type: 'user.message',
content: [{ type: 'text', text: userMessage }],
},
],
}),
},
)
if (!sendRes.ok) {
const text = await sendRes.text()
writeJson({
type: 'error',
message: `Failed to send user message: ${sendRes.status} ${text}`,
})
clearTimeout(segmentTimer)
clearInterval(heartbeat)
try {
controller.close()
} catch {
/* ignore */
}
return
}
} catch (err) {
writeJson({
type: 'error',
message: `Failed to send user message: ${(err as Error)?.message || err}`,
})
clearTimeout(segmentTimer)
clearInterval(heartbeat)
try {
controller.close()
} catch {
/* ignore */
}
return
}
}
let done = false
let iteration = 0
try {
while (!done && !segmenting) {
iteration++
if (iteration > 1) {
await abortableSleep(RECONNECT_BACKOFF_MS, segmentAbort.signal)
if (segmenting) break
}
if (Date.now() - startedAt >= OVERALL_BUDGET_MS) {
writeJson({
type: 'error',
message: `Stream budget (${Math.round(
OVERALL_BUDGET_MS / 60_000,
)} min) exhausted. The session may still be running — try again or check the Console.`,
})
break
}
// Open the live stream FIRST so any events emitted while
// we're backfilling are still captured on the wire.
let upstream: ReadableStream<Uint8Array>
try {
upstream = await openStream(
sessionId,
apiKey,
segmentAbort.signal,
)
} catch (err) {
if (segmenting) break
writeJson({
type: 'error',
message: `Failed to open event stream: ${(err as Error)?.message || err}`,
})
break
}
// Backfill anything we missed. The events endpoint does
// NOT support `after_id`, so we always pull the full session
// history and skip everything up to and including
// `initialLastEventId` (the cursor handed to us by the
// previous segment via the resume payload). Within this
// segment, `seenEventIds` dedupes against the live stream.
try {
for await (const event of listAllEvents(
sessionId,
apiKey,
segmentAbort.signal,
)) {
if (segmenting) break
if (event.id) {
if (!pastInitialId) {
// Already delivered to the user in a previous
// segment — mark seen so the live stream doesn't
// re-emit it, but do NOT call handle().
seenEventIds.add(event.id)
lastEventIdSeen = event.id
if (event.id === initialLastEventId) {
pastInitialId = true
}
} else if (!seenEventIds.has(event.id)) {
seenEventIds.add(event.id)
lastEventIdSeen = event.id
handle(event)
} else {
// Already handled earlier in this segment — keep
// the cursor moving forward.
lastEventIdSeen = event.id
}
}
if (isTerminal(event) && pastInitialId) {
done = true
break
}
}
} catch (err) {
if (!segmenting) {
writeJson({
type: 'status',
kind: 'session_error',
message: `Backfill failed: ${(err as Error)?.message || err}`,
})
// Fall through and try the live stream.
}
}
// If the cursor handed to us by the previous segment didn't
// appear in the session's history (stale / corrupted /
// history truncated), don't get stuck muting events forever
// — start delivering whatever shows up next.
if (!pastInitialId) {
pastInitialId = true
}
if (done || segmenting) {
// The upstream fetch is already cleaned up by
// `segmentAbort.abort()` (or will be by the runtime once
// we drop our reference). Don't `await upstream.cancel()`
// here — on an already-aborted body that call can hang
// on Deno Edge, which would prevent us from writing the
// final `segment_end` line.
break
}
// Tail the live stream until upstream EOFs (drop), the
// segment timer fires, or a terminal event arrives.
try {
for await (const event of parseSse(
upstream,
segmentAbort.signal,
)) {
if (segmenting) break
if (event.id) {
if (!seenEventIds.has(event.id)) {
seenEventIds.add(event.id)
lastEventIdSeen = event.id
handle(event)
} else {
lastEventIdSeen = event.id
}
}
if (isTerminal(event)) {
done = true
break
}
}
} catch {
// Most commonly: AbortError from segmentAbort. Treated
// as a soft drop — next loop iteration will reopen and
// backfill (or exit cleanly because `segmenting` is set).
}
}
if (done) {
writeJson({ type: 'done' })
} else if (segmenting) {
writeJson({
type: 'segment_end',
sessionId,
lastEventId: lastEventIdSeen,
startedAt,
})
}
} catch (err) {
writeJson({
type: 'error',
message: (err as Error)?.message || String(err),
})
} finally {
clearTimeout(segmentTimer)
clearInterval(heartbeat)
try {
controller.close()
} catch {
/* ignore */
}
}
},
})
return new Response(body, {
status: 200,
headers: {
'Content-Type': 'application/x-ndjson; charset=utf-8',
'Cache-Control': 'no-cache, no-transform',
'X-Accel-Buffering': 'no',
},
})
}
export const config: Config = {
path: '/api/agent-proxy',
}
-298
View File
@@ -1,298 +0,0 @@
import Anthropic from '@anthropic-ai/sdk'
/**
* Netlify Functions v2 handler.
*
* Proxies a request from the React frontend to a Claude Console-defined
* Managed Agent via the /v1/sessions endpoints. The agent's model, system
* prompt, tools, MCP servers, and skills are configured in the Console and
* referenced here by ID. We open the SSE event stream first, then send a
* `user.message` event, and pipe each `agent.message` text block to the
* client through a ReadableStream so we never hit Netlify's 26-second
* synchronous execution limit.
*/
// All Managed Agents endpoints require this beta header.
const MANAGED_AGENTS_BETA = 'managed-agents-2026-04-01'
// ---------------------------------------------------------------------------
// Anthropic client (reused across warm Lambda invocations)
// ---------------------------------------------------------------------------
let _client
function getClient() {
if (_client) return _client
if (!process.env.ANTHROPIC_API_KEY) {
throw new Error('Server is missing ANTHROPIC_API_KEY')
}
_client = new Anthropic({
apiKey: process.env.ANTHROPIC_API_KEY,
defaultHeaders: { 'anthropic-beta': MANAGED_AGENTS_BETA },
})
return _client
}
// ---------------------------------------------------------------------------
// Handler
// ---------------------------------------------------------------------------
export default async (req /*, context */) => {
if (req.method !== 'POST') {
return new Response('Method Not Allowed', { status: 405 })
}
let payload
try {
payload = await req.json()
} catch {
return new Response('Invalid JSON body', { status: 400 })
}
const {
stationName = '',
stationLocation = '',
stationWebsite = '',
} = payload || {}
// Server-side URL validation (mirrors the client-side regex).
const URL_REGEX =
/^(https?:\/\/)?([\w-]+\.)+[\w-]{2,}(\/[\w\-._~:/?#[\]@!$&'()*+,;=%]*)?$/i
if (!URL_REGEX.test(stationWebsite)) {
return new Response('Invalid station website URL', { status: 400 })
}
// Read env vars at request time (Netlify makes them available per-invocation).
const AGENT_ID = process.env.ANTHROPIC_AGENT_ID
const ENVIRONMENT_ID = process.env.ANTHROPIC_ENVIRONMENT_ID
const VAULT_IDS = (process.env.ANTHROPIC_VAULT_IDS || '')
.split(',')
.map((s) => s.trim())
.filter(Boolean)
const missing = []
if (!AGENT_ID) missing.push('ANTHROPIC_AGENT_ID')
if (!ENVIRONMENT_ID) missing.push('ANTHROPIC_ENVIRONMENT_ID')
if (missing.length) {
return new Response(`Server is missing env var(s): ${missing.join(', ')}`, {
status: 500,
})
}
let client
try {
client = getClient()
} catch (err) {
return new Response(
`Failed to initialize agent: ${err.message || err}`,
{ status: 500 },
)
}
const userMessage = `Here is a new public media station intake:
- Station Name: ${stationName}
- Station Location: ${stationLocation}
- Station Website: ${stationWebsite}
Please follow your instructions to produce the funding outlook brief.`
// ----- Create the session -----
let session
try {
const sessionParams = {
agent: AGENT_ID,
environment_id: ENVIRONMENT_ID,
}
if (VAULT_IDS.length) sessionParams.vault_ids = VAULT_IDS
session = await client.beta.sessions.create(sessionParams)
} catch (err) {
return new Response(
`Failed to create session: ${err.message || err}`,
{ status: 500 },
)
}
// ----- Open the event stream BEFORE sending the user message -----
// (Stream-first ensures we don't miss any events the agent emits.)
let upstream
try {
upstream = await client.beta.sessions.events.stream(session.id)
} catch (err) {
return new Response(
`Failed to open session event stream: ${err.message || err}`,
{ status: 500 },
)
}
// ----- Send the kickoff user.message event -----
try {
await client.beta.sessions.events.send(session.id, {
events: [
{
type: 'user.message',
content: [{ type: 'text', text: userMessage }],
},
],
})
} catch (err) {
return new Response(
`Failed to send user message: ${err.message || err}`,
{ status: 500 },
)
}
// ----- Pipe agent activity to the client -----
// Long stretches between agent.message events (model "thinking",
// multi-tool batches, etc.) can silence the response stream long enough
// that Netlify's edge proxy drops the connection. To keep bytes flowing
// we (a) forward extra event types as concise status lines and (b) emit
// a periodic heartbeat space when the upstream goes quiet.
const encoder = new TextEncoder()
const seenEventIds = new Set()
const writtenFiles = new Set() // dedupe writes by path
const HEARTBEAT_MS = 10_000
const stream = new ReadableStream({
async start(controller) {
let lastSendAt = Date.now()
const send = (s) => {
controller.enqueue(encoder.encode(s))
lastSendAt = Date.now()
}
// Zero-width space — invisible in rendered Markdown but counts as a
// byte on the wire, which is enough to defeat proxy idle-timeouts.
const heartbeat = setInterval(() => {
if (Date.now() - lastSendAt >= HEARTBEAT_MS) {
try {
controller.enqueue(encoder.encode('\u200B'))
lastSendAt = Date.now()
} catch {
/* controller may have closed */
}
}
}, HEARTBEAT_MS / 2)
try {
for await (const event of upstream) {
if (event.id && !seenEventIds.has(event.id)) {
seenEventIds.add(event.id)
switch (event.type) {
case 'agent.message': {
if (Array.isArray(event.content)) {
for (const block of event.content) {
if (block?.type === 'text' && block.text) {
send(block.text + '\n\n')
}
}
}
break
}
case 'agent.thinking': {
send(`\n\n_💭 thinking…_\n\n`)
break
}
case 'agent.tool_use': {
// The write tool carries the actual brief in input.content.
if (
event.name === 'write' &&
typeof event.input?.content === 'string'
) {
const path = event.input.file_path || event.input.path || ''
if (!writtenFiles.has(path)) {
writtenFiles.add(path)
send('\n\n---\n\n' + event.input.content + '\n\n')
}
} else {
const label = describeToolUse(event)
if (label) send(`\n\n_${label}_\n\n`)
}
break
}
case 'agent.mcp_tool_use': {
const label = describeToolUse(event)
if (label) send(`\n\n_${label}_\n\n`)
break
}
case 'agent.tool_result':
case 'agent.mcp_tool_result': {
if (event.is_error) {
const msg =
(Array.isArray(event.content) &&
event.content[0]?.text) ||
'tool error'
send(`\n\n_⚠️ tool error: ${String(msg).slice(0, 200)}_\n\n`)
} else {
send(`\n\n_✓ result_\n\n`)
}
break
}
case 'session.error': {
const msg = event.error?.message || 'unknown session error'
send(`\n\n[session error: ${msg}]`)
break
}
}
}
// Terminal-state gate.
if (event.type === 'session.status_terminated') break
if (
event.type === 'session.status_idle' &&
event.stop_reason?.type !== 'requires_action'
) {
break
}
}
clearInterval(heartbeat)
controller.close()
} catch (err) {
clearInterval(heartbeat)
send(`\n\n[stream error: ${err.message || err}]`)
controller.close()
}
},
cancel() {
// If the client disconnects, abort the upstream stream.
try {
upstream.controller?.abort?.()
} catch {
/* ignore */
}
},
})
function describeToolUse(event) {
const name = event.name || 'tool'
const input = event.input || {}
if (name === 'web_search' && input.query) return `🔍 Searching: ${input.query}`
if (name === 'web_fetch' && input.url) return `🌐 Fetching: ${input.url}`
if (event.type === 'agent.mcp_tool_use') {
const args = Object.entries(input)
.filter(([k]) => k !== 'limit')
.slice(0, 3)
.map(([k, v]) =>
typeof v === 'object' ? `${k}=…` : `${k}=${String(v).slice(0, 40)}`,
)
.join(', ')
return `🛠️ ${name}${args ? ` (${args})` : ''}`
}
return `🛠️ ${name}`
}
return new Response(stream, {
status: 200,
headers: {
'Content-Type': 'text/plain; charset=utf-8',
'Cache-Control': 'no-cache, no-transform',
'X-Accel-Buffering': 'no',
},
})
}
export const config = {
path: '/.netlify/functions/agent-proxy',
}
+1705 -69
View File
File diff suppressed because it is too large Load Diff
+1 -1
View File
@@ -10,13 +10,13 @@
"preview": "vite preview" "preview": "vite preview"
}, },
"dependencies": { "dependencies": {
"@anthropic-ai/sdk": "^0.95.2",
"react": "^18.3.1", "react": "^18.3.1",
"react-dom": "^18.3.1", "react-dom": "^18.3.1",
"react-markdown": "^9.0.1", "react-markdown": "^9.0.1",
"remark-gfm": "^4.0.0" "remark-gfm": "^4.0.0"
}, },
"devDependencies": { "devDependencies": {
"@netlify/edge-functions": "^2.11.1",
"@vitejs/plugin-react": "^4.3.4", "@vitejs/plugin-react": "^4.3.4",
"autoprefixer": "^10.4.20", "autoprefixer": "^10.4.20",
"postcss": "^8.4.49", "postcss": "^8.4.49",
+121 -13
View File
@@ -20,6 +20,7 @@ export default function App() {
const [isThinking, setIsThinking] = useState(false) const [isThinking, setIsThinking] = useState(false)
const [isStreaming, setIsStreaming] = useState(false) const [isStreaming, setIsStreaming] = useState(false)
const [result, setResult] = useState('') const [result, setResult] = useState('')
const [status, setStatus] = useState(null)
const [streamError, setStreamError] = useState(null) const [streamError, setStreamError] = useState(null)
const resultRef = useRef(null) const resultRef = useRef(null)
@@ -47,27 +48,91 @@ export default function App() {
return Object.keys(next).length === 0 return Object.keys(next).length === 0
} }
// Parse the NDJSON stream from /api/agent-proxy: one JSON object per
// line. Returns a resume payload `{sessionId, lastEventId, startedAt}`
// if the server emitted a `segment_end` (Netlify Edge caps a single
// streaming response at ~60 s, so longer briefs span multiple
// segments). Returns null when the stream ends cleanly (`done`,
// `error`, or upstream EOF without a segment_end).
const readStream = async (response) => { const readStream = async (response) => {
const reader = response.body.getReader() const reader = response.body.getReader()
const decoder = new TextDecoder('utf-8') const decoder = new TextDecoder('utf-8')
let firstChunk = true let buffer = ''
let firstByte = true
let segmentEnd = null
let fatalError = null
const handleEvent = (msg) => {
switch (msg?.type) {
case 'text':
if (typeof msg.text === 'string' && msg.text) {
setResult((prev) => prev + msg.text)
}
break
case 'status':
setStatus(msg)
break
case 'segment_end':
if (typeof msg.sessionId === 'string' && msg.sessionId) {
segmentEnd = {
sessionId: msg.sessionId,
lastEventId: msg.lastEventId,
startedAt: msg.startedAt,
}
}
break
case 'error':
case 'session_error':
fatalError = msg.message || 'streaming error'
setStreamError(fatalError)
break
case 'done':
case 'heartbeat':
default:
break
}
}
const flushLines = () => {
let nl
while ((nl = buffer.indexOf('\n')) !== -1) {
const line = buffer.slice(0, nl).trim()
buffer = buffer.slice(nl + 1)
if (!line) continue
try {
handleEvent(JSON.parse(line))
} catch {
// Ignore malformed lines — the heartbeat / done sentinel
// will still tell us when the stream is over.
}
}
}
while (true) { while (true) {
const { done, value } = await reader.read() const { done, value } = await reader.read()
if (done) break if (done) break
const chunk = decoder.decode(value, { stream: true }) buffer += decoder.decode(value, { stream: true })
if (firstChunk) { if (firstByte) {
// Once data starts flowing, flip 'thinking' off and 'streaming' on. // Once any bytes arrive, flip 'thinking' off and 'streaming' on.
setIsThinking(false) setIsThinking(false)
setIsStreaming(true) setIsStreaming(true)
firstChunk = false firstByte = false
} }
setResult((prev) => prev + chunk) flushLines()
} }
// Flush any remaining buffered bytes // Flush any trailing buffered bytes (line without final \n).
const tail = decoder.decode() buffer += decoder.decode()
if (tail) setResult((prev) => prev + tail) if (buffer.trim()) {
setIsStreaming(false) try {
handleEvent(JSON.parse(buffer.trim()))
} catch {
/* ignore */
}
}
// Only return a resume payload if we got a clean segment_end AND
// no fatal error fired in the same segment.
return fatalError ? null : segmentEnd
} }
const handleSubmit = async (e) => { const handleSubmit = async (e) => {
@@ -75,16 +140,32 @@ export default function App() {
if (!validate()) return if (!validate()) return
setResult('') setResult('')
setStatus(null)
setStreamError(null) setStreamError(null)
setIsSubmitting(true) setIsSubmitting(true)
setIsThinking(true) setIsThinking(true)
setIsStreaming(false) setIsStreaming(false)
try { try {
const response = await fetch('/.netlify/functions/agent-proxy', { let resume = null
// Loop across segments. Each iteration is one HTTP request;
// the server closes it at ~54 s and the next iteration picks
// up from `lastEventId` so the user sees one continuous brief.
// Hard cap on iterations as a belt-and-braces guard against a
// runaway segment loop.
for (let i = 0; i < 40; i++) {
const body = resume
? {
sessionId: resume.sessionId,
lastEventId: resume.lastEventId,
startedAt: resume.startedAt,
}
: form
const response = await fetch('/api/agent-proxy', {
method: 'POST', method: 'POST',
headers: { 'Content-Type': 'application/json' }, headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(form), body: JSON.stringify(body),
}) })
if (!response.ok || !response.body) { if (!response.ok || !response.body) {
@@ -93,7 +174,10 @@ export default function App() {
) )
} }
await readStream(response) const next = await readStream(response)
if (!next) break
resume = next
}
} catch (err) { } catch (err) {
console.error(err) console.error(err)
setStreamError(err.message || 'Something went wrong while streaming.') setStreamError(err.message || 'Something went wrong while streaming.')
@@ -101,6 +185,7 @@ export default function App() {
setIsSubmitting(false) setIsSubmitting(false)
setIsThinking(false) setIsThinking(false)
setIsStreaming(false) setIsStreaming(false)
setStatus(null)
} }
} }
@@ -184,6 +269,12 @@ export default function App() {
</StatusBanner> </StatusBanner>
)} )}
{isStreaming && status && (
<StatusBanner>
<Spinner /> {renderStatus(status)}
</StatusBanner>
)}
{streamError && ( {streamError && (
<div className="rounded-md border border-red-300 bg-red-50 p-4 text-red-800"> <div className="rounded-md border border-red-300 bg-red-50 p-4 text-red-800">
{streamError} {streamError}
@@ -228,6 +319,23 @@ function Field({ label, name, type = 'text', value, onChange, error, placeholder
) )
} }
function renderStatus(status) {
if (!status) return null
switch (status.kind) {
case 'thinking':
return 'Thinking'
case 'tool_use':
return status.label || `Using tool: ${status.name || 'tool'}`
case 'tool_result':
if (status.ok) return 'Tool finished continuing'
return `Tool error: ${status.message || 'unknown error'}`
case 'session_error':
return `Session error: ${status.message || 'unknown error'}`
default:
return 'Working'
}
}
function StatusBanner({ children }) { function StatusBanner({ children }) {
return ( return (
<div className="mb-4 flex items-center gap-3 rounded-md border border-stone-300 bg-stone-100 px-4 py-3 text-stone-700"> <div className="mb-4 flex items-center gap-3 rounded-md border border-stone-300 bg-stone-100 px-4 py-3 text-stone-700">