From df3c8e7d1cad11b2eb0e4605734c7ca72d110597 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 13 May 2026 13:08:54 +0000 Subject: [PATCH] fix(agent-proxy): events.list uses opaque `page` cursor, not `after_id` MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The Managed Agents events endpoint (`GET /v1/sessions/{id}/events`) does NOT support filtering by event id. It returns an opaque `next_page` cursor on each response and accepts it back via the `page` query parameter; an `after_id=` filter returns 400 Bad Request, which caused every segment resume to fail backfill (visible as `{"type":"status","kind":"session_error","message":"Backfill failed: events.list returned 400 Bad Request"}`). Caught during testing of commit 8e44de5: resuming from a segment boundary always returned 400 and the brief silently lost events from the previous segment. Changes: - `listAllEvents` now paginates via `page` / `next_page` and pulls the full session history (limit=1000). The Anthropic API has no per-id filter, so the caller is responsible for skipping events already delivered. - New `pastInitialId` flag at the top of the body loop: on resume, mute every event up to and including `initialLastEventId` (still adding them to `seenEventIds` so the live stream doesn't re-emit them), then start delivering. On a brand-new session the flag starts true and is a no-op. - Safety fallback: if backfill completes without ever seeing `initialLastEventId` (stale cursor / truncated history), flip the flag to true so we don't get stuck muting forever — the live stream will start delivering whatever shows up next. Co-Authored-By: alex --- netlify/edge-functions/agent-proxy.ts | 63 +++++++++++++++++++-------- 1 file changed, 44 insertions(+), 19 deletions(-) diff --git a/netlify/edge-functions/agent-proxy.ts b/netlify/edge-functions/agent-proxy.ts index f191b5b..2974ba6 100644 --- a/netlify/edge-functions/agent-proxy.ts +++ b/netlify/edge-functions/agent-proxy.ts @@ -93,8 +93,7 @@ type AgentEvent = { type EventsListPage = { data?: AgentEvent[] - has_more?: boolean - last_id?: string + next_page?: string } type NewSessionPayload = { @@ -221,17 +220,20 @@ async function* parseSse( } // Page through `GET /v1/sessions/{id}/events` to backfill anything -// we missed. Accepts an `after` id so we only fetch new events. +// we missed. The Managed Agents endpoint uses an opaque `page` cursor +// returned as `next_page` on each response; there is no `after_id` +// filter, so we always pull the full history and the caller is +// responsible for skipping events it has already handled (via +// `seenEventIds` and the resume-payload `lastEventId`). async function* listAllEvents( sessionId: string, apiKey: string, - after: string | undefined, signal?: AbortSignal, ): AsyncGenerator { - let cursor = after + let pageCursor: string | undefined for (;;) { - const params = new URLSearchParams({ order: 'asc', limit: '100' }) - if (cursor) params.set('after_id', cursor) + const params = new URLSearchParams({ order: 'asc', limit: '1000' }) + if (pageCursor) params.set('page', pageCursor) const res = await anthropicFetch( `/v1/sessions/${sessionId}/events?${params.toString()}`, apiKey, @@ -245,10 +247,8 @@ async function* listAllEvents( const page = (await res.json()) as EventsListPage const items = page.data || [] for (const ev of items) yield ev - if (!page.has_more) break - const lastId = page.last_id ?? items[items.length - 1]?.id - if (!lastId) break - cursor = lastId + if (!page.next_page) break + pageCursor = page.next_page } } @@ -419,6 +419,12 @@ Please follow your instructions to produce the funding outlook brief.` async start(controller) { let lastSendAt = Date.now() let lastEventIdSeen = initialLastEventId + // For resumes we replay the full session history via + // `listAllEvents` but mute every event up to and including + // `initialLastEventId` (the user has already seen those). Once + // we've cleared that cursor (or on a brand-new session where it + // is undefined), we start actually delivering events. + let pastInitialId = !initialLastEventId const writeJson = (obj: unknown) => { try { @@ -596,29 +602,40 @@ Please follow your instructions to produce the funding outlook brief.` break } - // Backfill anything we missed since `lastEventIdSeen`. - // Dedupes happen by event id; terminal events still flip - // `done` even if we've seen the id before. + // Backfill anything we missed. The events endpoint does + // NOT support `after_id`, so we always pull the full session + // history and skip everything up to and including + // `initialLastEventId` (the cursor handed to us by the + // previous segment via the resume payload). Within this + // segment, `seenEventIds` dedupes against the live stream. try { for await (const event of listAllEvents( sessionId, apiKey, - lastEventIdSeen, segmentAbort.signal, )) { if (segmenting) break if (event.id) { - if (!seenEventIds.has(event.id)) { + if (!pastInitialId) { + // Already delivered to the user in a previous + // segment — mark seen so the live stream doesn't + // re-emit it, but do NOT call handle(). + seenEventIds.add(event.id) + lastEventIdSeen = event.id + if (event.id === initialLastEventId) { + pastInitialId = true + } + } else if (!seenEventIds.has(event.id)) { seenEventIds.add(event.id) lastEventIdSeen = event.id handle(event) } else { - // Already handled in a previous iteration — but - // keep the cursor moving forward. + // Already handled earlier in this segment — keep + // the cursor moving forward. lastEventIdSeen = event.id } } - if (isTerminal(event)) { + if (isTerminal(event) && pastInitialId) { done = true break } @@ -634,6 +651,14 @@ Please follow your instructions to produce the funding outlook brief.` } } + // If the cursor handed to us by the previous segment didn't + // appear in the session's history (stale / corrupted / + // history truncated), don't get stuck muting events forever + // — start delivering whatever shows up next. + if (!pastInitialId) { + pastInitialId = true + } + if (done || segmenting) { // Drain the live-stream body we opened above so we don't // leak the connection.