""" 邮件接码处理器 用于接收和解析 OpenAI 发送的验证码邮件 ⚠️ 本模块提供预留接口,用户需要根据实际情况配置邮箱服务 支持的邮箱方案: 1. IMAP 收件 (Gmail, Outlook, 自建邮箱) 2. 临时邮箱 API (TempMail, Guerrilla Mail, etc.) 3. 邮件转发服务 """ from typing import Optional, Dict, Any, List import re import time import asyncio from utils.logger import logger class MailHandler: """ 邮件接码处理器 ⚠️ 预留接口 - 用户需要配置实际的邮箱服务 使用场景: - 接收 OpenAI 发送的 6 位数字 OTP 验证码 - 解析邮件内容提取验证码 - 支持超时和重试机制 """ # OTP 邮件特征 OTP_SUBJECT_KEYWORDS = ["openai", "verification", "verify", "code"] OTP_SENDER = "noreply@tm.openai.com" # OpenAI 发件人地址 def __init__(self, config: Optional[Dict[str, Any]] = None): """ 初始化邮件处理器 参数: config: 邮箱配置字典,可能包含: - type: "imap" | "tempmail" | "api" | "cloudmail" - host: IMAP 服务器地址 (如果使用 IMAP) - port: IMAP 端口 (默认 993) - username: 邮箱用户名 - password: 邮箱密码 - api_key: 临时邮箱 API Key (如果使用 API) """ self.config = config or {} self.mail_type = self.config.get("type", "not_configured") if not config: logger.warning( "MailHandler initialized without configuration. " "OTP retrieval will fail until configured." ) else: logger.info(f"MailHandler initialized with type: {self.mail_type}") @staticmethod def create(config: Optional[Dict[str, Any]]) -> "MailHandler": """ 工厂方法:创建合适的邮件处理器 根据配置中的 type 字段自动选择正确的 handler 实现 参数: config: 邮箱配置字典 返回: MailHandler 实例(IMAPMailHandler 或 CloudMailHandler) """ if not config: return MailHandler(config) mail_type = config.get("type", "manual") if mail_type == "imap": return IMAPMailHandler(config) elif mail_type == "cloudmail": return CloudMailHandler(config) else: # 默认处理器(会抛出 NotImplementedError) return MailHandler(config) async def wait_for_otp( self, email: str, timeout: int = 300, check_interval: int = 5 ) -> str: """ 等待并提取 OTP 验证码 ⚠️ 预留接口 - 用户需要实现此方法 参数: email: 注册邮箱地址 timeout: 超时时间(秒),默认 300 秒(5 分钟) check_interval: 检查间隔(秒),默认 5 秒 返回: 6 位数字验证码(例如 "123456") 抛出: NotImplementedError: 用户需要实现此方法 TimeoutError: 超时未收到邮件 ValueError: 邮件格式错误,无法提取 OTP 集成示例: ```python # 方案 1: 使用 IMAP (imap-tools 库) from imap_tools import MailBox with MailBox(self.config["host"]).login( self.config["username"], self.config["password"] ) as mailbox: for msg in mailbox.fetch(AND(from_=self.OTP_SENDER, seen=False)): otp = self._extract_otp(msg.text) if otp: return otp # 方案 2: 使用临时邮箱 API import httpx async with httpx.AsyncClient() as client: resp = await client.get( f"https://tempmail.api/messages?email={email}", headers={"Authorization": f"Bearer {self.config['api_key']}"} ) messages = resp.json() for msg in messages: otp = self._extract_otp(msg["body"]) if otp: return otp # 方案 3: 手动输入(调试用) print(f"Please enter OTP for {email}:") return input().strip() ``` """ logger.info( f"Waiting for OTP for {email} " f"(timeout: {timeout}s, check_interval: {check_interval}s)" ) raise NotImplementedError( "❌ Mail handler not configured.\n\n" "User needs to configure email service for OTP retrieval.\n\n" "Configuration options:\n\n" "1. IMAP (Gmail, Outlook, custom):\n" " config = {\n" " 'type': 'imap',\n" " 'host': 'imap.gmail.com',\n" " 'port': 993,\n" " 'username': 'your@email.com',\n" " 'password': 'app_password'\n" " }\n\n" "2. Temporary email API:\n" " config = {\n" " 'type': 'tempmail',\n" " 'api_key': 'YOUR_API_KEY',\n" " 'api_endpoint': 'https://api.tempmail.com'\n" " }\n\n" "3. Manual input (for debugging):\n" " config = {'type': 'manual'}\n\n" "Example implementation location: utils/mail_box.py -> wait_for_otp()" ) def _extract_otp(self, text: str) -> Optional[str]: """ 从邮件正文中提取 OTP 验证码 OpenAI 邮件格式示例: "Your OpenAI verification code is: 123456" "Enter this code: 123456" 参数: text: 邮件正文(纯文本或 HTML) 返回: 6 位数字验证码,未找到则返回 None """ # 清理 HTML 标签(如果有) text = re.sub(r'<[^>]+>', ' ', text) # 常见的 OTP 模式 patterns = [ r'verification code is[:\s]+(\d{6})', # "verification code is: 123456" r'code[:\s]+(\d{6})', # "code: 123456" r'enter[:\s]+(\d{6})', # "enter: 123456" r'(\d{6})', # 任意 6 位数字(最后尝试) ] for pattern in patterns: match = re.search(pattern, text, re.IGNORECASE) if match: otp = match.group(1) logger.info(f"OTP extracted: {otp}") return otp logger.warning("Failed to extract OTP from email text") return None def _check_imap(self, email: str, timeout: int) -> Optional[str]: """ 使用 IMAP 检查邮件(预留方法) 参数: email: 注册邮箱 timeout: 超时时间(秒) 返回: OTP 验证码,未找到则返回 None """ # TODO: 用户实现 IMAP 检查逻辑 # 需要安装: pip install imap-tools raise NotImplementedError("IMAP checking not implemented") def _check_tempmail_api(self, email: str, timeout: int) -> Optional[str]: """ 使用临时邮箱 API 检查邮件(预留方法) 参数: email: 注册邮箱 timeout: 超时时间(秒) 返回: OTP 验证码,未找到则返回 None """ # TODO: 用户实现临时邮箱 API 调用逻辑 raise NotImplementedError("Temp mail API not implemented") def generate_temp_email(self) -> str: """ 生成临时邮箱地址(可选功能) ⚠️ 预留接口 - 如果使用临时邮箱服务,需要实现此方法 返回: 临时邮箱地址(例如 "random123@tempmail.com") 抛出: NotImplementedError: 用户需要实现此方法 """ raise NotImplementedError( "Temp email generation not implemented. " "Integrate a temp mail service API if needed." ) def verify_email_deliverability(self, email: str) -> bool: """ 验证邮箱地址是否可以接收邮件(可选功能) 参数: email: 邮箱地址 返回: True 如果邮箱有效且可接收邮件,否则 False """ # 基本格式验证 email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' if not re.match(email_pattern, email): logger.warning(f"Invalid email format: {email}") return False # TODO: 用户可以添加更严格的验证逻辑 # 例如:DNS MX 记录查询、SMTP 验证等 logger.info(f"Email format valid: {email}") return True class IMAPMailHandler(MailHandler): """ 基于 IMAP 的邮件处理器(完整实现示例) ⚠️ 这是一个参考实现,用户可以根据需要修改 依赖: pip install imap-tools """ async def wait_for_otp( self, email: str, timeout: int = 300, check_interval: int = 5 ) -> str: """ 使用 IMAP 等待 OTP 邮件 参数: email: 注册邮箱 timeout: 超时时间(秒) check_interval: 检查间隔(秒) 返回: 6 位数字验证码 抛出: ImportError: 未安装 imap-tools TimeoutError: 超时未收到邮件 ValueError: 配置错误或邮件格式错误 """ try: from imap_tools import MailBox, AND except ImportError: raise ImportError( "imap-tools not installed. Install with: pip install imap-tools" ) if not all(k in self.config for k in ["host", "username", "password"]): raise ValueError( "IMAP configuration incomplete. Required: host, username, password" ) start_time = time.time() logger.info(f"Connecting to IMAP server: {self.config['host']}") while time.time() - start_time < timeout: try: with MailBox(self.config["host"]).login( self.config["username"], self.config["password"] ) as mailbox: # 查找未读邮件,来自 OpenAI for msg in mailbox.fetch( AND(from_=self.OTP_SENDER, seen=False), reverse=True, # 最新的邮件优先 limit=10 ): # 检查主题是否包含 OTP 关键词 if any(kw in msg.subject.lower() for kw in self.OTP_SUBJECT_KEYWORDS): otp = self._extract_otp(msg.text or msg.html) if otp: logger.success(f"OTP received: {otp}") # 标记为已读 mailbox.flag([msg.uid], ['\\Seen'], True) return otp except Exception as e: logger.warning(f"IMAP check failed: {e}") # 等待下一次检查 elapsed = time.time() - start_time remaining = timeout - elapsed logger.debug( f"No OTP found, waiting {check_interval}s " f"(remaining: {int(remaining)}s)" ) time.sleep(check_interval) raise TimeoutError( f"Timeout waiting for OTP email (timeout: {timeout}s). " f"Email: {email}, Sender: {self.OTP_SENDER}" ) class CloudMailHandler(MailHandler): """ Cloud Mail API handler with external token management 使用外部预生成的 Token 管理邮件,不调用 genToken API 依赖: pip install httpx 配置示例: config = { "type": "cloudmail", "api_base_url": "https://your-cloudmail-domain.com", "token": "9f4e298e-7431-4c76-bc15-4931c3a73984", "target_email": "user@example.com" # 可选 } """ def __init__(self, config: Optional[Dict[str, Any]] = None): """ 初始化 Cloud Mail handler 参数: config: 配置字典,必填项: - api_base_url: Cloud Mail API 基础 URL - token: 预生成的身份令牌 - target_email: (可选) 指定监控的邮箱地址 """ super().__init__(config) # 验证必填配置 required = ["api_base_url", "token"] missing = [key for key in required if not self.config.get(key)] if missing: raise ValueError( f"CloudMail configuration incomplete. Missing: {', '.join(missing)}\n" f"Required: api_base_url, token" ) self.api_base_url = self.config["api_base_url"].rstrip("/") self.token = self.config["token"] self.target_email = self.config.get("target_email") self.domain = self.config.get("domain") # 邮箱域名 self._client: Optional[Any] = None logger.info(f"CloudMailHandler initialized (API: {self.api_base_url}, Domain: {self.domain or 'N/A'})") async def _get_client(self): """懒加载 HTTP 客户端""" if self._client is None: try: import httpx except ImportError: raise ImportError( "httpx not installed. Install with: pip install httpx" ) self._client = httpx.AsyncClient( timeout=30.0, headers={ "Content-Type": "application/json", "Authorization": self.token } ) return self._client async def _query_emails( self, to_email: str, send_email: Optional[str] = None, subject: Optional[str] = None, time_sort: str = "desc", num: int = 1, size: int = 20 ) -> List[Dict[str, Any]]: """ 查询邮件列表 (POST /api/public/emailList) 参数: to_email: 收件人邮箱 send_email: 发件人邮箱(可选,支持 % 通配符) subject: 主题关键词(可选) time_sort: 时间排序 (desc/asc) num: 页码(从 1 开始) size: 每页数量 返回: 邮件列表 """ client = await self._get_client() url = f"{self.api_base_url}/api/public/emailList" payload = { "toEmail": to_email, "type": 0, # 0=收件箱 "isDel": 0, # 0=未删除 "timeSort": time_sort, "num": num, "size": size } # 可选参数 if send_email: payload["sendEmail"] = send_email if subject: payload["subject"] = subject try: resp = await client.post(url, json=payload) # 检查认证错误 if resp.status_code in [401, 403]: raise RuntimeError( "CloudMail token expired or invalid.\n" "Please regenerate token and update MAIL_CLOUDMAIL_TOKEN in .env\n" "Steps:\n" "1. Login to Cloud Mail with admin account\n" "2. Call POST /api/public/genToken to generate new token\n" "3. Update MAIL_CLOUDMAIL_TOKEN in .env\n" "4. Restart the program" ) if resp.status_code != 200: raise RuntimeError( f"CloudMail API error: {resp.status_code} - {resp.text[:200]}" ) data = resp.json() # 检查业务逻辑错误 if data.get("code") != 200: error_msg = data.get("message", "Unknown error") raise RuntimeError( f"CloudMail API error: {error_msg} (code: {data.get('code')})" ) # 返回邮件列表 result = data.get("data", {}) # CloudMail API 可能返回两种格式: # 1. {"data": {"list": [...]}} - 标准格式 # 2. {"data": [...]} - 直接列表格式 if isinstance(result, list): emails = result elif isinstance(result, dict): emails = result.get("list", []) else: emails = [] logger.debug(f"CloudMail: Fetched {len(emails)} emails") return emails except Exception as e: if "httpx" in str(type(e).__module__): # httpx 网络错误 raise RuntimeError(f"CloudMail API network error: {e}") else: # 重新抛出其他错误 raise async def ensure_email_exists(self, email: str) -> bool: """ 确保邮箱账户存在(如果不存在则创建) 用于在注册流程开始前自动创建 Cloud Mail 邮箱账户 参数: email: 邮箱地址 返回: True 如果邮箱已存在或成功创建 """ try: # 先尝试查询邮箱(检查是否存在) logger.debug(f"CloudMail: Checking if {email} exists...") emails = await self._query_emails( to_email=email, size=1 ) # 如果能查询到,说明邮箱存在 logger.debug(f"CloudMail: Email {email} already exists") return True except Exception as e: # 查询失败可能是邮箱不存在,尝试创建 logger.info(f"CloudMail: Creating email account {email}...") try: await self.add_users([{"email": email}]) logger.success(f"CloudMail: Email {email} created successfully") return True except Exception as create_error: logger.error(f"CloudMail: Failed to create email {email}: {create_error}") raise async def wait_for_otp( self, email: str, timeout: int = 300, check_interval: int = 5 ) -> str: """ 等待 OTP 邮件(轮询实现) 参数: email: 注册邮箱 timeout: 超时时间(秒) check_interval: 检查间隔(秒) 返回: 6 位数字验证码 抛出: TimeoutError: 超时未收到邮件 ValueError: 邮件格式错误,无法提取 OTP """ start_time = time.time() logger.info( f"CloudMail: Waiting for OTP for {email} " f"(timeout: {timeout}s, interval: {check_interval}s)" ) while time.time() - start_time < timeout: try: # 查询最近的邮件 emails = await self._query_emails( to_email=email, send_email=self.OTP_SENDER, time_sort="desc", size=10 ) # 检查每封邮件 for msg in emails: subject = msg.get("subject", "").lower() # 检查主题是否包含 OTP 关键词 if any(kw in subject for kw in self.OTP_SUBJECT_KEYWORDS): # 尝试从邮件内容提取 OTP content = msg.get("text") or msg.get("content") or "" otp = self._extract_otp(content) if otp: logger.success(f"CloudMail: OTP received: {otp}") return otp # 等待下一次检查 elapsed = time.time() - start_time remaining = timeout - elapsed logger.debug( f"CloudMail: No OTP found, waiting {check_interval}s " f"(remaining: {int(remaining)}s)" ) await asyncio.sleep(check_interval) except Exception as e: logger.warning(f"CloudMail: Query error: {e}") await asyncio.sleep(check_interval) raise TimeoutError( f"Timeout waiting for OTP email (timeout: {timeout}s). " f"Email: {email}, Sender: {self.OTP_SENDER}" ) async def add_users( self, users: List[Dict[str, str]] ) -> Dict[str, Any]: """ 添加用户 (POST /api/public/addUser) 参数: users: 用户列表,格式: [ { "email": "test@example.com", "password": "optional", # 可选 "roleName": "optional" # 可选 } ] 返回: API 响应数据 """ client = await self._get_client() url = f"{self.api_base_url}/api/public/addUser" payload = {"list": users} try: resp = await client.post(url, json=payload) # 检查认证错误 if resp.status_code in [401, 403]: raise RuntimeError( "CloudMail token expired or invalid. " "Please regenerate token and update MAIL_CLOUDMAIL_TOKEN in .env" ) if resp.status_code != 200: raise RuntimeError( f"CloudMail addUser API error: {resp.status_code} - {resp.text[:200]}" ) data = resp.json() # 检查业务逻辑错误 if data.get("code") != 200: error_msg = data.get("message", "Unknown error") raise RuntimeError( f"CloudMail addUser error: {error_msg} (code: {data.get('code')})" ) logger.info(f"CloudMail: Users added successfully: {len(users)} users") return data except Exception as e: if "httpx" in str(type(e).__module__): raise RuntimeError(f"CloudMail API network error: {e}") else: raise async def close(self): """清理资源""" if self._client: await self._client.aclose() logger.debug("CloudMail: HTTP client closed") # 导出主要接口 __all__ = [ "MailHandler", "IMAPMailHandler", "CloudMailHandler", ]