import { spawn, ChildProcess } from "child_process"; import { EventEmitter } from "events"; import * as readline from "readline"; import * as path from "path"; import * as fs from "fs"; import { logUsage, extractUsageFromResult } from "@palace/sdk/usage"; const CWD = process.env.REPO_ROOT || path.resolve(process.cwd(), ".."); const CONV_LOG_DIR = path.resolve(CWD, "code/palaceplatform/channels/logs/sessions"); if (!fs.existsSync(CONV_LOG_DIR)) fs.mkdirSync(CONV_LOG_DIR, { recursive: true }); function pdt(): string { return new Date().toLocaleString("en-US", { timeZone: "America/Los_Angeles", year: "numeric", month: "2-digit", day: "2-digit", hour: "2-digit", minute: "2-digit", second: "2-digit", hour12: false, }); } export interface SessionUsage { duration_ms: number; total_cost_usd: number; input_tokens: number; output_tokens: number; cache_read_input_tokens: number; cache_creation_input_tokens: number; num_turns: number; } export class ClaudeBridge extends EventEmitter { private proc: ChildProcess | null = null; private rl: readline.Interface | null = null; private busy = false; private responseBuffer = ""; private _sessionId: string | null = null; private generation = 0; private isResume = false; private stateFile: string | null = null; private queue: Array<{ content: Array>; resolve: (value: string) => void; reject: (reason: Error) => void; }> = []; private ready = false; private lastUsage: SessionUsage | null = null; private sendStart: number = 0; private convLogPath: string | null = null; model: string | null = null; channel: string | null = null; domain: string | null = null; maxTurns: number = 1000; constructor(resumeSessionId?: string) { super(); if (resumeSessionId) this._sessionId = resumeSessionId; } setStateFile(p: string): void { this.stateFile = p; if (!this._sessionId && fs.existsSync(p)) { const id = fs.readFileSync(p, "utf8").trim(); if (id) this._sessionId = id; } } reset(): void { this.kill(); this._sessionId = null; if (this.stateFile && fs.existsSync(this.stateFile)) { try { fs.unlinkSync(this.stateFile); } catch {} } } start(): void { if (this.proc) this.kill(); const gen = ++this.generation; this.ready = false; if (!this._sessionId && this.stateFile && fs.existsSync(this.stateFile)) { const id = fs.readFileSync(this.stateFile, "utf8").trim(); if (id) this._sessionId = id; } this.isResume = !!this._sessionId; const args = [ "-p", "--output-format", "stream-json", "--input-format", "stream-json", "--dangerously-skip-permissions", "--max-turns", String(this.maxTurns), "--chrome", "--verbose", ]; if (this._sessionId) { args.push("--resume", this._sessionId); } if (this.model) { args.push("--model", this.model); } this.proc = spawn( "claude", args, { cwd: CWD, stdio: ["pipe", "pipe", "pipe"], env: { ...process.env, CLAUDECODE: "", CLAUDE_AUTOCOMPACT_PCT_OVERRIDE: "95", ACE_HAS_MEMORY: this._sessionId ? "1" : "", }, } ); this.rl = readline.createInterface({ input: this.proc.stdout! }); this.rl.on("line", (line) => this.handleLine(line)); this.proc.stderr?.on("data", (data) => { const msg = data.toString().trim(); if (msg) console.error("[claude stderr]", msg); }); this.proc.on("exit", (code, signal) => { if (gen !== this.generation) return; console.log( `[claude] process exited with code ${code}${signal ? ` (signal: ${signal})` : ""}` ); this.proc = null; this.rl = null; this.busy = false; this.ready = false; this.emit("exit", code); }); this.proc.on("error", (err) => { if (gen !== this.generation) return; console.error("[claude] process error:", err); this.proc = null; this.rl = null; this.busy = false; this.ready = false; this.emit("error", err); }); console.log("[claude] subprocess started"); } kill(): void { if (this.proc) { this.generation++; this.proc.kill("SIGTERM"); this.proc = null; this.rl = null; this.responseBuffer = ""; console.log("[claude] subprocess killed"); const wasBusy = this.busy; this.busy = false; this.ready = false; if (wasBusy) this.emit("exit", null); this.drainQueue(new Error("claude process killed")); } } get sessionId(): string | null { return this._sessionId; } async send(text: string): Promise { return this.sendContent([{ type: "text", text }]); } async sendContent(content: Array>): Promise { return new Promise((resolve, reject) => { this.queue.push({ content, resolve, reject }); if (!this.busy) this.processQueue(); }); } private async processQueue(): Promise { if (this.busy) return; const item = this.queue.shift(); if (!item) return; if (!this.proc) { this.start(); } if (!this.proc) { item.reject(new Error("claude process failed to start")); this.processQueue(); return; } this.busy = true; this.responseBuffer = ""; const onResult = () => { this.busy = false; cleanup(); item.resolve(this.responseBuffer.trim()); this.processQueue(); }; const onError = (err: Error) => { this.busy = false; cleanup(); item.reject(err); this.processQueue(); }; const onExit = (code: number | null) => { this.busy = false; cleanup(); const buf = this.responseBuffer.trim(); if (code && code !== 0) { item.reject(new Error(buf || `claude process exited with code ${code}`)); } else if (buf) { item.resolve(buf); } else { item.reject(new Error(`claude process exited with code ${code}`)); } this.drainQueue(new Error("claude process exited unexpectedly")); }; let responseTimeout: ReturnType; const cleanup = () => { clearTimeout(responseTimeout); this.removeListener("result", onResult); this.removeListener("error", onError); this.removeListener("exit", onExit); }; this.once("result", onResult); this.once("error", onError); this.once("exit", onExit); responseTimeout = setTimeout(() => { console.error("[claude] response timeout (30min), killing process"); this.kill(); }, 30 * 60 * 1000); const msg = JSON.stringify({ type: "user", message: { role: "user", content: item.content, }, }); try { console.log("[claude] writing to stdin:", msg.length, "bytes"); this.proc.stdin!.write(msg + "\n"); if (this.convLogPath) { const ts = pdt(); const text = item.content.filter((c) => c.type === "text").map((c) => c.text).join("\n"); fs.appendFileSync(this.convLogPath, `[${ts}] USER\n${text}\n${"─".repeat(60)}\n`); } } catch (err) { this.busy = false; cleanup(); item.reject(err instanceof Error ? err : new Error(String(err))); this.processQueue(); } } private drainQueue(err: Error): void { while (this.queue.length > 0) { const item = this.queue.shift()!; item.reject(err); } } get isRunning(): boolean { return this.proc !== null; } get isBusy(): boolean { return this.busy; } private buildBootMemory(): string { const MEM = path.join(CWD, "palaces/manglasabang/secretariat/memory"); let out = ""; const identityFiles = ["JUNWON.md", "ACE.md", "TOOLS.md", "GUARDRAILS.md", "TASK-MANAGEMENT.md", "core-memories.md"]; for (const f of identityFiles) { const fp = path.join(MEM, "identity", f); if (fs.existsSync(fp)) { out += `\n${"═".repeat(60)}\n${f}\n${"═".repeat(60)}\n${fs.readFileSync(fp, "utf8")}`; } } for (const dir of ["last-one-week", "last-one-month", "last-one-year", "each-past-year"]) { const d = path.join(MEM, dir); if (!fs.existsSync(d)) continue; for (const f of fs.readdirSync(d).sort()) { if (!f.endsWith(".md")) continue; out += `\n${"═".repeat(60)}\n${dir}/${f}\n${"═".repeat(60)}\n${fs.readFileSync(path.join(d, f), "utf8")}`; } } return out; } private handleLine(line: string): void { if (!line.trim()) return; let msg: any; try { msg = JSON.parse(line); } catch { console.error("[claude] non-json:", line.slice(0, 200)); return; } console.log("[claude] event:", msg.type, msg.subtype || ""); switch (msg.type) { case "system": if (msg.subtype === "init" && msg.session_id) { const alreadyInSession = !!this._sessionId && this._sessionId === msg.session_id; this._sessionId = msg.session_id; this.ready = true; console.log("[claude] session:", this._sessionId); if (!this.isResume && !alreadyInSession && this.stateFile) { try { fs.writeFileSync(this.stateFile, this._sessionId); } catch {} } const prefix = this.channel || "claude"; this.convLogPath = path.join(CONV_LOG_DIR, `${prefix}-${this._sessionId}.log`); const transcriptPath = path.join(process.env.HOME || "", `.claude/projects/-Users-ace-palacering/${this._sessionId}.jsonl`); let header = `[${pdt()}] SESSION START\ntranscript: ${transcriptPath}\nmodel: ${msg.model || "unknown"}\n`; if (this.isResume || alreadyInSession) { header += "(resumed — memory already loaded)\n"; } else { header += "\nBOOT MEMORY:" + this.buildBootMemory() + "\n"; } fs.appendFileSync(this.convLogPath, header + "─".repeat(60) + "\n"); this.emit("ready"); this.emit("sessionReady", this._sessionId, this.convLogPath); } break; case "assistant": if (msg.message?.content) { for (const block of msg.message.content) { if (block.type === "text") { this.responseBuffer += block.text; this.emit("text", block.text); if (this.convLogPath && block.text.trim()) { fs.appendFileSync(this.convLogPath, `[${pdt()}] TEXT\n${block.text.trim()}\n${"─".repeat(60)}\n`); } } else if (block.type === "tool_use" && this.convLogPath) { const inputStr = JSON.stringify(block.input || {}, null, 2); fs.appendFileSync(this.convLogPath, `[${pdt()}] TOOL CALL: ${block.name}\n${inputStr}\n${"─".repeat(60)}\n`); } } } break; case "user": if (msg.message?.content && this.convLogPath) { for (const block of msg.message.content) { if (block.type === "tool_result") { const raw = Array.isArray(block.content) ? block.content.filter((c: any) => c.type === "text").map((c: any) => c.text).join("\n") : typeof block.content === "string" ? block.content : ""; fs.appendFileSync(this.convLogPath, `[${pdt()}] TOOL RESULT\n${raw}\n${"─".repeat(60)}\n`); } } } break; case "result": this.lastUsage = extractUsageFromResult(msg); console.log("[claude] result received, buffer:", this.responseBuffer.length, "chars"); if (this.convLogPath) { const u = this.lastUsage; const tokIn = u.input_tokens + u.cache_read_input_tokens + u.cache_creation_input_tokens; fs.appendFileSync(this.convLogPath, `[${pdt()}] RESULT | turns: ${u.num_turns} | cost: $${u.total_cost_usd.toFixed(4)} | ${tokIn}in / ${u.output_tokens}out\n${"─".repeat(60)}\n`); } this.logSession(); this.emit("result"); break; } } private logSession(): void { if (!this.lastUsage) return; logUsage({ ...this.lastUsage, channel: this.channel || "unknown", model: this.model || "sonnet", domain: this.domain || undefined, }); const tokIn = this.lastUsage.input_tokens + this.lastUsage.cache_read_input_tokens + this.lastUsage.cache_creation_input_tokens; console.log(`[claude] usage logged: $${this.lastUsage.total_cost_usd.toFixed(4)}, ${tokIn}in/${this.lastUsage.output_tokens}out`); } get usage(): SessionUsage | null { return this.lastUsage; } }