Streaming updates

This commit is contained in:
2026-05-12 22:29:25 -04:00
parent d1fd7fbe61
commit 1a93c9246f
+69 -15
View File
@@ -139,27 +139,65 @@ Please follow your instructions to produce the funding outlook brief.`
)
}
// ----- Pipe agent.message text blocks to the client -----
// ----- Pipe agent activity to the client -----
// The agent emits multiple event types. We surface:
// - agent.message text blocks (narration)
// - agent.tool_use for the `write` tool, whose `input.content` IS the
// final brief the agent produces (it writes it as a Markdown file in
// the sandbox instead of streaming it back)
// - lightweight status lines for other tool calls so the UI keeps moving
const encoder = new TextEncoder()
const seenEventIds = new Set()
const writtenFiles = new Set() // dedupe writes by path
const stream = new ReadableStream({
async start(controller) {
const send = (s) => controller.enqueue(encoder.encode(s))
try {
for await (const event of upstream) {
if (event.id) {
if (seenEventIds.has(event.id)) {
// Still need to evaluate terminal events even if seen.
} else {
seenEventIds.add(event.id)
if (event.type === 'agent.message' && Array.isArray(event.content)) {
for (const block of event.content) {
if (block?.type === 'text' && block.text) {
controller.enqueue(encoder.encode(block.text))
if (event.id && !seenEventIds.has(event.id)) {
seenEventIds.add(event.id)
switch (event.type) {
case 'agent.message': {
if (Array.isArray(event.content)) {
for (const block of event.content) {
if (block?.type === 'text' && block.text) {
send(block.text + '\n\n')
}
}
}
} else if (event.type === 'session.error') {
break
}
case 'agent.tool_use': {
// The write tool carries the actual brief in input.content.
if (
event.name === 'write' &&
typeof event.input?.content === 'string'
) {
const path = event.input.file_path || event.input.path || ''
if (!writtenFiles.has(path)) {
writtenFiles.add(path)
send('\n\n---\n\n' + event.input.content + '\n\n')
}
} else {
const label = describeToolUse(event)
if (label) send(`\n\n_${label}_\n\n`)
}
break
}
case 'agent.mcp_tool_use': {
const label = describeToolUse(event)
if (label) send(`\n\n_${label}_\n\n`)
break
}
case 'session.error': {
const msg = event.error?.message || 'unknown session error'
controller.enqueue(encoder.encode(`\n\n[session error: ${msg}]`))
send(`\n\n[session error: ${msg}]`)
break
}
}
}
@@ -175,9 +213,7 @@ Please follow your instructions to produce the funding outlook brief.`
}
controller.close()
} catch (err) {
controller.enqueue(
encoder.encode(`\n\n[stream error: ${err.message || err}]`),
)
send(`\n\n[stream error: ${err.message || err}]`)
controller.close()
}
},
@@ -191,6 +227,24 @@ Please follow your instructions to produce the funding outlook brief.`
},
})
function describeToolUse(event) {
const name = event.name || 'tool'
const input = event.input || {}
if (name === 'web_search' && input.query) return `🔍 Searching: ${input.query}`
if (name === 'web_fetch' && input.url) return `🌐 Fetching: ${input.url}`
if (event.type === 'agent.mcp_tool_use') {
const args = Object.entries(input)
.filter(([k]) => k !== 'limit')
.slice(0, 3)
.map(([k, v]) =>
typeof v === 'object' ? `${k}=…` : `${k}=${String(v).slice(0, 40)}`,
)
.join(', ')
return `🛠️ ${name}${args ? ` (${args})` : ''}`
}
return `🛠️ ${name}`
}
return new Response(stream, {
status: 200,
headers: {