diff --git a/netlify/edge-functions/agent-proxy.ts b/netlify/edge-functions/agent-proxy.ts index f191b5b..2974ba6 100644 --- a/netlify/edge-functions/agent-proxy.ts +++ b/netlify/edge-functions/agent-proxy.ts @@ -93,8 +93,7 @@ type AgentEvent = { type EventsListPage = { data?: AgentEvent[] - has_more?: boolean - last_id?: string + next_page?: string } type NewSessionPayload = { @@ -221,17 +220,20 @@ async function* parseSse( } // Page through `GET /v1/sessions/{id}/events` to backfill anything -// we missed. Accepts an `after` id so we only fetch new events. +// we missed. The Managed Agents endpoint uses an opaque `page` cursor +// returned as `next_page` on each response; there is no `after_id` +// filter, so we always pull the full history and the caller is +// responsible for skipping events it has already handled (via +// `seenEventIds` and the resume-payload `lastEventId`). async function* listAllEvents( sessionId: string, apiKey: string, - after: string | undefined, signal?: AbortSignal, ): AsyncGenerator { - let cursor = after + let pageCursor: string | undefined for (;;) { - const params = new URLSearchParams({ order: 'asc', limit: '100' }) - if (cursor) params.set('after_id', cursor) + const params = new URLSearchParams({ order: 'asc', limit: '1000' }) + if (pageCursor) params.set('page', pageCursor) const res = await anthropicFetch( `/v1/sessions/${sessionId}/events?${params.toString()}`, apiKey, @@ -245,10 +247,8 @@ async function* listAllEvents( const page = (await res.json()) as EventsListPage const items = page.data || [] for (const ev of items) yield ev - if (!page.has_more) break - const lastId = page.last_id ?? items[items.length - 1]?.id - if (!lastId) break - cursor = lastId + if (!page.next_page) break + pageCursor = page.next_page } } @@ -419,6 +419,12 @@ Please follow your instructions to produce the funding outlook brief.` async start(controller) { let lastSendAt = Date.now() let lastEventIdSeen = initialLastEventId + // For resumes we replay the full session history via + // `listAllEvents` but mute every event up to and including + // `initialLastEventId` (the user has already seen those). Once + // we've cleared that cursor (or on a brand-new session where it + // is undefined), we start actually delivering events. + let pastInitialId = !initialLastEventId const writeJson = (obj: unknown) => { try { @@ -596,29 +602,40 @@ Please follow your instructions to produce the funding outlook brief.` break } - // Backfill anything we missed since `lastEventIdSeen`. - // Dedupes happen by event id; terminal events still flip - // `done` even if we've seen the id before. + // Backfill anything we missed. The events endpoint does + // NOT support `after_id`, so we always pull the full session + // history and skip everything up to and including + // `initialLastEventId` (the cursor handed to us by the + // previous segment via the resume payload). Within this + // segment, `seenEventIds` dedupes against the live stream. try { for await (const event of listAllEvents( sessionId, apiKey, - lastEventIdSeen, segmentAbort.signal, )) { if (segmenting) break if (event.id) { - if (!seenEventIds.has(event.id)) { + if (!pastInitialId) { + // Already delivered to the user in a previous + // segment — mark seen so the live stream doesn't + // re-emit it, but do NOT call handle(). + seenEventIds.add(event.id) + lastEventIdSeen = event.id + if (event.id === initialLastEventId) { + pastInitialId = true + } + } else if (!seenEventIds.has(event.id)) { seenEventIds.add(event.id) lastEventIdSeen = event.id handle(event) } else { - // Already handled in a previous iteration — but - // keep the cursor moving forward. + // Already handled earlier in this segment — keep + // the cursor moving forward. lastEventIdSeen = event.id } } - if (isTerminal(event)) { + if (isTerminal(event) && pastInitialId) { done = true break } @@ -634,6 +651,14 @@ Please follow your instructions to produce the funding outlook brief.` } } + // If the cursor handed to us by the previous segment didn't + // appear in the session's history (stale / corrupted / + // history truncated), don't get stuck muting events forever + // — start delivering whatever shows up next. + if (!pastInitialId) { + pastInitialId = true + } + if (done || segmenting) { // Drain the live-stream body we opened above so we don't // leak the connection.