""" 邮件接码处理器 - 用于接收和解析 OpenAI 发送的验证码邮件 """ from typing import Optional, Dict, Any, List import re import time import asyncio from utils.logger import logger class MailHandler: """邮件接码处理器基类""" OTP_SUBJECT_KEYWORDS = ["openai", "verification", "verify", "code"] OTP_SENDER = "noreply@tm.openai.com" def __init__(self, config: Optional[Dict[str, Any]] = None): self.config = config or {} self.mail_type = self.config.get("type", "not_configured") if not config: logger.warning("MailHandler initialized without configuration.") else: logger.info(f"MailHandler initialized with type: {self.mail_type}") @staticmethod def create(config: Optional[Dict[str, Any]]) -> "MailHandler": """工厂方法:根据配置创建对应的邮件处理器""" if not config: return MailHandler(config) mail_type = config.get("type", "manual") if mail_type == "imap": return IMAPMailHandler(config) elif mail_type == "cloudmail": return CloudMailHandler(config) else: return MailHandler(config) async def wait_for_otp(self, email: str, timeout: int = 300, check_interval: int = 5) -> str: """等待并提取 OTP 验证码(需子类实现)""" logger.info(f"Waiting for OTP for {email} (timeout: {timeout}s)") raise NotImplementedError("Mail handler not configured.") def _extract_otp(self, text: str) -> Optional[str]: """从邮件正文中提取 6 位 OTP 验证码""" text = re.sub(r'<[^>]+>', ' ', text) patterns = [ r'verification code is[:\s]+(\d{6})', r'code[:\s]+(\d{6})', r'enter[:\s]+(\d{6})', r'(\d{6})', ] for pattern in patterns: match = re.search(pattern, text, re.IGNORECASE) if match: otp = match.group(1) logger.info(f"OTP extracted: {otp}") return otp logger.warning("Failed to extract OTP from email text") return None def verify_email_deliverability(self, email: str) -> bool: """验证邮箱地址格式是否有效""" email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' if not re.match(email_pattern, email): logger.warning(f"Invalid email format: {email}") return False return True class IMAPMailHandler(MailHandler): """基于 IMAP 的邮件处理器""" async def wait_for_otp(self, email: str, timeout: int = 300, check_interval: int = 5) -> str: """使用 IMAP 等待 OTP 邮件""" try: from imap_tools import MailBox, AND except ImportError: raise ImportError("imap-tools not installed. Install with: pip install imap-tools") if not all(k in self.config for k in ["host", "username", "password"]): raise ValueError("IMAP configuration incomplete. Required: host, username, password") start_time = time.time() logger.info(f"Connecting to IMAP server: {self.config['host']}") while time.time() - start_time < timeout: try: with MailBox(self.config["host"]).login( self.config["username"], self.config["password"] ) as mailbox: for msg in mailbox.fetch( AND(from_=self.OTP_SENDER, seen=False), reverse=True, limit=10 ): if any(kw in msg.subject.lower() for kw in self.OTP_SUBJECT_KEYWORDS): otp = self._extract_otp(msg.text or msg.html) if otp: logger.success(f"OTP received: {otp}") mailbox.flag([msg.uid], ['\\Seen'], True) return otp except Exception as e: logger.warning(f"IMAP check failed: {e}") logger.debug(f"No OTP found, waiting {check_interval}s") time.sleep(check_interval) raise TimeoutError(f"Timeout waiting for OTP email ({timeout}s)") class CloudMailHandler(MailHandler): """Cloud Mail API 邮件处理器""" def __init__(self, config: Optional[Dict[str, Any]] = None): super().__init__(config) required = ["api_base_url", "token"] missing = [key for key in required if not self.config.get(key)] if missing: raise ValueError(f"CloudMail config incomplete. Missing: {', '.join(missing)}") self.api_base_url = self.config["api_base_url"].rstrip("/") self.token = self.config["token"] self.target_email = self.config.get("target_email") self.domain = self.config.get("domain") self._client: Optional[Any] = None logger.info(f"CloudMailHandler initialized (API: {self.api_base_url})") async def _get_client(self): """懒加载 HTTP 客户端""" if self._client is None: try: import httpx except ImportError: raise ImportError("httpx not installed. Install with: pip install httpx") self._client = httpx.AsyncClient( timeout=30.0, headers={ "Content-Type": "application/json", "Authorization": self.token } ) return self._client async def _query_emails( self, to_email: str, send_email: Optional[str] = None, subject: Optional[str] = None, time_sort: str = "desc", num: int = 1, size: int = 20 ) -> List[Dict[str, Any]]: """查询邮件列表 (POST /api/public/emailList)""" client = await self._get_client() url = f"{self.api_base_url}/api/public/emailList" payload = { "toEmail": to_email, "type": 0, "isDel": 0, "timeSort": time_sort, "num": num, "size": size } if send_email: payload["sendEmail"] = send_email if subject: payload["subject"] = subject try: resp = await client.post(url, json=payload) if resp.status_code in [401, 403]: raise RuntimeError("CloudMail token expired or invalid.") if resp.status_code != 200: raise RuntimeError(f"CloudMail API error: {resp.status_code}") data = resp.json() if data.get("code") != 200: raise RuntimeError(f"CloudMail API error: {data.get('message')}") result = data.get("data", {}) if isinstance(result, list): emails = result elif isinstance(result, dict): emails = result.get("list", []) else: emails = [] logger.debug(f"CloudMail: Fetched {len(emails)} emails") return emails except Exception as e: if "httpx" in str(type(e).__module__): raise RuntimeError(f"CloudMail API network error: {e}") raise async def ensure_email_exists(self, email: str) -> bool: """确保邮箱账户存在(如果不存在则创建)""" logger.info(f"CloudMail: Creating email account {email}...") try: result = await self.add_users([{"email": email}]) logger.success(f"CloudMail: Email {email} created successfully") logger.debug(f"CloudMail: API response: {result}") return True except RuntimeError as e: error_msg = str(e).lower() if "already" in error_msg or "exist" in error_msg or "duplicate" in error_msg: logger.debug(f"CloudMail: Email {email} already exists") return True logger.error(f"CloudMail: Failed to create email {email}: {e}") raise except Exception as e: logger.error(f"CloudMail: Failed to create email {email}: {e}") raise async def wait_for_otp(self, email: str, timeout: int = 300, check_interval: int = 5) -> str: """等待 OTP 邮件(轮询实现)""" start_time = time.time() logger.info(f"CloudMail: Waiting for OTP for {email} (timeout: {timeout}s)") while time.time() - start_time < timeout: try: # 模糊匹配 openai 发件人 emails = await self._query_emails( to_email=email, send_email="%openai%", time_sort="desc", size=10 ) # 备选:不过滤发件人 if not emails: emails = await self._query_emails( to_email=email, time_sort="desc", size=10 ) for msg in emails: subject = msg.get("subject", "").lower() sender = msg.get("sendEmail", "").lower() is_from_openai = "openai" in sender or "noreply" in sender if any(kw in subject for kw in self.OTP_SUBJECT_KEYWORDS) or is_from_openai: content = msg.get("text") or msg.get("content") or "" otp = self._extract_otp(content) if otp: logger.success(f"CloudMail: OTP received: {otp} (from: {sender})") return otp remaining = timeout - (time.time() - start_time) logger.debug(f"CloudMail: No OTP found, waiting {check_interval}s (remaining: {int(remaining)}s)") await asyncio.sleep(check_interval) except Exception as e: logger.warning(f"CloudMail: Query error: {e}") await asyncio.sleep(check_interval) raise TimeoutError(f"Timeout waiting for OTP email ({timeout}s)") async def add_users(self, users: List[Dict[str, str]]) -> Dict[str, Any]: """添加用户 (POST /api/public/addUser)""" client = await self._get_client() url = f"{self.api_base_url}/api/public/addUser" payload = {"list": users} try: resp = await client.post(url, json=payload) if resp.status_code in [401, 403]: raise RuntimeError("CloudMail token expired or invalid.") if resp.status_code != 200: raise RuntimeError(f"CloudMail addUser error: {resp.status_code}") data = resp.json() if data.get("code") != 200: raise RuntimeError(f"CloudMail addUser error: {data.get('message')}") logger.info(f"CloudMail: Users added: {len(users)}") return data except Exception as e: if "httpx" in str(type(e).__module__): raise RuntimeError(f"CloudMail API network error: {e}") raise async def close(self): """清理资源""" if self._client: await self._client.aclose() logger.debug("CloudMail: HTTP client closed") __all__ = ["MailHandler", "IMAPMailHandler", "CloudMailHandler"]