Files
autoPlus/utils/mail_box.py
2026-01-26 16:25:22 +08:00

313 lines
11 KiB
Python

"""
邮件接码处理器 - 用于接收和解析 OpenAI 发送的验证码邮件
"""
from typing import Optional, Dict, Any, List
import re
import time
import asyncio
from utils.logger import logger
class MailHandler:
"""邮件接码处理器基类"""
OTP_SUBJECT_KEYWORDS = ["openai", "verification", "verify", "code"]
OTP_SENDER = "noreply@tm.openai.com"
def __init__(self, config: Optional[Dict[str, Any]] = None):
self.config = config or {}
self.mail_type = self.config.get("type", "not_configured")
if not config:
logger.warning("MailHandler initialized without configuration.")
else:
logger.info(f"MailHandler initialized with type: {self.mail_type}")
@staticmethod
def create(config: Optional[Dict[str, Any]]) -> "MailHandler":
"""工厂方法:根据配置创建对应的邮件处理器"""
if not config:
return MailHandler(config)
mail_type = config.get("type", "manual")
if mail_type == "imap":
return IMAPMailHandler(config)
elif mail_type == "cloudmail":
return CloudMailHandler(config)
else:
return MailHandler(config)
async def wait_for_otp(self, email: str, timeout: int = 300, check_interval: int = 5) -> str:
"""等待并提取 OTP 验证码(需子类实现)"""
logger.info(f"Waiting for OTP for {email} (timeout: {timeout}s)")
raise NotImplementedError("Mail handler not configured.")
def _extract_otp(self, text: str) -> Optional[str]:
"""从邮件正文中提取 6 位 OTP 验证码"""
text = re.sub(r'<[^>]+>', ' ', text)
patterns = [
r'verification code is[:\s]+(\d{6})',
r'code[:\s]+(\d{6})',
r'enter[:\s]+(\d{6})',
r'(\d{6})',
]
for pattern in patterns:
match = re.search(pattern, text, re.IGNORECASE)
if match:
otp = match.group(1)
logger.info(f"OTP extracted: {otp}")
return otp
logger.warning("Failed to extract OTP from email text")
return None
def verify_email_deliverability(self, email: str) -> bool:
"""验证邮箱地址格式是否有效"""
email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
if not re.match(email_pattern, email):
logger.warning(f"Invalid email format: {email}")
return False
return True
class IMAPMailHandler(MailHandler):
"""基于 IMAP 的邮件处理器"""
async def wait_for_otp(self, email: str, timeout: int = 300, check_interval: int = 5) -> str:
"""使用 IMAP 等待 OTP 邮件"""
try:
from imap_tools import MailBox, AND
except ImportError:
raise ImportError("imap-tools not installed. Install with: pip install imap-tools")
if not all(k in self.config for k in ["host", "username", "password"]):
raise ValueError("IMAP configuration incomplete. Required: host, username, password")
start_time = time.time()
logger.info(f"Connecting to IMAP server: {self.config['host']}")
while time.time() - start_time < timeout:
try:
with MailBox(self.config["host"]).login(
self.config["username"],
self.config["password"]
) as mailbox:
for msg in mailbox.fetch(
AND(from_=self.OTP_SENDER, seen=False),
reverse=True,
limit=10
):
if any(kw in msg.subject.lower() for kw in self.OTP_SUBJECT_KEYWORDS):
otp = self._extract_otp(msg.text or msg.html)
if otp:
logger.success(f"OTP received: {otp}")
mailbox.flag([msg.uid], ['\\Seen'], True)
return otp
except Exception as e:
logger.warning(f"IMAP check failed: {e}")
logger.debug(f"No OTP found, waiting {check_interval}s")
time.sleep(check_interval)
raise TimeoutError(f"Timeout waiting for OTP email ({timeout}s)")
class CloudMailHandler(MailHandler):
"""Cloud Mail API 邮件处理器"""
def __init__(self, config: Optional[Dict[str, Any]] = None):
super().__init__(config)
required = ["api_base_url", "token"]
missing = [key for key in required if not self.config.get(key)]
if missing:
raise ValueError(f"CloudMail config incomplete. Missing: {', '.join(missing)}")
self.api_base_url = self.config["api_base_url"].rstrip("/")
self.token = self.config["token"]
self.target_email = self.config.get("target_email")
self.domain = self.config.get("domain")
self._client: Optional[Any] = None
logger.info(f"CloudMailHandler initialized (API: {self.api_base_url})")
async def _get_client(self):
"""懒加载 HTTP 客户端"""
if self._client is None:
try:
import httpx
except ImportError:
raise ImportError("httpx not installed. Install with: pip install httpx")
self._client = httpx.AsyncClient(
timeout=30.0,
headers={
"Content-Type": "application/json",
"Authorization": self.token
}
)
return self._client
async def _query_emails(
self,
to_email: str,
send_email: Optional[str] = None,
subject: Optional[str] = None,
time_sort: str = "desc",
num: int = 1,
size: int = 20
) -> List[Dict[str, Any]]:
"""查询邮件列表 (POST /api/public/emailList)"""
client = await self._get_client()
url = f"{self.api_base_url}/api/public/emailList"
payload = {
"toEmail": to_email,
"type": 0,
"isDel": 0,
"timeSort": time_sort,
"num": num,
"size": size
}
if send_email:
payload["sendEmail"] = send_email
if subject:
payload["subject"] = subject
try:
resp = await client.post(url, json=payload)
if resp.status_code in [401, 403]:
raise RuntimeError("CloudMail token expired or invalid.")
if resp.status_code != 200:
raise RuntimeError(f"CloudMail API error: {resp.status_code}")
data = resp.json()
if data.get("code") != 200:
raise RuntimeError(f"CloudMail API error: {data.get('message')}")
result = data.get("data", {})
if isinstance(result, list):
emails = result
elif isinstance(result, dict):
emails = result.get("list", [])
else:
emails = []
logger.debug(f"CloudMail: Fetched {len(emails)} emails")
return emails
except Exception as e:
if "httpx" in str(type(e).__module__):
raise RuntimeError(f"CloudMail API network error: {e}")
raise
async def ensure_email_exists(self, email: str) -> bool:
"""确保邮箱账户存在(如果不存在则创建)"""
logger.info(f"CloudMail: Creating email account {email}...")
try:
result = await self.add_users([{"email": email}])
logger.success(f"CloudMail: Email {email} created successfully")
logger.debug(f"CloudMail: API response: {result}")
return True
except RuntimeError as e:
error_msg = str(e).lower()
if "already" in error_msg or "exist" in error_msg or "duplicate" in error_msg:
logger.debug(f"CloudMail: Email {email} already exists")
return True
logger.error(f"CloudMail: Failed to create email {email}: {e}")
raise
except Exception as e:
logger.error(f"CloudMail: Failed to create email {email}: {e}")
raise
async def wait_for_otp(self, email: str, timeout: int = 300, check_interval: int = 5) -> str:
"""等待 OTP 邮件(轮询实现)"""
start_time = time.time()
logger.info(f"CloudMail: Waiting for OTP for {email} (timeout: {timeout}s)")
while time.time() - start_time < timeout:
try:
# 模糊匹配 openai 发件人
emails = await self._query_emails(
to_email=email,
send_email="%openai%",
time_sort="desc",
size=10
)
# 备选:不过滤发件人
if not emails:
emails = await self._query_emails(
to_email=email,
time_sort="desc",
size=10
)
for msg in emails:
subject = msg.get("subject", "").lower()
sender = msg.get("sendEmail", "").lower()
is_from_openai = "openai" in sender or "noreply" in sender
if any(kw in subject for kw in self.OTP_SUBJECT_KEYWORDS) or is_from_openai:
content = msg.get("text") or msg.get("content") or ""
otp = self._extract_otp(content)
if otp:
logger.success(f"CloudMail: OTP received: {otp} (from: {sender})")
return otp
remaining = timeout - (time.time() - start_time)
logger.debug(f"CloudMail: No OTP found, waiting {check_interval}s (remaining: {int(remaining)}s)")
await asyncio.sleep(check_interval)
except Exception as e:
logger.warning(f"CloudMail: Query error: {e}")
await asyncio.sleep(check_interval)
raise TimeoutError(f"Timeout waiting for OTP email ({timeout}s)")
async def add_users(self, users: List[Dict[str, str]]) -> Dict[str, Any]:
"""添加用户 (POST /api/public/addUser)"""
client = await self._get_client()
url = f"{self.api_base_url}/api/public/addUser"
payload = {"list": users}
try:
resp = await client.post(url, json=payload)
if resp.status_code in [401, 403]:
raise RuntimeError("CloudMail token expired or invalid.")
if resp.status_code != 200:
raise RuntimeError(f"CloudMail addUser error: {resp.status_code}")
data = resp.json()
if data.get("code") != 200:
raise RuntimeError(f"CloudMail addUser error: {data.get('message')}")
logger.info(f"CloudMail: Users added: {len(users)}")
return data
except Exception as e:
if "httpx" in str(type(e).__module__):
raise RuntimeError(f"CloudMail API network error: {e}")
raise
async def close(self):
"""清理资源"""
if self._client:
await self._client.aclose()
logger.debug("CloudMail: HTTP client closed")
__all__ = ["MailHandler", "IMAPMailHandler", "CloudMailHandler"]