This commit is contained in:
dela
2026-01-26 15:04:02 +08:00
commit 4813449f9c
31 changed files with 8439 additions and 0 deletions

7
utils/__init__.py Normal file
View File

@@ -0,0 +1,7 @@
"""
OpenAI 账号注册系统 - 工具模块
包含日志、加密、邮件等辅助工具
"""
__version__ = "0.1.0"

161
utils/crypto.py Normal file
View File

@@ -0,0 +1,161 @@
"""
加密与指纹生成工具模块
提供以下功能:
- 生成 OpenAI 设备 ID (oai-did)
- 生成符合要求的强密码
- Proof of Work 挑战解决(预留接口)
"""
import uuid
import secrets
import string
from typing import Optional
def generate_oai_did() -> str:
"""
生成 OpenAI 设备 ID
使用 UUIDv4 格式,例如:
"a1b2c3d4-e5f6-4789-a012-b3c4d5e6f7a8"
返回:
36 个字符的 UUID 字符串(包含 4 个连字符)
"""
return str(uuid.uuid4())
def generate_random_password(length: int = 12) -> str:
"""
生成符合 OpenAI 要求的强密码
要求:
- 长度8-16 位(默认 12 位)
- 必须包含:大写字母、小写字母、数字
- 不包含特殊符号(避免编码问题)
参数:
length: 密码长度(默认 12
返回:
符合要求的随机密码
"""
if length < 8 or length > 16:
raise ValueError("Password length must be between 8 and 16")
# 字符集:大写字母 + 小写字母 + 数字
chars = string.ascii_letters + string.digits
# 重复生成直到满足所有条件
max_attempts = 100
for _ in range(max_attempts):
password = ''.join(secrets.choice(chars) for _ in range(length))
# 验证条件
has_lower = any(c.islower() for c in password)
has_upper = any(c.isupper() for c in password)
has_digit = any(c.isdigit() for c in password)
if has_lower and has_upper and has_digit:
return password
# 如果随机生成失败,手动构造一个符合要求的密码
# 确保至少有一个大写、一个小写、一个数字
parts = [
secrets.choice(string.ascii_uppercase), # 至少一个大写
secrets.choice(string.ascii_lowercase), # 至少一个小写
secrets.choice(string.digits), # 至少一个数字
]
# 填充剩余长度
remaining = length - len(parts)
parts.extend(secrets.choice(chars) for _ in range(remaining))
# 打乱顺序
password_list = list(parts)
for i in range(len(password_list) - 1, 0, -1):
j = secrets.randbelow(i + 1)
password_list[i], password_list[j] = password_list[j], password_list[i]
return ''.join(password_list)
def generate_proof_of_work(seed: str, difficulty: str, **kwargs) -> str:
"""
解决 Sentinel 的 Proof of Work 挑战
参数:
seed: PoW 种子值
difficulty: 难度参数
**kwargs: 其他可能需要的参数
返回:
PoW 答案字符串
抛出:
NotImplementedError: 用户需要实现此方法
"""
raise NotImplementedError(
"Proof of Work solver not implemented. "
"User has existing Sentinel solution that should be integrated here.\n"
"Integration options:\n"
"1. Call external script/service\n"
"2. Import existing Python module\n"
"3. HTTP API call to solver service"
)
def validate_oai_did(oai_did: str) -> bool:
"""
验证 oai-did 格式是否正确
参数:
oai_did: 待验证的设备 ID
返回:
True 如果格式正确,否则 False
"""
try:
# 尝试解析为 UUID
uuid_obj = uuid.UUID(oai_did)
# 验证是 UUIDv4
return uuid_obj.version == 4
except (ValueError, AttributeError):
return False
def validate_password(password: str) -> tuple[bool, Optional[str]]:
"""
验证密码是否符合 OpenAI 要求
参数:
password: 待验证的密码
返回:
(是否有效, 错误信息)
"""
if len(password) < 8 or len(password) > 16:
return False, "Password must be 8-16 characters"
if not any(c.islower() for c in password):
return False, "Password must contain at least one lowercase letter"
if not any(c.isupper() for c in password):
return False, "Password must contain at least one uppercase letter"
if not any(c.isdigit() for c in password):
return False, "Password must contain at least one digit"
return True, None
# 导出主要接口
__all__ = [
"generate_oai_did",
"generate_random_password",
"generate_proof_of_work",
"validate_oai_did",
"validate_password",
]

81
utils/fingerprint.py Normal file
View File

@@ -0,0 +1,81 @@
"""
浏览器指纹生成模块
用于生成符合 OpenAI Sentinel 要求的浏览器指纹配置数组
"""
import uuid
import time
from typing import List
class BrowserFingerprint:
"""
浏览器指纹生成器
生成 Sentinel SDK 所需的配置数组18 个元素)
"""
def __init__(self, session_id: str = None):
"""
初始化浏览器指纹
参数:
session_id: 会话 IDoai-did如果不提供则自动生成
"""
self.session_id = session_id or str(uuid.uuid4())
self.start_time = time.time()
def get_config_array(self) -> List:
"""
获取 Sentinel SDK 配置数组
返回:
包含 18 个元素的指纹数组
数组结构(从 JS 逆向):
[0] screen dimensions (width*height)
[1] timestamp
[2] memory (hardwareConcurrency)
[3] nonce (动态值PoW 时会修改)
[4] user agent
[5] random element
[6] script src
[7] language
[8] languages (joined)
[9] elapsed time (ms)
[10] random function test
[11] keys
[12] window keys
[13] performance.now()
[14] uuid (session_id)
[15] URL params
[16] hardware concurrency
[17] timeOrigin
"""
elapsed_ms = int((time.time() - self.start_time) * 1000)
return [
1920 * 1080, # [0] screen dimensions
str(int(time.time() * 1000)), # [1] timestamp
8, # [2] hardware concurrency
0, # [3] nonce (placeholder)
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36", # [4] UA
str(0.123456789), # [5] random element
"https://chatgpt.com/_next/static/chunks/sentinel.js", # [6] script src
"en-US", # [7] language
"en-US,en", # [8] languages
elapsed_ms, # [9] elapsed time
"", # [10] random function
"", # [11] keys
"", # [12] window keys
elapsed_ms, # [13] performance.now()
self.session_id, # [14] uuid (oai-did)
"", # [15] URL params
8, # [16] hardware concurrency
int(time.time() * 1000) - elapsed_ms, # [17] timeOrigin
]
# 导出
__all__ = ["BrowserFingerprint"]

112
utils/logger.py Normal file
View File

@@ -0,0 +1,112 @@
"""
日志系统模块
使用 loguru 提供彩色日志输出和文件记录功能
- 彩色控制台输出
- 按账号创建独立日志文件
- 敏感信息脱敏(邮箱、密码)
"""
from loguru import logger
import sys
import time
from pathlib import Path
import re
def mask_sensitive_data(text: str) -> str:
"""
脱敏处理敏感信息
- 邮箱保留前2位和@后的域名,中间用***代替
- 密码:完全替换为 ********
"""
# 邮箱脱敏: user@example.com -> us***@example.com
text = re.sub(
r'\b([a-zA-Z0-9]{1,2})[a-zA-Z0-9._-]*@([a-zA-Z0-9.-]+\.[a-zA-Z]{2,})\b',
r'\1***@\2',
text
)
# 密码脱敏: password=abc123 -> password=********
text = re.sub(
r'(password["\']?\s*[:=]\s*["\']?)([^"\'\s,}]+)(["\']?)',
r'\1********\3',
text,
flags=re.IGNORECASE
)
return text
def setup_logger(log_level: str = "INFO"):
"""
配置全局日志系统
参数:
log_level: 日志级别 (DEBUG, INFO, WARNING, ERROR)
"""
# 移除默认处理器
logger.remove()
# 添加彩色控制台输出
logger.add(
sys.stderr,
format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <level>{message}</level>",
colorize=True,
level=log_level,
filter=lambda record: mask_sensitive_data(str(record["message"]))
)
# 确保 logs 目录存在
Path("logs").mkdir(exist_ok=True)
# 添加通用日志文件(所有账号的汇总日志)
logger.add(
"logs/app_{time:YYYY-MM-DD}.log",
rotation="00:00", # 每天凌晨轮转
retention="7 days", # 保留7天
compression="zip", # 压缩旧日志
format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {message}",
level="DEBUG", # 文件记录所有级别
enqueue=True # 异步写入
)
logger.info("Logger initialized")
def setup_account_logger(email: str) -> str:
"""
为特定账号创建独立日志文件
参数:
email: 注册邮箱
返回:
日志文件路径
"""
# 文件名安全处理:替换特殊字符
safe_email = email.replace("@", "_").replace(".", "_")
timestamp = int(time.time())
log_path = f"logs/account_{safe_email}_{timestamp}.log"
# 添加账号专属日志文件
logger.add(
log_path,
format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {message}",
level="DEBUG",
rotation="10 MB",
retention="30 days",
filter=lambda record: email in str(record["message"]) # 只记录相关日志
)
logger.info(f"Account logger created: {log_path}")
return log_path
# 初始化默认配置
setup_logger()
# 导出主要接口
__all__ = ["logger", "setup_logger", "setup_account_logger", "mask_sensitive_data"]

696
utils/mail_box.py Normal file
View File

@@ -0,0 +1,696 @@
"""
邮件接码处理器
用于接收和解析 OpenAI 发送的验证码邮件
⚠️ 本模块提供预留接口,用户需要根据实际情况配置邮箱服务
支持的邮箱方案:
1. IMAP 收件 (Gmail, Outlook, 自建邮箱)
2. 临时邮箱 API (TempMail, Guerrilla Mail, etc.)
3. 邮件转发服务
"""
from typing import Optional, Dict, Any, List
import re
import time
import asyncio
from utils.logger import logger
class MailHandler:
"""
邮件接码处理器
⚠️ 预留接口 - 用户需要配置实际的邮箱服务
使用场景:
- 接收 OpenAI 发送的 6 位数字 OTP 验证码
- 解析邮件内容提取验证码
- 支持超时和重试机制
"""
# OTP 邮件特征
OTP_SUBJECT_KEYWORDS = ["openai", "verification", "verify", "code"]
OTP_SENDER = "noreply@tm.openai.com" # OpenAI 发件人地址
def __init__(self, config: Optional[Dict[str, Any]] = None):
"""
初始化邮件处理器
参数:
config: 邮箱配置字典,可能包含:
- type: "imap" | "tempmail" | "api" | "cloudmail"
- host: IMAP 服务器地址 (如果使用 IMAP)
- port: IMAP 端口 (默认 993)
- username: 邮箱用户名
- password: 邮箱密码
- api_key: 临时邮箱 API Key (如果使用 API)
"""
self.config = config or {}
self.mail_type = self.config.get("type", "not_configured")
if not config:
logger.warning(
"MailHandler initialized without configuration. "
"OTP retrieval will fail until configured."
)
else:
logger.info(f"MailHandler initialized with type: {self.mail_type}")
@staticmethod
def create(config: Optional[Dict[str, Any]]) -> "MailHandler":
"""
工厂方法:创建合适的邮件处理器
根据配置中的 type 字段自动选择正确的 handler 实现
参数:
config: 邮箱配置字典
返回:
MailHandler 实例IMAPMailHandler 或 CloudMailHandler
"""
if not config:
return MailHandler(config)
mail_type = config.get("type", "manual")
if mail_type == "imap":
return IMAPMailHandler(config)
elif mail_type == "cloudmail":
return CloudMailHandler(config)
else:
# 默认处理器(会抛出 NotImplementedError
return MailHandler(config)
async def wait_for_otp(
self,
email: str,
timeout: int = 300,
check_interval: int = 5
) -> str:
"""
等待并提取 OTP 验证码
⚠️ 预留接口 - 用户需要实现此方法
参数:
email: 注册邮箱地址
timeout: 超时时间(秒),默认 300 秒5 分钟)
check_interval: 检查间隔(秒),默认 5 秒
返回:
6 位数字验证码(例如 "123456"
抛出:
NotImplementedError: 用户需要实现此方法
TimeoutError: 超时未收到邮件
ValueError: 邮件格式错误,无法提取 OTP
集成示例:
```python
# 方案 1: 使用 IMAP (imap-tools 库)
from imap_tools import MailBox
with MailBox(self.config["host"]).login(
self.config["username"],
self.config["password"]
) as mailbox:
for msg in mailbox.fetch(AND(from_=self.OTP_SENDER, seen=False)):
otp = self._extract_otp(msg.text)
if otp:
return otp
# 方案 2: 使用临时邮箱 API
import httpx
async with httpx.AsyncClient() as client:
resp = await client.get(
f"https://tempmail.api/messages?email={email}",
headers={"Authorization": f"Bearer {self.config['api_key']}"}
)
messages = resp.json()
for msg in messages:
otp = self._extract_otp(msg["body"])
if otp:
return otp
# 方案 3: 手动输入(调试用)
print(f"Please enter OTP for {email}:")
return input().strip()
```
"""
logger.info(
f"Waiting for OTP for {email} "
f"(timeout: {timeout}s, check_interval: {check_interval}s)"
)
raise NotImplementedError(
"❌ Mail handler not configured.\n\n"
"User needs to configure email service for OTP retrieval.\n\n"
"Configuration options:\n\n"
"1. IMAP (Gmail, Outlook, custom):\n"
" config = {\n"
" 'type': 'imap',\n"
" 'host': 'imap.gmail.com',\n"
" 'port': 993,\n"
" 'username': 'your@email.com',\n"
" 'password': 'app_password'\n"
" }\n\n"
"2. Temporary email API:\n"
" config = {\n"
" 'type': 'tempmail',\n"
" 'api_key': 'YOUR_API_KEY',\n"
" 'api_endpoint': 'https://api.tempmail.com'\n"
" }\n\n"
"3. Manual input (for debugging):\n"
" config = {'type': 'manual'}\n\n"
"Example implementation location: utils/mail_box.py -> wait_for_otp()"
)
def _extract_otp(self, text: str) -> Optional[str]:
"""
从邮件正文中提取 OTP 验证码
OpenAI 邮件格式示例:
"Your OpenAI verification code is: 123456"
"Enter this code: 123456"
参数:
text: 邮件正文(纯文本或 HTML
返回:
6 位数字验证码,未找到则返回 None
"""
# 清理 HTML 标签(如果有)
text = re.sub(r'<[^>]+>', ' ', text)
# 常见的 OTP 模式
patterns = [
r'verification code is[:\s]+(\d{6})', # "verification code is: 123456"
r'code[:\s]+(\d{6})', # "code: 123456"
r'enter[:\s]+(\d{6})', # "enter: 123456"
r'(\d{6})', # 任意 6 位数字(最后尝试)
]
for pattern in patterns:
match = re.search(pattern, text, re.IGNORECASE)
if match:
otp = match.group(1)
logger.info(f"OTP extracted: {otp}")
return otp
logger.warning("Failed to extract OTP from email text")
return None
def _check_imap(self, email: str, timeout: int) -> Optional[str]:
"""
使用 IMAP 检查邮件(预留方法)
参数:
email: 注册邮箱
timeout: 超时时间(秒)
返回:
OTP 验证码,未找到则返回 None
"""
# TODO: 用户实现 IMAP 检查逻辑
# 需要安装: pip install imap-tools
raise NotImplementedError("IMAP checking not implemented")
def _check_tempmail_api(self, email: str, timeout: int) -> Optional[str]:
"""
使用临时邮箱 API 检查邮件(预留方法)
参数:
email: 注册邮箱
timeout: 超时时间(秒)
返回:
OTP 验证码,未找到则返回 None
"""
# TODO: 用户实现临时邮箱 API 调用逻辑
raise NotImplementedError("Temp mail API not implemented")
def generate_temp_email(self) -> str:
"""
生成临时邮箱地址(可选功能)
⚠️ 预留接口 - 如果使用临时邮箱服务,需要实现此方法
返回:
临时邮箱地址(例如 "random123@tempmail.com"
抛出:
NotImplementedError: 用户需要实现此方法
"""
raise NotImplementedError(
"Temp email generation not implemented. "
"Integrate a temp mail service API if needed."
)
def verify_email_deliverability(self, email: str) -> bool:
"""
验证邮箱地址是否可以接收邮件(可选功能)
参数:
email: 邮箱地址
返回:
True 如果邮箱有效且可接收邮件,否则 False
"""
# 基本格式验证
email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
if not re.match(email_pattern, email):
logger.warning(f"Invalid email format: {email}")
return False
# TODO: 用户可以添加更严格的验证逻辑
# 例如DNS MX 记录查询、SMTP 验证等
logger.info(f"Email format valid: {email}")
return True
class IMAPMailHandler(MailHandler):
"""
基于 IMAP 的邮件处理器(完整实现示例)
⚠️ 这是一个参考实现,用户可以根据需要修改
依赖:
pip install imap-tools
"""
async def wait_for_otp(
self,
email: str,
timeout: int = 300,
check_interval: int = 5
) -> str:
"""
使用 IMAP 等待 OTP 邮件
参数:
email: 注册邮箱
timeout: 超时时间(秒)
check_interval: 检查间隔(秒)
返回:
6 位数字验证码
抛出:
ImportError: 未安装 imap-tools
TimeoutError: 超时未收到邮件
ValueError: 配置错误或邮件格式错误
"""
try:
from imap_tools import MailBox, AND
except ImportError:
raise ImportError(
"imap-tools not installed. Install with: pip install imap-tools"
)
if not all(k in self.config for k in ["host", "username", "password"]):
raise ValueError(
"IMAP configuration incomplete. Required: host, username, password"
)
start_time = time.time()
logger.info(f"Connecting to IMAP server: {self.config['host']}")
while time.time() - start_time < timeout:
try:
with MailBox(self.config["host"]).login(
self.config["username"],
self.config["password"]
) as mailbox:
# 查找未读邮件,来自 OpenAI
for msg in mailbox.fetch(
AND(from_=self.OTP_SENDER, seen=False),
reverse=True, # 最新的邮件优先
limit=10
):
# 检查主题是否包含 OTP 关键词
if any(kw in msg.subject.lower() for kw in self.OTP_SUBJECT_KEYWORDS):
otp = self._extract_otp(msg.text or msg.html)
if otp:
logger.success(f"OTP received: {otp}")
# 标记为已读
mailbox.flag([msg.uid], ['\\Seen'], True)
return otp
except Exception as e:
logger.warning(f"IMAP check failed: {e}")
# 等待下一次检查
elapsed = time.time() - start_time
remaining = timeout - elapsed
logger.debug(
f"No OTP found, waiting {check_interval}s "
f"(remaining: {int(remaining)}s)"
)
time.sleep(check_interval)
raise TimeoutError(
f"Timeout waiting for OTP email (timeout: {timeout}s). "
f"Email: {email}, Sender: {self.OTP_SENDER}"
)
class CloudMailHandler(MailHandler):
"""
Cloud Mail API handler with external token management
使用外部预生成的 Token 管理邮件,不调用 genToken API
依赖:
pip install httpx
配置示例:
config = {
"type": "cloudmail",
"api_base_url": "https://your-cloudmail-domain.com",
"token": "9f4e298e-7431-4c76-bc15-4931c3a73984",
"target_email": "user@example.com" # 可选
}
"""
def __init__(self, config: Optional[Dict[str, Any]] = None):
"""
初始化 Cloud Mail handler
参数:
config: 配置字典,必填项:
- api_base_url: Cloud Mail API 基础 URL
- token: 预生成的身份令牌
- target_email: (可选) 指定监控的邮箱地址
"""
super().__init__(config)
# 验证必填配置
required = ["api_base_url", "token"]
missing = [key for key in required if not self.config.get(key)]
if missing:
raise ValueError(
f"CloudMail configuration incomplete. Missing: {', '.join(missing)}\n"
f"Required: api_base_url, token"
)
self.api_base_url = self.config["api_base_url"].rstrip("/")
self.token = self.config["token"]
self.target_email = self.config.get("target_email")
self.domain = self.config.get("domain") # 邮箱域名
self._client: Optional[Any] = None
logger.info(f"CloudMailHandler initialized (API: {self.api_base_url}, Domain: {self.domain or 'N/A'})")
async def _get_client(self):
"""懒加载 HTTP 客户端"""
if self._client is None:
try:
import httpx
except ImportError:
raise ImportError(
"httpx not installed. Install with: pip install httpx"
)
self._client = httpx.AsyncClient(
timeout=30.0,
headers={
"Content-Type": "application/json",
"Authorization": self.token
}
)
return self._client
async def _query_emails(
self,
to_email: str,
send_email: Optional[str] = None,
subject: Optional[str] = None,
time_sort: str = "desc",
num: int = 1,
size: int = 20
) -> List[Dict[str, Any]]:
"""
查询邮件列表 (POST /api/public/emailList)
参数:
to_email: 收件人邮箱
send_email: 发件人邮箱(可选,支持 % 通配符)
subject: 主题关键词(可选)
time_sort: 时间排序 (desc/asc)
num: 页码(从 1 开始)
size: 每页数量
返回:
邮件列表
"""
client = await self._get_client()
url = f"{self.api_base_url}/api/public/emailList"
payload = {
"toEmail": to_email,
"type": 0, # 0=收件箱
"isDel": 0, # 0=未删除
"timeSort": time_sort,
"num": num,
"size": size
}
# 可选参数
if send_email:
payload["sendEmail"] = send_email
if subject:
payload["subject"] = subject
try:
resp = await client.post(url, json=payload)
# 检查认证错误
if resp.status_code in [401, 403]:
raise RuntimeError(
"CloudMail token expired or invalid.\n"
"Please regenerate token and update MAIL_CLOUDMAIL_TOKEN in .env\n"
"Steps:\n"
"1. Login to Cloud Mail with admin account\n"
"2. Call POST /api/public/genToken to generate new token\n"
"3. Update MAIL_CLOUDMAIL_TOKEN in .env\n"
"4. Restart the program"
)
if resp.status_code != 200:
raise RuntimeError(
f"CloudMail API error: {resp.status_code} - {resp.text[:200]}"
)
data = resp.json()
# 检查业务逻辑错误
if data.get("code") != 200:
error_msg = data.get("message", "Unknown error")
raise RuntimeError(
f"CloudMail API error: {error_msg} (code: {data.get('code')})"
)
# 返回邮件列表
result = data.get("data", {})
# CloudMail API 可能返回两种格式:
# 1. {"data": {"list": [...]}} - 标准格式
# 2. {"data": [...]} - 直接列表格式
if isinstance(result, list):
emails = result
elif isinstance(result, dict):
emails = result.get("list", [])
else:
emails = []
logger.debug(f"CloudMail: Fetched {len(emails)} emails")
return emails
except Exception as e:
if "httpx" in str(type(e).__module__):
# httpx 网络错误
raise RuntimeError(f"CloudMail API network error: {e}")
else:
# 重新抛出其他错误
raise
async def ensure_email_exists(self, email: str) -> bool:
"""
确保邮箱账户存在(如果不存在则创建)
用于在注册流程开始前自动创建 Cloud Mail 邮箱账户
参数:
email: 邮箱地址
返回:
True 如果邮箱已存在或成功创建
"""
try:
# 先尝试查询邮箱(检查是否存在)
logger.debug(f"CloudMail: Checking if {email} exists...")
emails = await self._query_emails(
to_email=email,
size=1
)
# 如果能查询到,说明邮箱存在
logger.debug(f"CloudMail: Email {email} already exists")
return True
except Exception as e:
# 查询失败可能是邮箱不存在,尝试创建
logger.info(f"CloudMail: Creating email account {email}...")
try:
await self.add_users([{"email": email}])
logger.success(f"CloudMail: Email {email} created successfully")
return True
except Exception as create_error:
logger.error(f"CloudMail: Failed to create email {email}: {create_error}")
raise
async def wait_for_otp(
self,
email: str,
timeout: int = 300,
check_interval: int = 5
) -> str:
"""
等待 OTP 邮件(轮询实现)
参数:
email: 注册邮箱
timeout: 超时时间(秒)
check_interval: 检查间隔(秒)
返回:
6 位数字验证码
抛出:
TimeoutError: 超时未收到邮件
ValueError: 邮件格式错误,无法提取 OTP
"""
start_time = time.time()
logger.info(
f"CloudMail: Waiting for OTP for {email} "
f"(timeout: {timeout}s, interval: {check_interval}s)"
)
while time.time() - start_time < timeout:
try:
# 查询最近的邮件
emails = await self._query_emails(
to_email=email,
send_email=self.OTP_SENDER,
time_sort="desc",
size=10
)
# 检查每封邮件
for msg in emails:
subject = msg.get("subject", "").lower()
# 检查主题是否包含 OTP 关键词
if any(kw in subject for kw in self.OTP_SUBJECT_KEYWORDS):
# 尝试从邮件内容提取 OTP
content = msg.get("text") or msg.get("content") or ""
otp = self._extract_otp(content)
if otp:
logger.success(f"CloudMail: OTP received: {otp}")
return otp
# 等待下一次检查
elapsed = time.time() - start_time
remaining = timeout - elapsed
logger.debug(
f"CloudMail: No OTP found, waiting {check_interval}s "
f"(remaining: {int(remaining)}s)"
)
await asyncio.sleep(check_interval)
except Exception as e:
logger.warning(f"CloudMail: Query error: {e}")
await asyncio.sleep(check_interval)
raise TimeoutError(
f"Timeout waiting for OTP email (timeout: {timeout}s). "
f"Email: {email}, Sender: {self.OTP_SENDER}"
)
async def add_users(
self,
users: List[Dict[str, str]]
) -> Dict[str, Any]:
"""
添加用户 (POST /api/public/addUser)
参数:
users: 用户列表,格式:
[
{
"email": "test@example.com",
"password": "optional", # 可选
"roleName": "optional" # 可选
}
]
返回:
API 响应数据
"""
client = await self._get_client()
url = f"{self.api_base_url}/api/public/addUser"
payload = {"list": users}
try:
resp = await client.post(url, json=payload)
# 检查认证错误
if resp.status_code in [401, 403]:
raise RuntimeError(
"CloudMail token expired or invalid. "
"Please regenerate token and update MAIL_CLOUDMAIL_TOKEN in .env"
)
if resp.status_code != 200:
raise RuntimeError(
f"CloudMail addUser API error: {resp.status_code} - {resp.text[:200]}"
)
data = resp.json()
# 检查业务逻辑错误
if data.get("code") != 200:
error_msg = data.get("message", "Unknown error")
raise RuntimeError(
f"CloudMail addUser error: {error_msg} (code: {data.get('code')})"
)
logger.info(f"CloudMail: Users added successfully: {len(users)} users")
return data
except Exception as e:
if "httpx" in str(type(e).__module__):
raise RuntimeError(f"CloudMail API network error: {e}")
else:
raise
async def close(self):
"""清理资源"""
if self._client:
await self._client.aclose()
logger.debug("CloudMail: HTTP client closed")
# 导出主要接口
__all__ = [
"MailHandler",
"IMAPMailHandler",
"CloudMailHandler",
]