import Redis from "ioredis";

/** Namespace prepended to every key this module writes. */
const PREFIX = "pc-threads:";
/** Lifetime (seconds) of a thread hash and its log-line list: 24 hours. */
const THREAD_TTL = 86400;
/** Lifetime (seconds) of an attachment hash: 7 days. */
const ATTACHMENT_TTL = 604800;
/** Number of most-recent log lines retained per thread. */
const LOG_LINES_MAX = 10;
/** COUNT hint passed to SCAN when enumerating attachment keys. */
const SCAN_COUNT = 200;

/** A thread record as stored in (and read back from) its Redis hash. */
export interface ThreadData {
  channel: "code";
  threadId: string;
  subject: string;
  from: string;
  account: string;
  status:
    | "processing"
    | "queued"
    | "replied"
    | "no_reply"
    | "failed"
    | "stalled"
    | "skipped"
    | "archived";
  startedAt: number;
  processingAt: number | null;
  endedAt: number | null;
  detail: string;
  sessionFile: string | null;
  linearIssue: string | null;
  linearTitle: string | null;
  linearUrl: string | null;
  tokIn: number | null;
  tokOut: number | null;
  cost: number | null;
}

/** Link between a thread and a Linear issue, with the time it was recorded. */
export interface ThreadAttachment {
  threadId: string;
  linearIssue: string;
  ts: string;
}

let redis: Redis | null = null;

/** Extracts a printable message from an arbitrary thrown value. */
function errorMessage(err: unknown): string {
  return err instanceof Error ? err.message : String(err);
}

/**
 * Returns the module-wide Redis client, creating it on first use.
 *
 * The client connects to port 6399 and logs connection errors to stderr
 * instead of letting them surface as unhandled `error` events.
 */
export function getRedis(): Redis {
  if (!redis) {
    redis = new Redis({ port: 6399, lazyConnect: false, maxRetriesPerRequest: 3 });
    redis.on("error", (err) => console.error("[redis]", err.message));
  }
  return redis;
}

/**
 * Closes the shared Redis client, if one exists.
 *
 * The cached reference is cleared before `quit()` is awaited so that a failed
 * quit cannot leave a dead client behind for later `getRedis()` calls.
 */
export async function closeRedis(): Promise<void> {
  const client = redis;
  if (!client) return;
  redis = null;
  await client.quit();
}

// --- Thread operations ---

/** Key of the hash holding a thread's fields. */
function threadKey(threadId: string): string {
  return `${PREFIX}thread:${threadId}`;
}

/** Key of the list holding a thread's recent log lines. */
function logLinesKey(threadId: string): string {
  return `${PREFIX}loglines:${threadId}`;
}

/** Sorted set of thread ids scored by `startedAt`. */
const INDEX_KEY = `${PREFIX}threads-index`;
/** Lifetime (seconds) of a deletion tombstone: 1 hour. */
const DELETED_TTL = 3600;

/** Key of the tombstone written when a thread is deleted. */
function deletedKey(threadId: string): string {
  return `${PREFIX}deleted:${threadId}`;
}

/**
 * Creates or partially updates a thread.
 *
 * Every provided field is written to the thread hash as a string (`null` and
 * `undefined` become the empty string) and the hash TTL is refreshed. When
 * `startedAt` is provided the thread is also (re)scored in the index. The call
 * is a no-op while a deletion tombstone for the thread exists.
 *
 * @param data Fields to write; `threadId` is required.
 */
export async function upsertThread(
  data: Partial<ThreadData> & { threadId: string }
): Promise<void> {
  const r = getRedis();
  const key = threadKey(data.threadId);

  // Skip writes for recently deleted threads so they are not recreated.
  const isDeleted = await r.exists(deletedKey(data.threadId));
  if (isDeleted) return;

  const fields: Record<string, string> = {};
  for (const [k, v] of Object.entries(data)) {
    fields[k] = v === null || v === undefined ? "" : String(v);
  }

  const pipeline = r.pipeline();
  pipeline.hset(key, fields);
  pipeline.expire(key, THREAD_TTL);
  if (data.startedAt !== undefined) {
    pipeline.zadd(INDEX_KEY, data.startedAt, data.threadId);
  }
  await pipeline.exec();
}

/**
 * Deletes a thread's hash, log lines and index entry, and writes a tombstone
 * (valid for `DELETED_TTL` seconds) that makes `upsertThread` ignore the id.
 *
 * @param threadId Id of the thread to delete.
 */
export async function deleteThread(threadId: string): Promise<void> {
  const r = getRedis();
  const pipeline = r.pipeline();
  pipeline.del(threadKey(threadId));
  pipeline.del(logLinesKey(threadId));
  pipeline.zrem(INDEX_KEY, threadId);
  pipeline.set(deletedKey(threadId), "1", "EX", DELETED_TTL);
  await pipeline.exec();
}

/**
 * Appends a log line to a thread, keeping only the newest `LOG_LINES_MAX`
 * entries and refreshing the list's TTL.
 *
 * @param threadId Id of the thread the line belongs to.
 * @param line Text to append.
 */
export async function appendLogLine(threadId: string, line: string): Promise<void> {
  const r = getRedis();
  const key = logLinesKey(threadId);
  const pipeline = r.pipeline();
  pipeline.rpush(key, line);
  // Negative indices count from the tail: keep the last LOG_LINES_MAX items.
  pipeline.ltrim(key, -LOG_LINES_MAX, -1);
  pipeline.expire(key, THREAD_TTL);
  await pipeline.exec();
}

/**
 * Loads a single thread.
 *
 * @param threadId Id of the thread to load.
 * @returns The thread, or `null` when its hash is missing or has no `threadId`.
 */
export async function getThread(threadId: string): Promise<ThreadData | null> {
  const r = getRedis();
  const raw = await r.hgetall(threadKey(threadId));
  if (!raw || !raw.threadId) return null;
  return deserializeThread(raw);
}

/**
 * Loads every indexed thread, optionally restricted to those whose index
 * score (`startedAt`) is at least `since`.
 *
 * Index entries whose hash no longer exists are removed in the background.
 * Entries whose lookup failed are skipped for this call but left in the index.
 *
 * @param since Minimum `startedAt`; omitted or falsy means no lower bound.
 * @returns The threads found, in index order.
 */
export async function getAllThreads(since?: number): Promise<ThreadData[]> {
  const r = getRedis();
  const minScore = since ? String(since) : "-inf";
  const members = await r.zrangebyscore(INDEX_KEY, minScore, "+inf");
  if (members.length === 0) return [];

  const pipeline = r.pipeline();
  for (const tid of members) pipeline.hgetall(threadKey(tid));
  const results = (await pipeline.exec()) ?? [];

  const threads: ThreadData[] = [];
  const stale: string[] = [];
  members.forEach((member, i) => {
    const entry = results[i];
    if (!entry) return;
    const [err, raw] = entry;
    // A failed read says nothing about whether the hash exists: do not prune.
    if (err) return;
    const hash = raw as Record<string, string> | null;
    if (!hash || !hash.threadId) {
      stale.push(member);
      return;
    }
    threads.push(deserializeThread(hash));
  });

  if (stale.length > 0) {
    // Best-effort cleanup: not awaited, but a failure must not go unhandled.
    void r
      .zrem(INDEX_KEY, ...stale)
      .catch((err: unknown) => console.error("[redis]", errorMessage(err)));
  }
  return threads;
}

/**
 * Returns the retained log lines of a thread, oldest first.
 *
 * @param threadId Id of the thread whose lines are requested.
 */
export async function getLogLines(threadId: string): Promise<string[]> {
  return getRedis().lrange(logLinesKey(threadId), 0, -1);
}

/** Converts a non-empty hash value to a number; empty or missing gives `null`. */
function numberOrNull(value: string | undefined): number | null {
  return value ? Number(value) : null;
}

/**
 * Rebuilds a `ThreadData` from the string fields of its Redis hash.
 *
 * Empty or missing strings map to `""` for text fields, `null` for nullable
 * fields, `"processing"` for `status` and `0` for `startedAt`.
 */
function deserializeThread(raw: Record<string, string>): ThreadData {
  return {
    channel: "code",
    threadId: raw.threadId ?? "",
    subject: raw.subject || "",
    from: raw.from || "",
    account: raw.account || "",
    status: (raw.status || "processing") as ThreadData["status"],
    startedAt: Number(raw.startedAt) || 0,
    processingAt: numberOrNull(raw.processingAt),
    endedAt: numberOrNull(raw.endedAt),
    detail: raw.detail || "",
    sessionFile: raw.sessionFile || null,
    linearIssue: raw.linearIssue || null,
    linearTitle: raw.linearTitle || null,
    linearUrl: raw.linearUrl || null,
    tokIn: numberOrNull(raw.tokIn),
    tokOut: numberOrNull(raw.tokOut),
    cost: numberOrNull(raw.cost),
  };
}

// --- Attachment operations ---

/** Key prefix shared by all attachment hashes. */
const ATTACHMENT_PREFIX = `${PREFIX}attachment:`;

/** Key of the hash holding a thread's attachment. */
function attachmentKey(threadId: string): string {
  return `${ATTACHMENT_PREFIX}${threadId}`;
}

/**
 * Records that a thread is attached to a Linear issue, stamping the current
 * time as an ISO-8601 string and (re)setting the attachment TTL.
 *
 * @param threadId Id of the thread.
 * @param linearIssue Identifier of the Linear issue.
 */
export async function setAttachment(threadId: string, linearIssue: string): Promise<void> {
  const r = getRedis();
  const key = attachmentKey(threadId);
  const pipeline = r.pipeline();
  pipeline.hset(key, { linearIssue, ts: new Date().toISOString() });
  pipeline.expire(key, ATTACHMENT_TTL);
  await pipeline.exec();
}

/**
 * Loads the attachment of a thread.
 *
 * @param threadId Id of the thread.
 * @returns The attachment, or `null` when none is stored.
 */
export async function getAttachment(threadId: string): Promise<ThreadAttachment | null> {
  const r = getRedis();
  const raw = await r.hgetall(attachmentKey(threadId));
  if (!raw || !raw.linearIssue) return null;
  return { threadId, linearIssue: raw.linearIssue, ts: raw.ts || "" };
}

/**
 * Loads every stored attachment.
 *
 * Keys are enumerated with cursor-based SCAN rather than KEYS so the server is
 * not blocked while it walks the keyspace. SCAN may report a key more than
 * once, so keys are collected into a set.
 *
 * @returns All attachments that have a `linearIssue` field.
 */
export async function getAllAttachments(): Promise<ThreadAttachment[]> {
  const r = getRedis();

  const found = new Set<string>();
  let cursor = "0";
  do {
    const [next, batch] = await r.scan(
      cursor,
      "MATCH",
      `${ATTACHMENT_PREFIX}*`,
      "COUNT",
      SCAN_COUNT
    );
    cursor = next;
    for (const k of batch) found.add(k);
  } while (cursor !== "0");

  const keys = [...found];
  if (keys.length === 0) return [];

  const pipeline = r.pipeline();
  for (const k of keys) pipeline.hgetall(k);
  const results = (await pipeline.exec()) ?? [];

  const out: ThreadAttachment[] = [];
  keys.forEach((key, i) => {
    const entry = results[i];
    if (!entry) return;
    const [err, raw] = entry;
    if (err || !raw) return;
    const h = raw as Record<string, string>;
    if (h.linearIssue) {
      out.push({
        threadId: key.slice(ATTACHMENT_PREFIX.length),
        linearIssue: h.linearIssue,
        ts: h.ts || "",
      });
    }
  });
  return out;
}