fix(agent-proxy): events.list uses opaque page cursor, not after_id
The Managed Agents events endpoint (`GET /v1/sessions/{id}/events`)
does NOT support filtering by event id. It returns an opaque
`next_page` cursor on each response and accepts it back via the
`page` query parameter; an `after_id=` filter returns 400 Bad
Request, which caused every segment resume to fail backfill (visible
as `{"type":"status","kind":"session_error","message":"Backfill
failed: events.list returned 400 Bad Request"}`).
Caught during testing of commit 8e44de5: resuming from a segment
boundary always returned 400 and the brief silently lost events from
the previous segment.
Changes:
- `listAllEvents` now paginates via `page` / `next_page` and pulls
the full session history (limit=1000). The Anthropic API has no
per-id filter, so the caller is responsible for skipping events
already delivered.
- New `pastInitialId` flag at the top of the body loop: on resume,
mute every event up to and including `initialLastEventId`
(still adding them to `seenEventIds` so the live stream doesn't
re-emit them), then start delivering. On a brand-new session the
flag starts true and is a no-op.
- Safety fallback: if backfill completes without ever seeing
`initialLastEventId` (stale cursor / truncated history), flip
the flag to true so we don't get stuck muting forever — the live
stream will start delivering whatever shows up next.
Co-Authored-By: alex <alex@semipublic.co>
This commit is contained in:
@@ -93,8 +93,7 @@ type AgentEvent = {
|
|||||||
|
|
||||||
type EventsListPage = {
|
type EventsListPage = {
|
||||||
data?: AgentEvent[]
|
data?: AgentEvent[]
|
||||||
has_more?: boolean
|
next_page?: string
|
||||||
last_id?: string
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type NewSessionPayload = {
|
type NewSessionPayload = {
|
||||||
@@ -221,17 +220,20 @@ async function* parseSse(
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Page through `GET /v1/sessions/{id}/events` to backfill anything
|
// Page through `GET /v1/sessions/{id}/events` to backfill anything
|
||||||
// we missed. Accepts an `after` id so we only fetch new events.
|
// we missed. The Managed Agents endpoint uses an opaque `page` cursor
|
||||||
|
// returned as `next_page` on each response; there is no `after_id`
|
||||||
|
// filter, so we always pull the full history and the caller is
|
||||||
|
// responsible for skipping events it has already handled (via
|
||||||
|
// `seenEventIds` and the resume-payload `lastEventId`).
|
||||||
async function* listAllEvents(
|
async function* listAllEvents(
|
||||||
sessionId: string,
|
sessionId: string,
|
||||||
apiKey: string,
|
apiKey: string,
|
||||||
after: string | undefined,
|
|
||||||
signal?: AbortSignal,
|
signal?: AbortSignal,
|
||||||
): AsyncGenerator<AgentEvent> {
|
): AsyncGenerator<AgentEvent> {
|
||||||
let cursor = after
|
let pageCursor: string | undefined
|
||||||
for (;;) {
|
for (;;) {
|
||||||
const params = new URLSearchParams({ order: 'asc', limit: '100' })
|
const params = new URLSearchParams({ order: 'asc', limit: '1000' })
|
||||||
if (cursor) params.set('after_id', cursor)
|
if (pageCursor) params.set('page', pageCursor)
|
||||||
const res = await anthropicFetch(
|
const res = await anthropicFetch(
|
||||||
`/v1/sessions/${sessionId}/events?${params.toString()}`,
|
`/v1/sessions/${sessionId}/events?${params.toString()}`,
|
||||||
apiKey,
|
apiKey,
|
||||||
@@ -245,10 +247,8 @@ async function* listAllEvents(
|
|||||||
const page = (await res.json()) as EventsListPage
|
const page = (await res.json()) as EventsListPage
|
||||||
const items = page.data || []
|
const items = page.data || []
|
||||||
for (const ev of items) yield ev
|
for (const ev of items) yield ev
|
||||||
if (!page.has_more) break
|
if (!page.next_page) break
|
||||||
const lastId = page.last_id ?? items[items.length - 1]?.id
|
pageCursor = page.next_page
|
||||||
if (!lastId) break
|
|
||||||
cursor = lastId
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -419,6 +419,12 @@ Please follow your instructions to produce the funding outlook brief.`
|
|||||||
async start(controller) {
|
async start(controller) {
|
||||||
let lastSendAt = Date.now()
|
let lastSendAt = Date.now()
|
||||||
let lastEventIdSeen = initialLastEventId
|
let lastEventIdSeen = initialLastEventId
|
||||||
|
// For resumes we replay the full session history via
|
||||||
|
// `listAllEvents` but mute every event up to and including
|
||||||
|
// `initialLastEventId` (the user has already seen those). Once
|
||||||
|
// we've cleared that cursor (or on a brand-new session where it
|
||||||
|
// is undefined), we start actually delivering events.
|
||||||
|
let pastInitialId = !initialLastEventId
|
||||||
|
|
||||||
const writeJson = (obj: unknown) => {
|
const writeJson = (obj: unknown) => {
|
||||||
try {
|
try {
|
||||||
@@ -596,29 +602,40 @@ Please follow your instructions to produce the funding outlook brief.`
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
// Backfill anything we missed since `lastEventIdSeen`.
|
// Backfill anything we missed. The events endpoint does
|
||||||
// Dedupes happen by event id; terminal events still flip
|
// NOT support `after_id`, so we always pull the full session
|
||||||
// `done` even if we've seen the id before.
|
// history and skip everything up to and including
|
||||||
|
// `initialLastEventId` (the cursor handed to us by the
|
||||||
|
// previous segment via the resume payload). Within this
|
||||||
|
// segment, `seenEventIds` dedupes against the live stream.
|
||||||
try {
|
try {
|
||||||
for await (const event of listAllEvents(
|
for await (const event of listAllEvents(
|
||||||
sessionId,
|
sessionId,
|
||||||
apiKey,
|
apiKey,
|
||||||
lastEventIdSeen,
|
|
||||||
segmentAbort.signal,
|
segmentAbort.signal,
|
||||||
)) {
|
)) {
|
||||||
if (segmenting) break
|
if (segmenting) break
|
||||||
if (event.id) {
|
if (event.id) {
|
||||||
if (!seenEventIds.has(event.id)) {
|
if (!pastInitialId) {
|
||||||
|
// Already delivered to the user in a previous
|
||||||
|
// segment — mark seen so the live stream doesn't
|
||||||
|
// re-emit it, but do NOT call handle().
|
||||||
|
seenEventIds.add(event.id)
|
||||||
|
lastEventIdSeen = event.id
|
||||||
|
if (event.id === initialLastEventId) {
|
||||||
|
pastInitialId = true
|
||||||
|
}
|
||||||
|
} else if (!seenEventIds.has(event.id)) {
|
||||||
seenEventIds.add(event.id)
|
seenEventIds.add(event.id)
|
||||||
lastEventIdSeen = event.id
|
lastEventIdSeen = event.id
|
||||||
handle(event)
|
handle(event)
|
||||||
} else {
|
} else {
|
||||||
// Already handled in a previous iteration — but
|
// Already handled earlier in this segment — keep
|
||||||
// keep the cursor moving forward.
|
// the cursor moving forward.
|
||||||
lastEventIdSeen = event.id
|
lastEventIdSeen = event.id
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (isTerminal(event)) {
|
if (isTerminal(event) && pastInitialId) {
|
||||||
done = true
|
done = true
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
@@ -634,6 +651,14 @@ Please follow your instructions to produce the funding outlook brief.`
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If the cursor handed to us by the previous segment didn't
|
||||||
|
// appear in the session's history (stale / corrupted /
|
||||||
|
// history truncated), don't get stuck muting events forever
|
||||||
|
// — start delivering whatever shows up next.
|
||||||
|
if (!pastInitialId) {
|
||||||
|
pastInitialId = true
|
||||||
|
}
|
||||||
|
|
||||||
if (done || segmenting) {
|
if (done || segmenting) {
|
||||||
// Drain the live-stream body we opened above so we don't
|
// Drain the live-stream body we opened above so we don't
|
||||||
// leak the connection.
|
// leak the connection.
|
||||||
|
|||||||
Reference in New Issue
Block a user