diff --git a/netlify/functions/agent-proxy.js b/netlify/functions/agent-proxy.js index f864b7e..c2b02e3 100644 --- a/netlify/functions/agent-proxy.js +++ b/netlify/functions/agent-proxy.js @@ -6,15 +6,50 @@ import Anthropic from '@anthropic-ai/sdk' * Proxies a request from the React frontend to a Claude Console-defined * Managed Agent via the /v1/sessions endpoints. The agent's model, system * prompt, tools, MCP servers, and skills are configured in the Console and - * referenced here by ID. We open the SSE event stream first, then send a - * `user.message` event, and pipe each `agent.message` text block to the - * client through a ReadableStream so we never hit Netlify's 26-second - * synchronous execution limit. + * referenced here by ID. + * + * Wire protocol (downstream to the browser): newline-delimited JSON + * (`application/x-ndjson`). One JSON object per line; the React side + * parses incrementally as bytes arrive. Object shapes: + * + * {"type":"text","text":"...markdown..."} // append to brief + * {"type":"status","kind":"thinking"} + * {"type":"status","kind":"tool_use","name":"...","label":"..."} + * {"type":"status","kind":"tool_result","ok":true} + * {"type":"status","kind":"tool_result","ok":false,"message":"..."} + * {"type":"status","kind":"session_error","message":"..."} + * {"type":"heartbeat"} // keep-alive only + * {"type":"done"} // session terminated cleanly + * {"type":"error","message":"..."} // unrecoverable + * + * Reconnect strategy: the upstream Anthropic SSE stream + * (`GET /v1/sessions/{id}/events/stream`) is not guaranteed to stay open + * for the full life of a multi-turn session — long inter-turn gaps (e.g. + * while MCP tools or the next model request are running) can trigger + * upstream / proxy read timeouts. 
When the upstream iterator ends without + * a terminal `session.status_*` event we treat it as a drop, fetch any + * events we missed via `sessions.events.list()` (deduped by event id), + * reopen the live stream, and keep going — up to a wall-clock budget. */ // All Managed Agents endpoints require this beta header. const MANAGED_AGENTS_BETA = 'managed-agents-2026-04-01' +// Downstream keep-alive cadence. The browser's fetch reader will time +// out / appear stuck if no bytes flow for too long; we emit a +// `{"type":"heartbeat"}` line when the upstream is quiet. +const HEARTBEAT_MS = 10_000 + +// Hard ceiling on total wall-clock time the function will keep +// reconnecting upstream on behalf of a single browser request. Past this +// we emit an `error` line and close the stream so the user can retry +// rather than having the request hang indefinitely. +const RECONNECT_BUDGET_MS = 20 * 60 * 1000 // 20 minutes + +// Short backoff between an upstream drop and the next reconnect attempt, +// so a tight error loop can't hammer the API. +const RECONNECT_BACKOFF_MS = 500 + // --------------------------------------------------------------------------- // Anthropic client (reused across warm Lambda invocations) // --------------------------------------------------------------------------- @@ -31,6 +66,42 @@ function getClient() { return _client } +// A `session.status_idle` with `stop_reason: requires_action` means the +// agent is waiting on a client-side response (custom tool result, tool +// confirmation). For this app we don't expose custom tools, so it should +// never fire — but if it ever does we explicitly do NOT treat it as +// terminal and let the loop keep tailing. 
/**
 * Reports whether a session event marks the end of the agent's work.
 *
 * `session.status_terminated` is always terminal. `session.status_idle` is
 * terminal only when its stop reason is `end_turn` or `retries_exhausted`;
 * any other idle reason (for example `requires_action`) is deliberately NOT
 * terminal, so callers keep tailing the event stream.
 *
 * @param {object|null|undefined} event - Session event.
 * @returns {boolean} True when no further events should be awaited.
 */
function isTerminal(event) {
  if (event?.type === 'session.status_terminated') return true
  if (event?.type === 'session.status_idle') {
    const reason = event.stop_reason?.type
    return reason === 'end_turn' || reason === 'retries_exhausted'
  }
  return false
}

// Longest search query / URL echoed into a status label before clipping.
const MAX_LABEL_DETAIL_CHARS = 200

// Clips `value` to MAX_LABEL_DETAIL_CHARS code points and marks the cut with
// an ellipsis. Counting code points avoids splitting a surrogate pair.
function clipLabelDetail(value) {
  const text = String(value)
  const chars = Array.from(text)
  if (chars.length <= MAX_LABEL_DETAIL_CHARS) return text
  return `${chars.slice(0, MAX_LABEL_DETAIL_CHARS).join('')}…`
}

/**
 * Builds a short, human-readable label for a tool-use event.
 *
 * - `web_search` with a query  -> "Searching the web: <query>"
 * - `web_fetch` with a URL     -> "Fetching: <url>"
 * - `agent.mcp_tool_use`       -> "<name> (k=v, ...)" using at most three
 *   arguments, skipping `limit`, with object values elided and scalar
 *   values cut to 40 characters
 * - anything else              -> the tool name (or "tool")
 *
 * The query and URL come from the model, so they are clipped before being
 * placed in the label. A missing event or a non-object `input` is treated
 * as "no arguments" instead of throwing or being enumerated per character.
 *
 * @param {object|null|undefined} event - Tool-use event.
 * @returns {string} Label text.
 */
function describeToolUse(event) {
  const name = event?.name || 'tool'
  const rawInput = event?.input
  const input = rawInput && typeof rawInput === 'object' ? rawInput : {}

  if (name === 'web_search' && input.query) {
    return `Searching the web: ${clipLabelDetail(input.query)}`
  }
  if (name === 'web_fetch' && input.url) {
    return `Fetching: ${clipLabelDetail(input.url)}`
  }
  if (event?.type === 'agent.mcp_tool_use') {
    const args = Object.entries(input)
      .filter(([k]) => k !== 'limit')
      .slice(0, 3)
      .map(([k, v]) =>
        typeof v === 'object' ? `${k}=…` : `${k}=${String(v).slice(0, 40)}`,
      )
      .join(', ')
    return `${name}${args ? ` (${args})` : ''}`
  }
  return name
}
// ----- Pipe agent activity to the client as NDJSON -----
// Runs inside the request handler. Relies on `initialUpstream` (the event
// stream opened before the user message was sent), `client`, `session`,
// `isTerminal`, `describeToolUse`, and the HEARTBEAT_MS / RECONNECT_*
// constants.
const encoder = new TextEncoder()
const seenEventIds = new Set()
const deadline = Date.now() + RECONNECT_BUDGET_MS

// State shared by start() and cancel(). `currentStream` must be visible to
// cancel() so a browser disconnect aborts whichever upstream stream is live,
// not only the first one. `cancelled` stops the reconnect loop: without it,
// the abort itself looks like an upstream drop and triggers a reconnect.
let currentStream = initialUpstream
let cancelled = false

// Best-effort abort of an upstream event stream; never throws.
const abortUpstream = (stream) => {
  try {
    stream?.controller?.abort?.()
  } catch {
    /* ignore */
  }
}

const body = new ReadableStream({
  async start(controller) {
    let lastSendAt = Date.now()

    // Writes one NDJSON line downstream. Enqueue failures (controller already
    // closed) are ignored; nothing is written once the client has cancelled.
    const writeJson = (obj) => {
      if (cancelled) return
      try {
        controller.enqueue(encoder.encode(`${JSON.stringify(obj)}\n`))
        lastSendAt = Date.now()
      } catch {
        /* controller closed */
      }
    }

    // Heartbeat to keep the downstream connection alive while we're
    // waiting on the next upstream event (e.g. during a long MCP tool
    // call or before the next model request starts).
    const heartbeat = setInterval(() => {
      if (Date.now() - lastSendAt >= HEARTBEAT_MS) {
        writeJson({ type: 'heartbeat' })
      }
    }, HEARTBEAT_MS / 2)

    // Translates one upstream session event into zero or more NDJSON lines.
    const handle = (event) => {
      switch (event.type) {
        case 'agent.message': {
          if (Array.isArray(event.content)) {
            for (const block of event.content) {
              if (block?.type === 'text' && block.text) {
                writeJson({ type: 'text', text: block.text + '\n\n' })
              }
            }
          }
          break
        }

        case 'agent.thinking': {
          writeJson({ type: 'status', kind: 'thinking' })
          break
        }

        case 'agent.tool_use':
        case 'agent.mcp_tool_use': {
          writeJson({
            type: 'status',
            kind: 'tool_use',
            name: event.name || 'tool',
            label: describeToolUse(event),
          })
          break
        }

        case 'agent.tool_result':
        case 'agent.mcp_tool_result': {
          if (event.is_error) {
            const msg =
              (Array.isArray(event.content) && event.content[0]?.text) ||
              'tool error'
            writeJson({
              type: 'status',
              kind: 'tool_result',
              ok: false,
              message: String(msg).slice(0, 300),
            })
          } else {
            writeJson({ type: 'status', kind: 'tool_result', ok: true })
          }
          break
        }

        case 'session.error': {
          const msg = event.error?.message || 'unknown session error'
          writeJson({
            type: 'status',
            kind: 'session_error',
            message: msg,
          })
          break
        }
      }
    }

    // Forwards an event at most once (deduped by id) and reports whether it
    // is terminal. The terminal check runs even for already-seen events, so
    // a terminal event replayed by the backfill still ends the loop.
    const ingest = (event) => {
      if (event.id && !seenEventIds.has(event.id)) {
        seenEventIds.add(event.id)
        handle(event)
      }
      return isTerminal(event)
    }

    let done = false
    let isReconnect = false

    try {
      while (!done && !cancelled) {
        // On reconnect (every pass after the first), reopen the live
        // stream and backfill anything we missed during the gap.
        if (isReconnect) {
          if (Date.now() >= deadline) {
            writeJson({
              type: 'error',
              message: `Stream reconnect budget (${Math.round(
                RECONNECT_BUDGET_MS / 60_000,
              )} min) exhausted. The session may still be running — try again or check the Console.`,
            })
            break
          }

          await new Promise((r) => setTimeout(r, RECONNECT_BACKOFF_MS))
          if (cancelled) break

          // Release the dropped stream before replacing it.
          abortUpstream(currentStream)
          try {
            currentStream = await client.beta.sessions.events.stream(
              session.id,
            )
          } catch (err) {
            writeJson({
              type: 'error',
              message: `Failed to reopen event stream: ${err?.message || err}`,
            })
            break
          }
          // cancel() may have fired while the reopen was in flight; the
          // `finally` below aborts the stream we just opened.
          if (cancelled) break

          // Backfill: pull everything the session has emitted so far
          // and dedupe by event id.
          try {
            for await (const event of client.beta.sessions.events.list(
              session.id,
              { order: 'asc' },
            )) {
              if (cancelled) break
              if (ingest(event)) {
                done = true
                break
              }
            }
          } catch (err) {
            writeJson({
              type: 'status',
              kind: 'session_error',
              message: `Backfill failed: ${err?.message || err}`,
            })
            // Don't break — fall through and try the live stream.
          }

          if (done || cancelled) break
        }
        isReconnect = true

        // Tail the currently-open live stream until it ends, errors,
        // or hits a terminal session event.
        try {
          for await (const event of currentStream) {
            if (cancelled) break
            if (ingest(event)) {
              done = true
              break
            }
          }
        } catch {
          // Treat as transient. If it's actually persistent, the
          // reopen + backfill on the next pass will surface it
          // (or burn down the reconnect budget cleanly).
        }
      }

      if (done) {
        writeJson({ type: 'done' })
      }
    } catch (err) {
      writeJson({
        type: 'error',
        message: err?.message || String(err),
      })
    } finally {
      clearInterval(heartbeat)
      abortUpstream(currentStream)
      try {
        controller.close()
      } catch {
        /* ignore */
      }
    }
  },

  cancel() {
    // Browser disconnected: stop the reconnect loop and abort whichever
    // upstream stream is currently open.
    cancelled = true
    abortUpstream(currentStream)
  },
})
// Applies one parsed NDJSON message from /agent-proxy to component state.
// `text` is appended to the brief, `status` replaces the live status line,
// and `error` is surfaced as the stream error. `done` and `heartbeat` carry
// no state of their own.
const handleEvent = (msg) => {
  switch (msg?.type) {
    case 'text':
      if (typeof msg.text === 'string' && msg.text) {
        setResult((prev) => prev + msg.text)
      }
      break
    case 'status':
      setStatus(msg)
      break
    case 'error':
    // `session_error` normally arrives as a `status` message (kind
    // 'session_error'); a top-level one is tolerated and shown as an error.
    case 'session_error':
      setStreamError(msg.message || 'streaming error')
      break
    case 'done':
    case 'heartbeat':
    default:
      break
  }
}

// Parses the NDJSON stream from /agent-proxy: one JSON object per line.
// Bytes are decoded incrementally; complete lines are dispatched as they
// arrive and a trailing line without a final newline is dispatched at the
// end. If the body ends without a `done` or `error` message, the response
// was cut short (for example by a proxy or function timeout), so an error is
// shown rather than presenting a partial brief as complete.
const readStream = async (response) => {
  const reader = response.body.getReader()
  const decoder = new TextDecoder('utf-8')
  let buffer = ''
  let firstByte = true
  let sawTerminal = false

  const dispatchLine = (rawLine) => {
    const line = rawLine.trim()
    if (!line) return
    let msg
    try {
      msg = JSON.parse(line)
    } catch {
      // Ignore malformed lines; later lines are still processed.
      return
    }
    if (msg?.type === 'done' || msg?.type === 'error') sawTerminal = true
    handleEvent(msg)
  }

  while (true) {
    const { done, value } = await reader.read()
    if (done) break
    buffer += decoder.decode(value, { stream: true })
    if (firstByte) {
      // Once any bytes arrive, flip 'thinking' off and 'streaming' on.
      setIsThinking(false)
      setIsStreaming(true)
      firstByte = false
    }
    // Everything before the last newline is complete; keep the remainder.
    const lines = buffer.split('\n')
    buffer = lines.pop()
    for (const line of lines) dispatchLine(line)
  }

  // Flush any trailing buffered bytes (line without final \n).
  buffer += decoder.decode()
  dispatchLine(buffer)

  if (!sawTerminal) {
    setStreamError(
      'The connection closed before the brief finished. Please try again.',
    )
  }
  setIsStreaming(false)
  setStatus(null)
}
/**
 * Converts a `status` message from the agent proxy into banner text.
 *
 * @param {object|null|undefined} status - Latest status message, if any.
 * @returns {string|null} Text to display, or null when there is no status.
 */
function renderStatus(status) {
  if (!status) return null

  const { kind, label, name, ok, message } = status
  const detail = message || 'unknown error'

  if (kind === 'thinking') return 'Thinking…'
  if (kind === 'tool_use') return label || `Using tool: ${name || 'tool'}`
  if (kind === 'tool_result') {
    return ok ? 'Tool finished — continuing…' : `Tool error: ${detail}`
  }
  if (kind === 'session_error') return `Session error: ${detail}`
  return 'Working…'
}