import type { APIRoute } from "astro";
import { resetIdle, throttledSend, releaseSlot, getQueuePosition } from "../../lib/web-bridges";
import { upsertThread, appendLogLine, getThread } from "../../lib/thread-store";
import { notifyThreadDone } from "../../lib/push";

/** Milliseconds between SSE comment pings written while the stream is open. */
const KEEPALIVE_INTERVAL_MS = 5000;

/** Returns `text` unchanged when it fits in `max` characters, otherwise cuts it to `max` characters ending in "...". */
function truncate(text: string, max: number): string {
  return text.length > max ? text.slice(0, max - 3) + "..." : text;
}

/**
 * Accepts a JSON body `{ message, threadId?, title? }`, records the thread as
 * "queued", hands the message to `throttledSend`, and answers with a
 * `text/event-stream` response.
 *
 * Events written to the stream:
 * - `thread`  – `{ threadId }`, sent immediately.
 * - `queued`  – `{ threadId, position }`, sent when `throttledSend` returns a falsy value.
 * - `started` – `{ threadId }`, sent when the `throttledSend` callback runs.
 * - `text`    – `{ text }`, one per chunk emitted by the bridge.
 * - `done`    – `{ threadId }`, on a bridge result or a zero/null exit code.
 * - `error`   – `{ error }`, on a bridge error or a non-zero exit code.
 * A `: keepalive` comment is also written every `KEEPALIVE_INTERVAL_MS`.
 *
 * Returns a 400 JSON response (`invalid json` / `message required`) when the
 * body cannot be parsed or carries no non-empty string `message`.
 */
export const POST: APIRoute = async ({ request }) => {
  const headers = { "Content-Type": "application/json" };
  const badRequest = (error: string) =>
    new Response(JSON.stringify({ error }), { status: 400, headers });

  let body: any;
  try {
    body = await request.json();
  } catch {
    return badRequest("invalid json");
  }
  // Valid JSON that is not an object (null, number, string, ...) has no fields
  // to read; treat it as an empty payload instead of throwing on `null.message`.
  if (body === null || typeof body !== "object") body = {};

  // Non-string values are treated as absent so `.trim()` can never throw.
  const message = typeof body.message === "string" ? body.message.trim() : "";
  if (!message) return badRequest("message required");

  const tid = body.threadId || `code-${Date.now()}`;
  const rawTitle = typeof body.title === "string" ? body.title.trim() : "";
  const subject = rawTitle || truncate(message, 60);

  const existing = await getThread(tid);
  if (existing) {
    await upsertThread({ threadId: tid, status: "queued" });
  } else {
    await upsertThread({
      channel: "code",
      threadId: tid,
      subject,
      from: "code",
      account: "",
      status: "queued",
      startedAt: Date.now(),
      endedAt: null,
      detail: "",
    });
  }
  await appendLogLine(tid, `from code: ${subject}`);

  // Stream state lives outside `start` so that `cancel` can reach it too.
  let closed = false;
  let keepalive: ReturnType<typeof setInterval> | undefined;

  /** Marks the stream as unusable and stops the keepalive timer. Safe to call repeatedly. */
  const stopStreaming = () => {
    closed = true;
    if (keepalive !== undefined) {
      clearInterval(keepalive);
      keepalive = undefined;
    }
  };

  const stream = new ReadableStream({
    start(controller) {
      const encoder = new TextEncoder();

      /** Writes one raw SSE frame; a failed enqueue shuts the stream side down. */
      const write = (frame: string) => {
        if (closed) return;
        try {
          controller.enqueue(encoder.encode(frame));
        } catch {
          // The controller no longer accepts data: stop pinging as well,
          // otherwise the interval would keep firing for nothing.
          stopStreaming();
        }
      };
      const send = (event: string, data: string) =>
        write(`event: ${event}\ndata: ${data}\n\n`);
      const ping = () => write(`: keepalive\n\n`);

      /** Closes the controller (if still open) and stops the keepalive timer. */
      const finish = () => {
        if (!closed) {
          try {
            controller.close();
          } catch {
            // Already closed or errored; nothing left to do.
          }
        }
        stopStreaming();
      };

      keepalive = setInterval(ping, KEEPALIVE_INTERVAL_MS);
      send("thread", JSON.stringify({ threadId: tid }));

      // NOTE(review): presumably the callback runs once a processing slot is
      // available for this thread — confirm against `throttledSend`.
      const started = throttledSend(tid, message, (bridge) => {
        send("started", JSON.stringify({ threadId: tid }));
        upsertThread({ threadId: tid, status: "processing", processingAt: Date.now() }).catch(() => {});

        const onSessionReady = (sessionId: string) => {
          upsertThread({
            threadId: tid,
            sessionFile: `palacecode-${sessionId}.log`,
          }).catch(() => {});
        };
        bridge.once("sessionReady", onSessionReady);

        const onText = (chunk: string) => send("text", JSON.stringify({ text: chunk }));

        // Detaches every listener registered for this request so that exactly
        // one of result / error / exit is ever handled.
        const cleanup = () => {
          bridge.removeListener("text", onText);
          bridge.removeListener("result", onResult);
          bridge.removeListener("error", onError);
          bridge.removeListener("exit", onExit);
          bridge.removeListener("sessionReady", onSessionReady);
        };

        const onResult = () => {
          cleanup();
          releaseSlot(tid);
          const full = (bridge as any).responseBuffer?.trim() || "";
          send("done", JSON.stringify({ threadId: tid }));
          finish();

          const u = bridge.usage;
          // Cache counters may be missing from the usage object; default them
          // to 0 so the sum cannot become NaN.
          const tokIn = u
            ? u.input_tokens +
              ((u as any).cache_read_input_tokens ?? 0) +
              ((u as any).cache_creation_input_tokens ?? 0)
            : null;
          const tokOut = u ? u.output_tokens : null;
          const cost = u ? u.total_cost_usd : null;
          upsertThread({
            threadId: tid,
            status: "replied",
            endedAt: Date.now(),
            ...(bridge.sessionId ? { sessionFile: `palacecode-${bridge.sessionId}.log` } : {}),
            ...(tokIn !== null ? { tokIn, tokOut, cost } : {}),
          } as any).catch(() => {});
          appendLogLine(tid, truncate(full, 200)).catch(() => {});
          notifyThreadDone(subject).catch(() => {});
          resetIdle(tid);
        };

        const onError = (err: Error) => {
          cleanup();
          releaseSlot(tid);
          send("error", JSON.stringify({ error: err.message }));
          finish();
          upsertThread({
            threadId: tid,
            status: "failed",
            endedAt: Date.now(),
            detail: err.message,
          }).catch(() => {});
        };

        const onExit = (code: number | null) => {
          // Always detach and release the slot, even when the client has
          // already disconnected; `send` is a no-op on a closed stream.
          cleanup();
          releaseSlot(tid);
          if (code) {
            const detail = `Process exited with code ${code}`;
            send("error", JSON.stringify({ error: detail }));
            // Record the failure so the thread does not stay in "processing".
            upsertThread({
              threadId: tid,
              status: "failed",
              endedAt: Date.now(),
              detail,
            }).catch(() => {});
          } else {
            send("done", JSON.stringify({ threadId: tid }));
          }
          finish();
        };

        bridge.on("text", onText);
        bridge.once("result", onResult);
        bridge.once("error", onError);
        bridge.once("exit", onExit);
      });

      if (!started) {
        const pos = getQueuePosition(tid);
        send("queued", JSON.stringify({ threadId: tid, position: pos }));
      }
    },
    cancel() {
      // The consumer went away. Stop writing and pinging, but leave the bridge
      // listeners in place so the thread record is still updated on completion.
      stopStreaming();
    },
  });

  return new Response(stream, {
    headers: {
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache",
      "Connection": "keep-alive",
    },
  });
};