313 lines
11 KiB
Python
313 lines
11 KiB
Python
"""
Mail OTP handlers - receive and parse the verification-code emails sent by OpenAI.
"""
|
|
|
|
from typing import Optional, Dict, Any, List
|
|
import re
|
|
import time
|
|
import asyncio
|
|
from utils.logger import logger
|
|
|
|
|
|
class MailHandler:
    """Base class for handlers that retrieve OTP codes from email.

    The base class carries the shared configuration, the OTP parsing logic
    and a factory (``create``). ``wait_for_otp`` itself must be provided by
    a subclass; the base implementation always raises.
    """

    # Substrings looked for in a (lower-cased) subject line by the subclasses.
    OTP_SUBJECT_KEYWORDS = ["openai", "verification", "verify", "code"]
    # Sender address used by the IMAP subclass as its search criterion.
    OTP_SENDER = "noreply@tm.openai.com"

    # <style>/<script> elements: their *content* is not text and may contain
    # six-digit runs (e.g. all-digit CSS colors), so it is dropped entirely.
    _NON_TEXT_BLOCK_RE = re.compile(
        r'<(style|script)\b[^>]*>.*?</\1\s*>', re.IGNORECASE | re.DOTALL
    )
    _TAG_RE = re.compile(r'<[^>]+>')

    # Tried in order, most specific first. ``(?!\d)`` / ``(?<!\d)`` make sure
    # exactly six digits are matched, never a slice of a longer number.
    _OTP_PATTERNS = (
        re.compile(r'verification code is[:\s]+(\d{6})(?!\d)', re.IGNORECASE),
        re.compile(r'code[:\s]+(\d{6})(?!\d)', re.IGNORECASE),
        re.compile(r'enter[:\s]+(\d{6})(?!\d)', re.IGNORECASE),
        re.compile(r'(?<!\d)(\d{6})(?!\d)'),
    )

    _EMAIL_RE = re.compile(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$')

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Store the configuration and log which handler type was set up.

        Args:
            config: Handler settings. ``None`` or an empty dict is accepted
                and results in ``mail_type == "not_configured"``.
        """
        self.config = config or {}
        self.mail_type = self.config.get("type", "not_configured")

        if not config:
            logger.warning("MailHandler initialized without configuration.")
        else:
            logger.info(f"MailHandler initialized with type: {self.mail_type}")

    @staticmethod
    def create(config: Optional[Dict[str, Any]]) -> "MailHandler":
        """Factory: build the handler selected by ``config["type"]``.

        ``"imap"`` yields an ``IMAPMailHandler``, ``"cloudmail"`` a
        ``CloudMailHandler``; a missing/empty config or any other type
        yields a plain ``MailHandler``.
        """
        if not config:
            return MailHandler(config)

        mail_type = config.get("type", "manual")

        if mail_type == "imap":
            return IMAPMailHandler(config)
        if mail_type == "cloudmail":
            return CloudMailHandler(config)
        return MailHandler(config)

    async def wait_for_otp(self, email: str, timeout: int = 300, check_interval: int = 5) -> str:
        """Wait for and return the OTP code (must be implemented by subclasses).

        Raises:
            NotImplementedError: Always, in the base class.
        """
        logger.info(f"Waiting for OTP for {email} (timeout: {timeout}s)")
        raise NotImplementedError("Mail handler not configured.")

    @classmethod
    def _find_otp(cls, text: Optional[str]) -> Optional[str]:
        """Return the first 6-digit OTP found in ``text``, or ``None``.

        Pure parsing step with no logging. HTML markup is replaced by
        spaces before the patterns in ``_OTP_PATTERNS`` are tried in order.
        """
        if not text:
            return None

        plain = cls._NON_TEXT_BLOCK_RE.sub(' ', text)
        plain = cls._TAG_RE.sub(' ', plain)

        for pattern in cls._OTP_PATTERNS:
            match = pattern.search(plain)
            if match:
                return match.group(1)
        return None

    def _extract_otp(self, text: str) -> Optional[str]:
        """Extract the 6-digit OTP code from an email body.

        Args:
            text: Plain-text or HTML body. ``None``/empty is tolerated.

        Returns:
            The six-digit code as a string, or ``None`` if none was found
            (a warning is logged in that case).
        """
        otp = self._find_otp(text)
        if otp is None:
            logger.warning("Failed to extract OTP from email text")
            return None

        logger.info(f"OTP extracted: {otp}")
        return otp

    def verify_email_deliverability(self, email: str) -> bool:
        """Check that ``email`` is syntactically a valid address.

        Only the format is checked; nothing is sent or looked up.
        Non-string input is reported as invalid.
        """
        # fullmatch: unlike match() + "$", it does not accept a trailing newline.
        if not isinstance(email, str) or not self._EMAIL_RE.fullmatch(email):
            logger.warning(f"Invalid email format: {email}")
            return False
        return True
|
|
|
|
|
|
class IMAPMailHandler(MailHandler):
    """Mail handler that polls an IMAP mailbox for the OTP email."""

    async def wait_for_otp(self, email: str, timeout: int = 300, check_interval: int = 5) -> str:
        """Poll the IMAP inbox until an OTP email is found or ``timeout`` elapses.

        Each poll logs in, fetches up to 10 unseen messages from
        ``OTP_SENDER`` (newest first) and returns the first OTP extracted
        from a message whose subject contains one of
        ``OTP_SUBJECT_KEYWORDS``.

        Args:
            email: Address the OTP was requested for. It is not used to
                filter messages; the search is by sender only.
            timeout: Maximum total time to keep polling, in seconds.
            check_interval: Pause between polls, in seconds.

        Returns:
            The six-digit OTP code.

        Raises:
            ImportError: ``imap-tools`` is not installed.
            ValueError: ``host``, ``username`` or ``password`` is missing
                from the configuration.
            TimeoutError: No OTP was found within ``timeout`` seconds.
        """
        try:
            from imap_tools import MailBox, AND
        except ImportError as exc:
            raise ImportError(
                "imap-tools not installed. Install with: pip install imap-tools"
            ) from exc

        if not all(k in self.config for k in ["host", "username", "password"]):
            raise ValueError("IMAP configuration incomplete. Required: host, username, password")

        start_time = time.time()
        logger.info(f"Connecting to IMAP server: {self.config['host']}")

        while time.time() - start_time < timeout:
            try:
                # NOTE(review): the imap-tools calls below are synchronous and
                # hold the event loop for the duration of one poll.
                with MailBox(self.config["host"]).login(
                    self.config["username"],
                    self.config["password"]
                ) as mailbox:
                    for msg in mailbox.fetch(
                        AND(from_=self.OTP_SENDER, seen=False),
                        reverse=True,
                        limit=10
                    ):
                        if any(kw in msg.subject.lower() for kw in self.OTP_SUBJECT_KEYWORDS):
                            otp = self._extract_otp(msg.text or msg.html)
                            if otp:
                                logger.success(f"OTP received: {otp}")
                                mailbox.flag([msg.uid], ['\\Seen'], True)
                                return otp
            except Exception as e:
                # Best effort: a failed poll is logged and retried.
                logger.warning(f"IMAP check failed: {e}")

            logger.debug(f"No OTP found, waiting {check_interval}s")
            # Yield to the event loop while waiting; time.sleep() would
            # freeze every other coroutine for the whole interval.
            await asyncio.sleep(check_interval)

        raise TimeoutError(f"Timeout waiting for OTP email ({timeout}s)")
|
|
|
|
|
|
class CloudMailHandler(MailHandler):
    """Mail handler backed by the Cloud Mail HTTP API."""

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Validate the configuration and store the API settings.

        Args:
            config: Must contain non-empty ``api_base_url`` and ``token``;
                ``target_email`` and ``domain`` are optional.

        Raises:
            ValueError: A required key is missing or empty.
        """
        super().__init__(config)

        required = ["api_base_url", "token"]
        missing = [key for key in required if not self.config.get(key)]
        if missing:
            raise ValueError(f"CloudMail config incomplete. Missing: {', '.join(missing)}")

        # Trailing slashes are removed so endpoint paths can be appended as-is.
        self.api_base_url = self.config["api_base_url"].rstrip("/")
        self.token = self.config["token"]
        self.target_email = self.config.get("target_email")
        self.domain = self.config.get("domain")
        self._client: Optional[Any] = None

        logger.info(f"CloudMailHandler initialized (API: {self.api_base_url})")

    async def _get_client(self):
        """Return the shared HTTP client, creating it on first use.

        Raises:
            ImportError: ``httpx`` is not installed.
        """
        if self._client is None:
            try:
                import httpx
            except ImportError as exc:
                raise ImportError("httpx not installed. Install with: pip install httpx") from exc

            self._client = httpx.AsyncClient(
                timeout=30.0,
                headers={
                    "Content-Type": "application/json",
                    "Authorization": self.token
                }
            )
        return self._client

    async def _post(self, path: str, payload: Dict[str, Any], error_label: str) -> Dict[str, Any]:
        """POST ``payload`` as JSON to ``path`` and return the decoded body.

        Args:
            path: Endpoint path appended to ``api_base_url``.
            payload: JSON-serialisable request body.
            error_label: Prefix used in the error messages for this endpoint.

        Raises:
            RuntimeError: HTTP 401/403, any other non-200 status, a body
                whose ``code`` field is not 200, or an httpx-level failure.
        """
        client = await self._get_client()
        url = f"{self.api_base_url}{path}"

        try:
            resp = await client.post(url, json=payload)

            if resp.status_code in (401, 403):
                raise RuntimeError("CloudMail token expired or invalid.")

            if resp.status_code != 200:
                raise RuntimeError(f"{error_label}: {resp.status_code}")

            data = resp.json()
            if data.get("code") != 200:
                raise RuntimeError(f"{error_label}: {data.get('message')}")

            return data

        except Exception as e:
            # httpx is imported lazily, so its exception classes are
            # recognised by module name instead of by type.
            if "httpx" in str(type(e).__module__):
                raise RuntimeError(f"CloudMail API network error: {e}") from e
            raise

    async def _query_emails(
        self,
        to_email: str,
        send_email: Optional[str] = None,
        subject: Optional[str] = None,
        time_sort: str = "desc",
        num: int = 1,
        size: int = 20
    ) -> List[Dict[str, Any]]:
        """Query the email list (POST /api/public/emailList).

        ``send_email`` and ``subject`` are only sent when truthy.

        Returns:
            The list of email records; an empty list when the response's
            ``data`` field is neither a list nor a dict with a ``list`` key.

        Raises:
            RuntimeError: The request failed (see ``_post``).
        """
        payload = {
            "toEmail": to_email,
            "type": 0,
            "isDel": 0,
            "timeSort": time_sort,
            "num": num,
            "size": size
        }

        if send_email:
            payload["sendEmail"] = send_email
        if subject:
            payload["subject"] = subject

        data = await self._post("/api/public/emailList", payload, "CloudMail API error")

        # The records may arrive directly as a list or wrapped in {"list": [...]}.
        result = data.get("data", {})
        if isinstance(result, list):
            emails = result
        elif isinstance(result, dict):
            emails = result.get("list", [])
        else:
            emails = []

        logger.debug(f"CloudMail: Fetched {len(emails)} emails")
        return emails

    async def ensure_email_exists(self, email: str) -> bool:
        """Make sure the mailbox exists, creating it through ``add_users``.

        A ``RuntimeError`` whose message mentions "already", "exist" or
        "duplicate" is treated as success.

        Returns:
            ``True`` when the account was created or already existed.

        Raises:
            Exception: Any other failure is logged and re-raised unchanged.
        """
        logger.info(f"CloudMail: Creating email account {email}...")

        try:
            result = await self.add_users([{"email": email}])
            logger.success(f"CloudMail: Email {email} created successfully")
            logger.debug(f"CloudMail: API response: {result}")
            return True

        except RuntimeError as e:
            error_msg = str(e).lower()
            if "already" in error_msg or "exist" in error_msg or "duplicate" in error_msg:
                logger.debug(f"CloudMail: Email {email} already exists")
                return True
            logger.error(f"CloudMail: Failed to create email {email}: {e}")
            raise

        except Exception as e:
            logger.error(f"CloudMail: Failed to create email {email}: {e}")
            raise

    async def wait_for_otp(self, email: str, timeout: int = 300, check_interval: int = 5) -> str:
        """Poll the API until an OTP email for ``email`` arrives.

        Each round first queries with the sender filter ``%openai%`` and,
        if that returns nothing, queries again without a sender filter.
        A message is parsed when its subject contains one of
        ``OTP_SUBJECT_KEYWORDS`` or its sender contains "openai"/"noreply".

        Args:
            email: Recipient address to query.
            timeout: Maximum total time to keep polling, in seconds.
            check_interval: Pause between polls, in seconds.

        Returns:
            The six-digit OTP code.

        Raises:
            TimeoutError: No OTP was found within ``timeout`` seconds.
        """
        start_time = time.time()
        logger.info(f"CloudMail: Waiting for OTP for {email} (timeout: {timeout}s)")

        while time.time() - start_time < timeout:
            try:
                # Fuzzy match on the OpenAI sender.
                emails = await self._query_emails(
                    to_email=email,
                    send_email="%openai%",
                    time_sort="desc",
                    size=10
                )

                # Fallback: no sender filter.
                if not emails:
                    emails = await self._query_emails(
                        to_email=email,
                        time_sort="desc",
                        size=10
                    )

                for msg in emails:
                    # "or ''" also covers keys that are present but null,
                    # which .get(key, "") would pass through as None.
                    subject = (msg.get("subject") or "").lower()
                    sender = (msg.get("sendEmail") or "").lower()
                    is_from_openai = "openai" in sender or "noreply" in sender

                    if any(kw in subject for kw in self.OTP_SUBJECT_KEYWORDS) or is_from_openai:
                        content = msg.get("text") or msg.get("content") or ""
                        otp = self._extract_otp(content)
                        if otp:
                            logger.success(f"CloudMail: OTP received: {otp} (from: {sender})")
                            return otp

                remaining = timeout - (time.time() - start_time)
                logger.debug(f"CloudMail: No OTP found, waiting {check_interval}s (remaining: {int(remaining)}s)")

            except Exception as e:
                # Best effort: a failed round is logged and retried.
                logger.warning(f"CloudMail: Query error: {e}")

            await asyncio.sleep(check_interval)

        raise TimeoutError(f"Timeout waiting for OTP email ({timeout}s)")

    async def add_users(self, users: List[Dict[str, str]]) -> Dict[str, Any]:
        """Add users (POST /api/public/addUser).

        Args:
            users: User records, sent as ``{"list": users}``.

        Returns:
            The decoded JSON response body.

        Raises:
            RuntimeError: The request failed (see ``_post``).
        """
        data = await self._post("/api/public/addUser", {"list": users}, "CloudMail addUser error")
        logger.info(f"CloudMail: Users added: {len(users)}")
        return data

    async def close(self):
        """Close the HTTP client, if one was created.

        The reference is cleared first so that a later ``_get_client`` call
        builds a fresh client instead of handing out the closed one.
        """
        if self._client is not None:
            client, self._client = self._client, None
            await client.aclose()
            logger.debug("CloudMail: HTTP client closed")
|
|
|
|
|
|
# Public API of this module (what ``from <module> import *`` exposes).
__all__ = [
    "MailHandler",
    "IMAPMailHandler",
    "CloudMailHandler",
]
|