import { join } from "node:path";
import { ClaudeBridge } from "@channels/slack/bridge";
import { upsertThread } from "./thread-store";

// Fail with a readable message instead of the opaque TypeError that
// `path.join(undefined, …)` would raise when the variable is missing.
const ROOT = process.env.REPO_ROOT;
if (ROOT === undefined) {
  throw new Error("REPO_ROOT environment variable must be set");
}

/** Directory handed to each bridge (via `setStateFile`) for its per-thread state file. */
const STATE_DIR = join(ROOT, "code/palaceplatform/channels/logs/sessions");

/** Maximum number of threads allowed to hold an active slot at the same time. */
export const MAX_ACTIVE = 3;

/** Bridges keyed by thread id. Entries are removed by idle cleanup and by the watchdog. */
export const bridges = new Map<string, ClaudeBridge>();

/** Pending idle-cleanup timers keyed by thread id. */
const idleTimers = new Map<string, ReturnType<typeof setTimeout>>();

/** A bridge is killed after this long without a `resetIdle` call (15 minutes). */
const IDLE_MS = 15 * 60 * 1000;

/** How often the watchdog sweeps the active set (30 seconds). */
const WATCHDOG_INTERVAL_MS = 30_000;

/** Thread ids currently holding one of the `MAX_ACTIVE` slots. */
const activeSet = new Set<string>();

/** A message waiting for a free slot. */
interface QueueItem {
  tid: string;
  message: string;
  /** Called with the thread's bridge immediately before the message is sent. */
  onSlotOpen: (bridge: ClaudeBridge) => void;
}

/** FIFO of messages that arrived while every slot was taken. */
const queue: QueueItem[] = [];

/**
 * Periodic sweep that reclaims slots held by threads whose bridge is missing
 * or no longer reports `isRunning`. Each reclaimed thread is recorded as
 * "failed" through `upsertThread`, and the queue is drained afterwards.
 *
 * NOTE(review): the idle timer in `resetIdle` removes a bridge without
 * touching `activeSet`, so an idle-killed thread that still holds a slot is
 * reported here as a dead process — confirm that is intended.
 */
const watchdog = setInterval(() => {
  let reclaimed = false;

  // Iterate over a snapshot: a Set iterator also visits entries added during
  // the loop, so draining the queue mid-sweep would make the watchdog inspect
  // sessions it has only just started.
  for (const tid of [...activeSet]) {
    const bridge = bridges.get(tid);
    if (bridge?.isRunning) continue;

    console.log(`[watchdog] dead process detected for ${tid}, cleaning up`);
    activeSet.delete(tid);

    if (bridge) {
      bridge.kill();
      bridges.delete(tid);
    }

    const timer = idleTimers.get(tid);
    if (timer) {
      clearTimeout(timer);
      idleTimers.delete(tid);
    }

    // Best effort: a failed status write must not break the sweep.
    upsertThread({
      threadId: tid,
      status: "failed",
      endedAt: Date.now(),
      detail: "process died without emitting result",
    }).catch((err: unknown) => {
      console.error(`[watchdog] failed to record status for ${tid}:`, err);
    });

    reclaimed = true;
  }

  // Drain once, after the sweep, so freed slots are handed to queued threads.
  if (reclaimed) drainQueue();
}, WATCHDOG_INTERVAL_MS);

/**
 * (Re)starts the idle countdown for a thread. When `IDLE_MS` elapses without
 * another call, the thread's bridge (if any) is killed and removed from
 * `bridges`.
 *
 * @param tid - Thread id whose idle timer should be reset.
 */
export function resetIdle(tid: string): void {
  const existing = idleTimers.get(tid);
  if (existing) clearTimeout(existing);

  const timer = setTimeout(() => {
    const bridge = bridges.get(tid);
    if (bridge) {
      console.log(`[web-chat] idle cleanup: ${tid}`);
      bridge.kill();
      bridges.delete(tid);
    }
    idleTimers.delete(tid);
  }, IDLE_MS);

  idleTimers.set(tid, timer);
}

/**
 * Returns the bridge registered for a thread, creating and registering a
 * freshly configured one when none exists.
 *
 * @param tid - Thread id.
 * @returns The bridge, and `isNew: true` when this call created it.
 */
export function getOrCreateBridge(tid: string): { bridge: ClaudeBridge; isNew: boolean } {
  const existing = bridges.get(tid);
  if (existing) return { bridge: existing, isNew: false };

  const bridge = new ClaudeBridge();
  bridge.channel = "palacecode";
  bridge.model = "sonnet";
  bridge.maxTurns = 1000;
  bridge.setStateFile(join(STATE_DIR, `.web-thread-${tid}`));
  bridges.set(tid, bridge);
  return { bridge, isNew: true };
}

/**
 * Claims a slot for `tid`, notifies the caller through `onSlotOpen`, and
 * sends the message. Shared by `throttledSend` and `drainQueue` so both start
 * a session in exactly the same way.
 */
function startSession(
  tid: string,
  message: string,
  onSlotOpen: (bridge: ClaudeBridge) => void,
): void {
  activeSet.add(tid);
  const { bridge } = getOrCreateBridge(tid);
  onSlotOpen(bridge);
  // Fire-and-forget: the send is not awaited. A rejection is logged rather
  // than silently discarded, but it does not release the slot here.
  bridge.send(message).catch((err: unknown) => {
    console.error(`[web-chat] send failed for ${tid}:`, err);
  });
}

/**
 * Sends a message immediately when a slot is free, otherwise queues it until
 * `releaseSlot` or the watchdog frees one.
 *
 * @param tid - Thread id the message belongs to.
 * @param message - Message text passed to the bridge.
 * @param onSlotOpen - Invoked with the bridge right before the message is sent.
 * @returns `true` if the message was sent now, `false` if it was queued.
 */
export function throttledSend(
  tid: string,
  message: string,
  onSlotOpen: (bridge: ClaudeBridge) => void,
): boolean {
  if (activeSet.size < MAX_ACTIVE) {
    startSession(tid, message, onSlotOpen);
    return true;
  }
  queue.push({ tid, message, onSlotOpen });
  return false;
}

/**
 * Releases the slot held by a thread and starts queued work if capacity allows.
 *
 * @param tid - Thread id whose slot is being released.
 */
export function releaseSlot(tid: string): void {
  activeSet.delete(tid);
  drainQueue();
}

/** Starts queued messages in FIFO order until the queue is empty or all slots are taken. */
function drainQueue(): void {
  while (queue.length > 0 && activeSet.size < MAX_ACTIVE) {
    const next = queue.shift()!;
    startSession(next.tid, next.message, next.onSlotOpen);
  }
}

/**
 * Reports where a thread stands relative to the slot pool.
 *
 * @param tid - Thread id.
 * @returns `0` when the thread holds a slot, its 1-based position in the
 *   queue when it is waiting, or `-1` when it is neither active nor queued.
 */
export function getQueuePosition(tid: string): number {
  if (activeSet.has(tid)) return 0;
  const idx = queue.findIndex((q) => q.tid === tid);
  return idx >= 0 ? idx + 1 : -1;
}

/** Number of threads currently holding a slot. */
export function activeCount(): number {
  return activeSet.size;
}

/** Number of messages waiting for a slot. */
export function queueLength(): number {
  return queue.length;
}