import { ImapFlow } from "imapflow"; import { createTransport } from "nodemailer"; import { simpleParser, ParsedMail } from "mailparser"; import { marked } from "marked"; import { ClaudeBridge } from "../slack/bridge"; import { wrapHtml } from "./send"; import { AUDIO_TYPES, transcribeBuffer } from "../voice/transcribe"; import { synthesize } from "../voice/speak"; import * as fs from "fs"; import * as pathLib from "path"; const _log = console.log.bind(console); const _err = console.error.bind(console); const pdt = () => new Date().toLocaleString("en-US", { timeZone: "America/Los_Angeles", year: "numeric", month: "2-digit", day: "2-digit", hour: "2-digit", minute: "2-digit", second: "2-digit", hour12: false }); console.log = (...args: any[]) => _log(`[${pdt()}]`, ...args); console.error = (...args: any[]) => _err(`[${pdt()}]`, ...args); const pass = process.env.PURELYMAIL_PASS; if (!pass) { console.error("Missing PURELYMAIL_PASS"); process.exit(1); } const ACCOUNTS = [ "ace@manglasabang.com", "ace@palace.fund", "ace@palacering.com", ]; const ACCOUNT_SET = new Set(ACCOUNTS.map((a) => a.toLowerCase())); const JUNWON_EMAILS = new Set([ "junwon@manglasabang.com", "junwon@palace.fund", "junwon@palacering.com", ]); const FAMILY_DOMAINS = new Set(["palace.fund", "manglasabang.com", "palacering.com"]); const DOMAIN_FOLDER: Record = { "manglasabang.com": "manglasabang", "palace.fund": "palacefund", "palacering.com": "palacering", }; const FOLDERS = ["manglasabang", "palacefund", "palacering", "others"]; const smtpTransports = new Map( ACCOUNTS.map((user) => [ user, createTransport({ host: "mailserver.purelymail.com", port: 465, secure: true, auth: { user, pass }, }), ]) ); function extractAddress(from: ParsedMail["from"]): string { if (!from) return ""; const addr = from.value?.[0]?.address; return addr?.toLowerCase() || ""; } function extractBody(mail: ParsedMail): string { if (mail.text?.trim()) return mail.text.trim(); if (mail.html) return mail.html.replace(/<[^>]+>/g, " ").replace(/\s+/g, " ").trim(); return ""; } const IMAGE_TYPES = new Set(["image/jpeg", "image/png", "image/gif", "image/webp"]); const MIME_BY_EXT: Record = { ".png": "image/png", ".jpg": "image/jpeg", ".jpeg": "image/jpeg", ".gif": "image/gif", ".webp": "image/webp", ".pdf": "application/pdf", ".csv": "text/csv", ".txt": "text/plain", ".json": "application/json", ".html": "text/html", ".zip": "application/zip", }; interface FileAttachment { filename: string; content: Buffer; contentType: string; cid?: string; } const ATTACHABLE_EXTS = new Set(Object.keys(MIME_BY_EXT)); function extractAttachments( response: string ): { cleanText: string; attachments: FileAttachment[] } { const ATTACH_RE = /\[ATTACH:([^\]]+)\]/g; const attachments: FileAttachment[] = []; const attachedPaths = new Set(); const cleanText = response.replace(ATTACH_RE, (_, filePath: string) => { const resolved = pathLib.isAbsolute(filePath.trim()) ? filePath.trim() : pathLib.resolve(CWD, filePath.trim()); attachedPaths.add(resolved); try { const buf = fs.readFileSync(resolved); const ext = pathLib.extname(resolved).toLowerCase(); const contentType = MIME_BY_EXT[ext] || "application/octet-stream"; const filename = pathLib.basename(resolved); const isImage = contentType.startsWith("image/"); const cid = isImage ? `img-${attachments.length}@ace` : undefined; attachments.push({ filename, content: buf, contentType, cid }); console.log(`[email] attached: ${filename} (${buf.length} bytes, ${contentType})`); if (isImage) { return `\n![${filename}](cid:${cid})\n`; } return ""; } catch (err) { console.error(`[email] failed to attach ${resolved}:`, err); return `[attachment failed: ${pathLib.basename(resolved)}]`; } }); // Auto-detect file paths mentioned in response that weren't explicitly attached const PATH_RE = /(?:\/[\w.@-]+(?:\/[\w.@-]+)*\/[\w.@-]+\.\w+)/g; let match: RegExpExecArray | null; while ((match = PATH_RE.exec(response)) !== null) { const candidate = match[0]; if (attachedPaths.has(candidate)) continue; const ext = pathLib.extname(candidate).toLowerCase(); if (!ATTACHABLE_EXTS.has(ext)) continue; try { if (!fs.existsSync(candidate)) continue; const stat = fs.statSync(candidate); if (!stat.isFile() || stat.size > 25 * 1024 * 1024) continue; const buf = fs.readFileSync(candidate); const contentType = MIME_BY_EXT[ext] || "application/octet-stream"; const filename = pathLib.basename(candidate); const isImage = contentType.startsWith("image/"); const cid = isImage ? `img-${attachments.length}@ace` : undefined; attachments.push({ filename, content: buf, contentType, cid }); attachedPaths.add(candidate); console.log(`[email] auto-attached: ${filename} (${buf.length} bytes, ${contentType})`); } catch { // skip files we can't read } } return { cleanText, attachments }; } const CWD = pathLib.resolve(__dirname, "../.."); interface EmailRecord { uid: number; from: string; to: string; date: string; subject: string; body: string; messageId?: string; inReplyTo?: string; references?: string[]; } function normalizeSubject(subject: string): string { return (subject || "").replace(/^(Re:\s*|Fwd?:\s*)+/gi, "").trim(); } const messageIdToThreadId = new Map(); const THREAD_MAP_FILE = pathLib.resolve(__dirname, "thread-map.json"); function loadThreadMap(): void { try { if (fs.existsSync(THREAD_MAP_FILE)) { const data: Record = JSON.parse(fs.readFileSync(THREAD_MAP_FILE, "utf-8")); const cutoff = Date.now() - DEDUP_TTL_MS; for (const [msgId, entry] of Object.entries(data)) { if (entry.ts > cutoff) messageIdToThreadId.set(msgId, entry); } console.log(`[email] loaded ${messageIdToThreadId.size} thread mappings from disk`); } } catch {} } function saveThreadMap(): void { try { const obj: Record = {}; for (const [k, v] of messageIdToThreadId) obj[k] = v; fs.writeFileSync(THREAD_MAP_FILE, JSON.stringify(obj)); } catch {} } function getThreadId(mail: ParsedMail): string { const refs = mail.references; const refList: string[] = Array.isArray(refs) ? refs : typeof refs === "string" && refs.trim() ? refs.trim().split(/\s+/) : []; if (mail.inReplyTo) { const entry = messageIdToThreadId.get(mail.inReplyTo); if (entry) return entry.threadId; } for (const ref of refList) { const entry = messageIdToThreadId.get(ref); if (entry) return entry.threadId; } if (refList.length > 0) return refList[0]; const subject = normalizeSubject(mail.subject || ""); if (subject) return `subject:${subject}`; return mail.messageId || `thread-${Date.now()}-${Math.random()}`; } function registerMessageThread(mail: ParsedMail, threadId: string): void { if (mail.messageId) messageIdToThreadId.set(mail.messageId, { threadId, ts: Date.now() }); } function belongsToThread(email: EmailRecord, threadId: string): boolean { if (threadId.startsWith("subject:")) { return `subject:${normalizeSubject(email.subject)}` === threadId; } if (email.messageId === threadId) return true; if (email.inReplyTo === threadId) return true; if (email.references?.includes(threadId)) return true; return false; } async function fetchAllEmails(client: ImapFlow): Promise { const emails: EmailRecord[] = []; const since = new Date(); since.setDate(since.getDate() - 7); const uids = await client.search({ since }, { uid: true }); if (!uids || uids.length === 0) return emails; for (const uid of uids) { const msg = await client.fetchOne(String(uid), { source: true }, { uid: true }); if (msg?.source) { const mail = await simpleParser(msg.source); const from = extractAddress(mail.from); const to = mail.to; const toVal = (Array.isArray(to) ? to[0] : to)?.value?.[0]?.address || ""; const refs = mail.references; const refsArray = Array.isArray(refs) ? refs : typeof refs === "string" && refs.trim() ? refs.trim().split(/\s+/) : undefined; const record: EmailRecord = { uid, from, to: toVal, date: mail.date?.toISOString() || "", subject: mail.subject || "(no subject)", body: extractBody(mail), messageId: mail.messageId, inReplyTo: typeof mail.inReplyTo === "string" ? mail.inReplyTo : undefined, references: refsArray, }; emails.push(record); const tid = getThreadId(mail); registerMessageThread(mail, tid); } } return emails; } function formatEmailHistory(emails: EmailRecord[]): string { if (emails.length === 0) return "(no emails)"; return emails .map((e, i) => { const direction = ACCOUNT_SET.has(e.from) ? "SENT" : "RECEIVED"; return [ `--- [${i + 1}] ${direction} | ${e.date} ---`, `From: ${e.from}`, `To: ${e.to}`, `Subject: ${e.subject}`, "", e.body || "(empty)", ].join("\n"); }) .join("\n\n"); } const DEDUP_TTL_MS = 8 * 24 * 60 * 60 * 1000; const DEDUP_FILE = pathLib.join(__dirname, "dedup.json"); const processedMessageIds = new Map(); const processedFingerprints = new Map(); const retryCounts = new Map(); const MAX_RETRIES = 2; function saveDedupToDisk(): void { try { fs.writeFileSync(DEDUP_FILE, JSON.stringify({ messageIds: Array.from(processedMessageIds.entries()), fingerprints: Array.from(processedFingerprints.entries()), }), "utf8"); } catch (err) { console.error("[email] failed to save dedup:", err); } } function loadDedupFromDisk(): void { try { if (!fs.existsSync(DEDUP_FILE)) return; const data = JSON.parse(fs.readFileSync(DEDUP_FILE, "utf8")); const cutoff = Date.now() - DEDUP_TTL_MS; for (const [k, v] of data.messageIds || []) { if (v > cutoff) processedMessageIds.set(k, v); } for (const [k, v] of data.fingerprints || []) { if (v > cutoff) processedFingerprints.set(k, v); } console.log(`[email] loaded ${processedMessageIds.size} dedup entries from disk`); } catch (err) { console.error("[email] failed to load dedup:", err); } } function pruneDedup(): void { const cutoff = Date.now() - DEDUP_TTL_MS; for (const [key, ts] of processedMessageIds) { if (ts < cutoff) processedMessageIds.delete(key); } for (const [key, ts] of processedFingerprints) { if (ts < cutoff) processedFingerprints.delete(key); } for (const [key, entry] of messageIdToThreadId) { if (entry.ts < cutoff) messageIdToThreadId.delete(key); } saveThreadMap(); saveDedupToDisk(); } function emailFingerprint(from: string, subject: string, body: string): string { const normSubject = (subject || "").replace(/^(Re:\s*|Fwd?:\s*)+/gi, "").trim().toLowerCase(); const normBody = (body || "").trim().slice(0, 100).toLowerCase(); return `${from}|${normSubject}|${normBody}`; } const inboxSessions = new Map(); function emailDomain(account: string): string { if (account.includes("palace.fund")) return "palacefund"; if (account.includes("palacering.com")) return "palacelab"; return "junwonhome"; } async function initInboxSession(account: string, emails: EmailRecord[]): Promise { const existing = inboxSessions.get(account); if (existing) { existing.kill(); inboxSessions.delete(account); } const bridge = new ClaudeBridge(); bridge.model = "sonnet"; bridge.channel = "email"; bridge.domain = emailDomain(account); bridge.maxTurns = 1000; bridge.setStateFile(pathLib.join(__dirname, `../logs/sessions/.email-inbox-${account.replace(/[^a-z0-9]/gi, "-")}`)); const history = formatEmailHistory(emails); await bridge.send( `You are Ace (${account}). You work for Junwon. This is YOUR inbox — emails sent to ${account}. You have full visibility of your inbox for cross-inbox tasks: checking what's new, flagging urgent emails, detecting duplicates, summarizing inbox state, and answering questions from Junwon. You do NOT generate replies to individual email threads — that's handled by per-thread sessions. Here is the complete email history (${emails.length} emails): ${history} You are now ready. When notified of new emails, acknowledge them briefly internally. When Junwon asks inbox questions, answer directly and concisely.` ); inboxSessions.set(account, bridge); console.log(`[email:${account}] inbox session initialized (${emails.length} emails)`); } const threadBridges = new Map(); const IDLE_TIMEOUT_MS = 15 * 60 * 1000; const threadIdleTimers = new Map>(); function threadKey(account: string, threadId: string): string { return `${account}:${threadId}`; } function resetThreadIdleTimer(key: string): void { const existing = threadIdleTimers.get(key); if (existing) clearTimeout(existing); threadIdleTimers.set(key, setTimeout(() => { const bridge = threadBridges.get(key); if (bridge) { console.log(`[email] idle timeout: cleaning up thread ${key}`); bridge.kill(); threadBridges.delete(key); } threadIdleTimers.delete(key); }, IDLE_TIMEOUT_MS)); } const allEmailsByAccount = new Map(); function getThreadEmails(threadId: string): EmailRecord[] { const all: EmailRecord[] = []; for (const emails of allEmailsByAccount.values()) { all.push(...emails.filter((e) => belongsToThread(e, threadId))); } return all; } async function sendToThread(account: string, threadId: string, content: Array>): Promise { const key = threadKey(account, threadId); let bridge = threadBridges.get(key); const isNewSession = !bridge; if (!bridge) { bridge = new ClaudeBridge(); bridge.model = "sonnet"; bridge.channel = "email"; bridge.domain = emailDomain(account); bridge.maxTurns = 1000; bridge.setStateFile(pathLib.join(__dirname, `../logs/sessions/.email-thread-${key.replace(/[^a-z0-9]/gi, "-")}`)); } const existingTimer = threadIdleTimers.get(key); if (existingTimer) { clearTimeout(existingTimer); threadIdleTimers.delete(key); } try { if (isNewSession) { const threadEmails = getThreadEmails(threadId); const historySection = threadEmails.length > 0 ? `Here is the thread history (${threadEmails.length} email(s)):\n\n${formatEmailHistory(threadEmails)}\n\n---\n\n` : `This is a new thread — no prior history.\n\n`; await bridge.send( `You are Ace (${account}), Junwon's personal AI assistant. You are handling one specific email thread. ${historySection}You are now ready to handle incoming emails in this thread. Respond as Ace — direct, helpful, no fluff. Reply with just the email body text (no subject line, no headers). IMPORTANT — File attachments: If you create, reference, or mention ANY files in your response, you MUST include [ATTACH:/absolute/path/to/file] markers for each file. Images will be embedded inline; other files will be sent as regular attachments. You can include multiple [ATTACH:...] markers. Always attach deliverables — the recipient cannot see files on disk, only what you attach to the email.` ); console.log(`[email:${account}] thread session created: ${threadId}`); } else { console.log(`[email:${account}] resuming thread session: ${threadId}`); } const response = await bridge.sendContent(content); threadBridges.set(key, bridge); resetThreadIdleTimer(key); return response; } catch (err) { bridge.kill(); threadBridges.delete(key); throw err; } } async function handleMail(account: string, mail: ParsedMail): Promise { const from = extractAddress(mail.from); const body = extractBody(mail); const images = (mail.attachments || []).filter((a) => IMAGE_TYPES.has(a.contentType)); const audios = (mail.attachments || []).filter((a) => AUDIO_TYPES.has(a.contentType)); const fromDomain = from.split("@")[1] || ""; const destFolder = DOMAIN_FOLDER[fromDomain] ?? "others"; if (ACCOUNT_SET.has(from)) { console.log(`[email:${account}] self-sent mail, skipping`); return destFolder; } const msgId = mail.messageId; if (msgId) { if (processedMessageIds.has(msgId)) { console.log(`[email:${account}] duplicate message-id ${msgId}, skipping`); return destFolder; } } if (!body && images.length === 0 && audios.length === 0) { console.log(`[email:${account}] empty mail from ${from} subj="${mail.subject}" text=${(mail.text||"").length} html=${(mail.html||"").length}, skipping`); return destFolder; } const subject = mail.subject || "(no subject)"; const fingerprint = emailFingerprint(from, subject, body); if (processedFingerprints.has(fingerprint)) { console.log(`[email:${account}] duplicate fingerprint (cross-account dedup), skipping: ${subject}`); return destFolder; } const threadId = getThreadId(mail); registerMessageThread(mail, threadId); console.log(`[email:${account}] from ${from}: ${subject} | thread: ${threadId}`); const inboxSession = inboxSessions.get(account); if (inboxSession) { inboxSession .send(`[inbox update] New email — From: ${from}, Subject: ${subject}`) .catch((err) => console.error(`[email:${account}] inbox notification failed:`, err)); } try { const content: Array> = []; for (const img of images) { content.push({ type: "image", source: { type: "base64", media_type: img.contentType, data: img.content.toString("base64"), }, }); } const transcripts: string[] = []; for (const audio of audios) { console.log(`[email:${account}] transcribing audio: ${audio.filename || "audio"} (${audio.contentType})`); const transcript = await transcribeBuffer( audio.content, audio.filename || "audio.mp3", audio.contentType ); console.log(`[email:${account}] transcript: ${transcript.slice(0, 100)}...`); transcripts.push(`[Voice message: "${transcript}"]`); } const bodyParts = [body, ...transcripts].filter(Boolean).join("\n\n"); const isJunwon = JUNWON_EMAILS.has(from); const fromDomainFamily = FAMILY_DOMAINS.has(fromDomain); const alwaysReply = isJunwon || fromDomainFamily; const prompt = alwaysReply ? [ isJunwon ? `Email from Junwon (your boss). Always reply.` : `Email from ${from} (family/palace member). Always reply.`, `From: ${from}`, `Subject: ${subject}`, ``, bodyParts, ].join("\n") : [ `New email — respond as Ace, or start your reply with [NO_REPLY] (and nothing else) if this email doesn't warrant a response (automated notifications, newsletters, bounces, etc.):`, `From: ${from}`, `Subject: ${subject}`, ``, bodyParts, ].join("\n"); content.push({ type: "text", text: prompt }); let response: string | undefined; try { response = await sendToThread(account, threadId, content); } catch (err) { const hasImages = content.some((c) => c.type === "image"); if (hasImages) { console.error(`[email:${account}] API failed with images, retrying text-only:`, err); const textOnly = content.filter((c) => c.type !== "image"); textOnly[textOnly.length - 1] = { type: "text", text: `[Note: This email had ${images.length} image attachment(s) that could not be processed by the API.]\n\n${textOnly[textOnly.length - 1].text}`, }; try { response = await sendToThread(account, threadId, textOnly); } catch (retryErr) { console.error(`[email:${account}] text-only retry also failed:`, retryErr); } } else { console.error(`[email:${account}] thread session failed:`, err); } } if (!response) { const retryKey = msgId || fingerprint; const attempts = (retryCounts.get(retryKey) || 0) + 1; retryCounts.set(retryKey, attempts); if (attempts <= MAX_RETRIES) { console.error(`[email:${account}] thread session failed for "${subject}", will retry (attempt ${attempts}/${MAX_RETRIES})`); throw new Error(`retry-eligible: attempt ${attempts}`); } const errorReply = `I received your email but hit an error processing it. Junwon — if you're seeing this, the email thread session for "${subject}" failed after ${attempts} attempts. Check email daemon logs for details.`; const smtp = smtpTransports.get(account)!; await smtp.sendMail({ from: `Ace <${account}>`, to: from, subject: subject.startsWith("Re:") ? subject : `Re: ${subject}`, text: errorReply, inReplyTo: mail.messageId, references: mail.messageId ? mail.messageId : undefined, }); console.error(`[email:${account}] sent error notification to ${from} for: ${subject} (after ${attempts} attempts)`); if (msgId) processedMessageIds.set(msgId, Date.now()); processedFingerprints.set(fingerprint, Date.now()); saveDedupToDisk(); return destFolder; } if (msgId) processedMessageIds.set(msgId, Date.now()); processedFingerprints.set(fingerprint, Date.now()); saveDedupToDisk(); if (msgId) retryCounts.delete(msgId); retryCounts.delete(fingerprint); if (!alwaysReply && response.trimStart().startsWith("[NO_REPLY]")) { console.log(`[email:${account}] Ace chose not to reply to ${from}: ${subject}`); return destFolder; } const { cleanText: replyText, attachments: fileAttachments } = extractAttachments(response); const replySubject = subject.startsWith("Re:") ? subject : `Re: ${subject}`; const replyBodyHtml = await marked(replyText, { breaks: true }); const replyHtml = wrapHtml(replyBodyHtml, undefined, "Auto-reply"); const replyPlainText = "[Auto-reply]\n\n" + replyBodyHtml .replace(/<[^>]+>/g, "") .replace(/&/g, "&") .replace(/</g, "<") .replace(/>/g, ">") .replace(/"/g, '"') .replace(/'/g, "'") .replace(/\n{3,}/g, "\n\n") .trim(); let voiceBuffer: Buffer | null = null; if (audios.length > 0) { try { voiceBuffer = await synthesize(replyPlainText); console.log(`[email:${account}] synthesized voice reply (${voiceBuffer.length} bytes)`); } catch (err) { console.error(`[email:${account}] TTS failed, sending text only:`, err); } } const allAttachments: Array<{ filename: string; content: Buffer; contentType: string; cid?: string }> = []; if (voiceBuffer) { allAttachments.push({ filename: "reply.mp3", content: voiceBuffer, contentType: "audio/mpeg" }); } allAttachments.push(...fileAttachments); const smtp = smtpTransports.get(account)!; const sentInfo = await smtp.sendMail({ from: `Ace <${account}>`, to: from, subject: replySubject, text: replyPlainText, html: replyHtml, inReplyTo: mail.messageId, references: mail.messageId ? [ ...(Array.isArray(mail.references) ? mail.references : mail.references ? [mail.references] : []), mail.messageId, ].join(" ") : undefined, attachments: allAttachments.length > 0 ? allAttachments : undefined, }); if (sentInfo.messageId) { const sentId = sentInfo.messageId.startsWith("<") ? sentInfo.messageId : `<${sentInfo.messageId}>`; messageIdToThreadId.set(sentId, { threadId, ts: Date.now() }); } const acctEmails = allEmailsByAccount.get(account) || []; acctEmails.push({ uid: 0, from: account, to: from, date: new Date().toISOString(), subject: replySubject, body: replyPlainText, messageId: sentInfo.messageId, inReplyTo: mail.messageId, }); allEmailsByAccount.set(account, acctEmails); console.log(`[email:${account}] replied to ${from}`); } catch (err: any) { if (err?.message?.startsWith("retry-eligible")) throw err; console.error(`[email:${account}] error:`, err); } return destFolder; } async function ensureFolders(client: ImapFlow): Promise { const existing = await client.list(); const existingNames = new Set(existing.map((m) => m.name)); for (const folder of FOLDERS) { if (!existingNames.has(folder)) { try { await client.mailboxCreate(folder); console.log(`[email] created folder: ${folder}`); } catch (err) { console.error(`[email] failed to create folder ${folder}:`, err); } } } } const claimedUidsByAccount = new Map>(); type ThreadItem = { uid: number; mail: ParsedMail }; const threadQueues = new Map(); const threadProcessing = new Set(); function enqueueThread( account: string, threadId: string, items: ThreadItem[], claimedUids: Set, markSeenAndMove: (uid: number, folder: string) => Promise ): void { const key = threadKey(account, threadId); const queue = threadQueues.get(key) || []; queue.push(...items); threadQueues.set(key, queue); if (threadProcessing.has(key)) return; threadProcessing.add(key); (async () => { while (true) { const q = threadQueues.get(key); if (!q || q.length === 0) break; const item = q.shift()!; try { const folder = await handleMail(account, item.mail); await markSeenAndMove(item.uid, folder); claimedUids.delete(item.uid); } catch (err) { console.error(`[email:${account}] failed uid ${item.uid}:`, err); claimedUids.delete(item.uid); } } threadQueues.delete(key); threadProcessing.delete(key); })(); } async function connect(account: string) { if (!claimedUidsByAccount.has(account)) claimedUidsByAccount.set(account, new Set()); const claimedUids = claimedUidsByAccount.get(account)!; const client = new ImapFlow({ host: "mailserver.purelymail.com", port: 993, secure: true, auth: { user: account, pass: pass! }, maxIdleTime: 4 * 60 * 1000, logger: false, }); let fetching = false; let lastActivity = Date.now(); async function markSeenAndMove(uid: number, folder: string): Promise { const lock = await client.getMailboxLock("INBOX"); try { await client.messageFlagsAdd(String(uid), ["\\Seen"], { uid: true }); await client.messageMove(String(uid), folder, { uid: true }); } finally { lock.release(); } } async function processUnseen() { if (fetching) { console.log(`[email:${account}] fetch already running, skipping`); return; } fetching = true; try { const pending: Array<{ uid: number; mail: ParsedMail }> = []; { const lock = await client.getMailboxLock("INBOX"); lastActivity = Date.now(); try { const uids = (await client.search({ unseen: true }, { uid: true })) || []; const newUids = uids.filter((uid) => !claimedUids.has(uid)); if (newUids.length === 0) return; console.log(`[email:${account}] ${newUids.length} unprocessed message(s)`); for (const uid of newUids) claimedUids.add(uid); for (const uid of newUids) { try { const msg = await client.fetchOne(String(uid), { source: true }, { uid: true }); if (msg?.source) { const mail = await simpleParser(msg.source); pending.push({ uid, mail }); const fromAddr = extractAddress(mail.from); const toAddr = (Array.isArray(mail.to) ? mail.to[0] : mail.to)?.value?.[0]?.address || ""; const refs = mail.references; const refsArr = Array.isArray(refs) ? refs : typeof refs === "string" && refs.trim() ? refs.trim().split(/\s+/) : undefined; const acctEmails = allEmailsByAccount.get(account) || []; acctEmails.push({ uid, from: fromAddr, to: toAddr, date: mail.date?.toISOString() || "", subject: mail.subject || "(no subject)", body: extractBody(mail), messageId: mail.messageId, inReplyTo: typeof mail.inReplyTo === "string" ? mail.inReplyTo : undefined, references: refsArr, }); allEmailsByAccount.set(account, acctEmails); } else { claimedUids.delete(uid); } } catch (fetchErr) { console.error(`[email:${account}] fetchOne failed uid ${uid}:`, fetchErr); claimedUids.delete(uid); } } } finally { lock.release(); } } const byThread = new Map(); for (const item of pending) { const tid = getThreadId(item.mail); if (!byThread.has(tid)) byThread.set(tid, []); byThread.get(tid)!.push(item); } for (const items of byThread.values()) { items.sort((a, b) => (a.mail.date?.getTime() ?? 0) - (b.mail.date?.getTime() ?? 0)); } for (const [threadId, threadItems] of byThread) { enqueueThread(account, threadId, threadItems, claimedUids, markSeenAndMove); } } finally { fetching = false; } } await client.connect(); lastActivity = Date.now(); console.log(`[ace] Email connected: ${account}`); await ensureFolders(client); { const lock = await client.getMailboxLock("INBOX"); try { console.log(`[email:${account}] Fetching email history...`); const emails = await fetchAllEmails(client); allEmailsByAccount.set(account, emails); console.log(`[email:${account}] Loaded ${emails.length} emails`); initInboxSession(account, emails).catch((err) => console.error(`[email:${account}] inbox session init failed:`, err) ); } finally { lock.release(); } } console.log(`[ace] Ready: ${account}`); await processUnseen(); client.on("error", (err: Error) => { console.error(`[email:${account}] IMAP error:`, err.message); console.log(`[email:${account}] forcing close to trigger reconnect`); try { client.close(); } catch {} }); client.on("exists", async ({ count, prevCount }: { count: number; prevCount: number }) => { if (count > prevCount) { console.log(`[email:${account}] IDLE: ${count - prevCount} new message(s)`); try { await processUnseen(); } catch (err) { console.error(`[email:${account}] processUnseen error (exists event):`, err); } } }); const poller = setInterval(async () => { try { await processUnseen(); } catch (err) { console.error(`[email:${account}] processUnseen error (poller):`, err); } }, 60_000); const watchdog = setInterval(() => { const silentMs = Date.now() - lastActivity; if (silentMs > 3 * 60 * 1000) { console.error(`[email:${account}] watchdog: no activity for ${Math.round(silentMs / 60000)}min, forcing reconnect`); clearInterval(watchdog); try { client.close(); } catch {} } }, 2 * 60 * 1000); client.on("close", () => { clearInterval(poller); clearInterval(watchdog); if (reconnectScheduled.has(account)) { console.log(`[email:${account}] reconnect already scheduled, skipping duplicate`); return; } reconnectScheduled.add(account); console.log(`[email:${account}] connection closed, reconnecting in 10s...`); setTimeout(() => { reconnectScheduled.delete(account); connect(account).catch((err: any) => { console.error(`[email:${account}] reconnect failed:`, err?.message); reconnectScheduled.add(account); setTimeout(() => { reconnectScheduled.delete(account); connect(account).catch((err2: any) => console.error(`[email:${account}] reconnect retry failed:`, err2?.message) ); }, 30_000); }); }, 10_000); }); } const reconnectScheduled = new Set(); async function run() { loadDedupFromDisk(); loadThreadMap(); setInterval(pruneDedup, 5 * 60 * 1000); await Promise.all(ACCOUNTS.map((account) => connect(account))); } run().catch((err) => { console.error("[email] fatal:", err); process.exit(1); });