清理代码

This commit is contained in:
dela
2026-01-26 16:25:22 +08:00
parent 4813449f9c
commit 70627f09fe
18 changed files with 494 additions and 3074 deletions

View File

@@ -73,6 +73,10 @@ SENTINEL_API_KEY=
# Python 模块名称(如果使用 module # Python 模块名称(如果使用 module
SENTINEL_MODULE_NAME= SENTINEL_MODULE_NAME=
# Sentinel 内部配置
SENTINEL_DEBUG=false
SENTINEL_SDK_PATH=sdk/sdk.js
# ========== TLS 指纹配置 ========== # ========== TLS 指纹配置 ==========
# 模拟的浏览器版本chrome110, chrome120, chrome124 # 模拟的浏览器版本chrome110, chrome120, chrome124
TLS_IMPERSONATE=chrome124 TLS_IMPERSONATE=chrome124

5
.gitignore vendored
View File

@@ -31,3 +31,8 @@ results_*.json
# OS # OS
.DS_Store .DS_Store
Thumbs.db Thumbs.db
/docs
/.claude

View File

@@ -262,6 +262,13 @@ class AppConfig(BaseSettings):
sentinel_api_key: Optional[str] = Field(default=None, description="API Key") sentinel_api_key: Optional[str] = Field(default=None, description="API Key")
sentinel_module_name: Optional[str] = Field(default=None, description="模块名称") sentinel_module_name: Optional[str] = Field(default=None, description="模块名称")
# Sentinel 内部配置
sentinel_debug: bool = Field(default=False, description="Sentinel 调试模式")
sentinel_sdk_path: str = Field(
default="sdk/sdk.js",
description="Sentinel SDK JS 文件路径(相对于项目根目录)"
)
# ========== TLS 指纹配置 ========== # ========== TLS 指纹配置 ==========
tls_impersonate: Literal["chrome110", "chrome120", "chrome124"] = Field( tls_impersonate: Literal["chrome110", "chrome120", "chrome124"] = Field(
default="chrome124", default="chrome124",
@@ -337,6 +344,17 @@ class AppConfig(BaseSettings):
module_name=self.sentinel_module_name, module_name=self.sentinel_module_name,
) )
@property
def fingerprint_config(self) -> Dict[str, Any]:
"""获取指纹配置"""
return {
"user_agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36",
"screen_width": 1920,
"screen_height": 1080,
"languages": ["en-US", "en"],
"hardware_concurrency": 8,
}
def validate_config(self) -> List[str]: def validate_config(self) -> List[str]:
""" """
验证配置完整性,返回警告列表 验证配置完整性,返回警告列表

View File

@@ -1,32 +1,13 @@
""" """OpenAI 账号注册流程编排模块"""
OpenAI 账号注册流程编排模块
完整的注册流程实现,包含以下步骤:
1. 初始化会话(访问主页 + API providers
2. 获取 CSRF Token
3. OAuth 流程(跳转到 auth.openai.com
4. Sentinel 握手(获取 Token
5. 提交注册信息(邮箱 + 密码)
6. 触发邮件验证(可能遇到 Cloudflare 403
7. 提交 OTP 验证码
8. 完成用户信息(姓名 + 生日)
参考文档: /home/carry/myprj/gptAutoPlus/docs/开发文档.md
"""
from typing import Dict, Any, Optional from typing import Dict, Any, Optional
import secrets import secrets
import random import random
import asyncio
import json import json
import uuid
from core.session import ( from core.session import OAISession, CloudflareBlockError, SessionInvalidError, RateLimitError
OAISession, from core.sentinel_handler import SentinelHandler
CloudflareBlockError,
SessionInvalidError,
RateLimitError
)
from core.sentinel import SentinelHandler
from core.challenge import CloudflareSolver from core.challenge import CloudflareSolver
from utils.mail_box import MailHandler from utils.mail_box import MailHandler
from utils.crypto import generate_random_password from utils.crypto import generate_random_password
@@ -34,13 +15,8 @@ from utils.logger import logger
class RegisterFlow: class RegisterFlow:
""" """OpenAI 账号注册流程编排器"""
OpenAI 账号注册流程编排器
负责协调各个组件,按照正确的顺序执行注册流程
"""
# OpenAI 相关 URL
CHATGPT_HOME = "https://chatgpt.com/" CHATGPT_HOME = "https://chatgpt.com/"
CHATGPT_PROVIDERS = "https://chatgpt.com/api/auth/providers" CHATGPT_PROVIDERS = "https://chatgpt.com/api/auth/providers"
CHATGPT_CSRF = "https://chatgpt.com/api/auth/csrf" CHATGPT_CSRF = "https://chatgpt.com/api/auth/csrf"
@@ -52,96 +28,47 @@ class RegisterFlow:
AUTH_VALIDATE_OTP = "https://auth.openai.com/api/accounts/email-otp/validate" AUTH_VALIDATE_OTP = "https://auth.openai.com/api/accounts/email-otp/validate"
AUTH_COMPLETE_PROFILE = "https://auth.openai.com/api/accounts/create_account" AUTH_COMPLETE_PROFILE = "https://auth.openai.com/api/accounts/create_account"
def __init__( def __init__(self, session: OAISession, config, email: Optional[str] = None, password: Optional[str] = None):
self,
session: OAISession,
config,
email: Optional[str] = None,
password: Optional[str] = None
):
"""
初始化注册流程
参数:
session: OAISession 实例(已配置 TLS 指纹和代理)
config: AppConfig 配置对象
email: 注册邮箱(可选,不提供则自动生成)
password: 密码(可选,不提供则自动生成)
"""
self.s = session self.s = session
self.config = config self.config = config
self.email = email or self._generate_email() self.email = email or self._generate_email()
self.password = password or generate_random_password() self.password = password or generate_random_password()
# 初始化子模块
self.sentinel = SentinelHandler(session) self.sentinel = SentinelHandler(session)
self.mail = MailHandler.create( self.mail = MailHandler.create(config.mail.to_dict() if config.mail.enabled else None)
config.mail.to_dict() if config.mail.enabled else None
)
self.cloudflare_solver = CloudflareSolver() self.cloudflare_solver = CloudflareSolver()
# 流程状态
self.csrf_token: Optional[str] = None self.csrf_token: Optional[str] = None
self.sentinel_token: Optional[Dict[str, Any]] = None self.sentinel_token: Optional[Dict[str, Any]] = None
self.otp: Optional[str] = None self.otp: Optional[str] = None
logger.info( logger.info(f"RegisterFlow initialized for {self.email} (oai-did: {self.s.oai_did})")
f"RegisterFlow initialized for {self.email} "
f"(oai-did: {self.s.oai_did})"
)
async def run(self) -> Dict[str, Any]: async def run(self) -> Dict[str, Any]:
""" """执行完整注册流程"""
执行完整注册流程
返回:
注册结果字典,包含:
- email: 注册邮箱
- password: 密码
- status: 状态 ("success", "failed", "pending_otp", etc.)
- error: 错误信息(如果失败)
- message: 额外信息
"""
try: try:
logger.info(f"[{self.email}] Starting registration flow") logger.info(f"[{self.email}] Starting registration flow")
# Step 0: 如果使用 CloudMail,确保邮箱账户存在 # Step 0: CloudMail 邮箱账户
if self.config.mail.enabled and self.config.mail.type == "cloudmail": if self.config.mail.enabled and self.config.mail.type == "cloudmail":
try: try:
from utils.mail_box import CloudMailHandler from utils.mail_box import CloudMailHandler
if isinstance(self.mail, CloudMailHandler): if isinstance(self.mail, CloudMailHandler):
logger.info(f"[{self.email}] Step 0: Ensuring email account exists in CloudMail") logger.info(f"[{self.email}] Step 0: Ensuring email account exists")
await self.mail.ensure_email_exists(self.email) await self.mail.ensure_email_exists(self.email)
logger.info(f"[{self.email}] ✓ Email account ready") logger.info(f"[{self.email}] ✓ Email account ready")
except Exception as e: except Exception as e:
logger.warning(f"[{self.email}] Failed to create CloudMail account: {e}") logger.warning(f"[{self.email}] Failed to create CloudMail account: {e}")
# 继续执行,可能邮箱已经存在
# Step 1: 初始化会话
await self._step1_init_session() await self._step1_init_session()
# Step 2: 获取 CSRF Token
await self._step2_get_csrf_token() await self._step2_get_csrf_token()
# Step 3: OAuth 流程
await self._step3_oauth_signin() await self._step3_oauth_signin()
# Step 4: Sentinel 握手
await self._step4_get_sentinel_token() await self._step4_get_sentinel_token()
# Step 5: 提交注册信息
await self._step5_submit_registration() await self._step5_submit_registration()
# Step 6: 触发邮件验证
await self._step6_send_email_otp() await self._step6_send_email_otp()
# Step 7: 提交 OTP
await self._step7_submit_otp() await self._step7_submit_otp()
# Step 8: 完成用户信息
await self._step8_complete_profile() await self._step8_complete_profile()
# 注册成功
logger.success(f"[{self.email}] Registration completed successfully! ✅") logger.success(f"[{self.email}] Registration completed successfully! ✅")
return { return {
"email": self.email, "email": self.email,
@@ -153,437 +80,197 @@ class RegisterFlow:
except CloudflareBlockError as e: except CloudflareBlockError as e:
logger.error(f"[{self.email}] Cloudflare blocked: {e}") logger.error(f"[{self.email}] Cloudflare blocked: {e}")
return { return {"email": self.email, "password": self.password, "status": "cloudflare_blocked", "error": str(e)}
"email": self.email,
"password": self.password,
"status": "cloudflare_blocked",
"error": str(e),
"message": "Encountered Cloudflare challenge. Consider using residential proxy or solver."
}
except SessionInvalidError as e: except SessionInvalidError as e:
logger.error(f"[{self.email}] Session invalid (409): {e}") logger.error(f"[{self.email}] Session invalid: {e}")
return { return {"email": self.email, "password": self.password, "status": "session_invalid", "error": str(e)}
"email": self.email,
"password": self.password,
"status": "session_invalid",
"error": str(e),
"message": "Session conflict. CSRF token expired. Retry recommended."
}
except RateLimitError as e: except RateLimitError as e:
logger.error(f"[{self.email}] Rate limited (429): {e}") logger.error(f"[{self.email}] Rate limited: {e}")
return { return {"email": self.email, "password": self.password, "status": "rate_limited", "error": str(e)}
"email": self.email,
"password": self.password,
"status": "rate_limited",
"error": str(e),
"message": "Rate limit exceeded. Wait and retry with different IP."
}
except NotImplementedError as e: except NotImplementedError as e:
logger.warning(f"[{self.email}] Feature not implemented: {e}") logger.warning(f"[{self.email}] Feature not implemented: {e}")
return { return {"email": self.email, "password": self.password, "status": "pending_manual", "error": str(e)}
"email": self.email,
"password": self.password,
"status": "pending_manual",
"error": str(e),
"message": "Partial success. User needs to complete manual steps (Sentinel or OTP)."
}
except Exception as e: except Exception as e:
logger.exception(f"[{self.email}] Unexpected error during registration") logger.exception(f"[{self.email}] Unexpected error")
return { return {"email": self.email, "password": self.password, "status": "failed", "error": str(e)}
"email": self.email,
"password": self.password,
"status": "failed",
"error": str(e),
"message": f"Registration failed: {type(e).__name__}"
}
async def _step1_init_session(self): async def _step1_init_session(self):
""" """Step 1: 初始化会话"""
Step 1: 初始化会话
访问 ChatGPT 主页和 API providers 端点,建立基础会话
"""
logger.info(f"[{self.email}] Step 1: Initializing session") logger.info(f"[{self.email}] Step 1: Initializing session")
self.s.get(self.CHATGPT_HOME)
# 访问主页 self.s.get(self.CHATGPT_PROVIDERS)
resp = self.s.get(self.CHATGPT_HOME)
logger.debug(f" - GET {self.CHATGPT_HOME}: {resp.status_code}")
# 获取 auth providers
resp = self.s.get(self.CHATGPT_PROVIDERS)
logger.debug(f" - GET {self.CHATGPT_PROVIDERS}: {resp.status_code}")
logger.info(f"[{self.email}] ✓ Session initialized") logger.info(f"[{self.email}] ✓ Session initialized")
async def _step2_get_csrf_token(self): async def _step2_get_csrf_token(self):
""" """Step 2: 获取 CSRF Token"""
Step 2: 获取 CSRF Token
CSRF Token 用于后续的 OAuth 登录流程
"""
logger.info(f"[{self.email}] Step 2: Getting CSRF token") logger.info(f"[{self.email}] Step 2: Getting CSRF token")
resp = self.s.get(self.CHATGPT_CSRF) resp = self.s.get(self.CHATGPT_CSRF)
if resp.status_code != 200: if resp.status_code != 200:
raise RuntimeError(f"Failed to get CSRF token: {resp.status_code}") raise RuntimeError(f"Failed to get CSRF token: {resp.status_code}")
self.csrf_token = resp.json().get("csrfToken")
data = resp.json()
self.csrf_token = data.get("csrfToken")
if not self.csrf_token: if not self.csrf_token:
raise RuntimeError(f"CSRF token not found in response: {data}") raise RuntimeError("CSRF token not found")
logger.info(f"[{self.email}] ✓ CSRF token obtained")
logger.info(f"[{self.email}] ✓ CSRF token obtained: {self.csrf_token[:20]}...")
async def _step3_oauth_signin(self): async def _step3_oauth_signin(self):
""" """Step 3: OAuth 登录流程"""
Step 3: OAuth 登录流程
启动 OAuth 流程,跳转到 auth.openai.com
确保获取所有必要的 session cookies
"""
logger.info(f"[{self.email}] Step 3: Starting OAuth flow") logger.info(f"[{self.email}] Step 3: Starting OAuth flow")
# 生成 auth_session_logging_id
import uuid
auth_session_logging_id = str(uuid.uuid4())
# 发起 OAuth signin 请求(添加关键参数)
signin_params = { signin_params = {
'prompt': 'login', 'prompt': 'login',
'ext-oai-did': self.s.oai_did, 'ext-oai-did': self.s.oai_did,
'auth_session_logging_id': auth_session_logging_id, 'auth_session_logging_id': str(uuid.uuid4()),
'screen_hint': 'signup', # 🔥 明确指定注册 'screen_hint': 'signup',
'login_hint': self.email, # 🔥 传入邮箱 'login_hint': self.email,
} }
payload = { payload = {"callbackUrl": "/", "csrfToken": self.csrf_token, "json": "true"}
"callbackUrl": "/",
"csrfToken": self.csrf_token,
"json": "true"
}
resp = self.s.post( resp = self.s.post(
self.CHATGPT_SIGNIN, self.CHATGPT_SIGNIN,
params=signin_params, # ✅ 添加 URL 参数 params=signin_params,
data=payload, data=payload,
headers={"Content-Type": "application/x-www-form-urlencoded"} headers={"Content-Type": "application/x-www-form-urlencoded"}
) )
if resp.status_code != 200: if resp.status_code != 200:
raise RuntimeError(f"OAuth signin failed: {resp.status_code} - {resp.text[:200]}") raise RuntimeError(f"OAuth signin failed: {resp.status_code}")
data = resp.json()
auth_url = data.get("url")
auth_url = resp.json().get("url")
if not auth_url: if not auth_url:
raise RuntimeError(f"OAuth URL not found in response: {data}") raise RuntimeError("OAuth URL not found")
logger.debug(f" - OAuth URL: {auth_url[:100]}...") self.s.get(auth_url, allow_redirects=True)
self.s.get(self.AUTH_CREATE_ACCOUNT)
# 访问 OAuth 跳转链接(建立 auth.openai.com 会话) logger.info(f"[{self.email}] ✓ OAuth flow completed")
# 这一步会设置关键的 cookies: login_session, oai-client-auth-session, auth_provider 等
resp = self.s.get(auth_url, allow_redirects=True)
logger.debug(f" - GET OAuth URL: {resp.status_code}")
logger.debug(f" - Final URL after redirects: {resp.url}")
# 检查关键 cookies 是否已设置
important_cookies = ["login_session", "oai-client-auth-session"]
cookies_status = {
cookie: cookie in self.s.client.cookies
for cookie in important_cookies
}
logger.debug(f" - Cookies status: {cookies_status}")
# 访问注册页面
resp = self.s.get(self.AUTH_CREATE_ACCOUNT)
logger.debug(f" - GET create-account page: {resp.status_code}")
logger.info(f"[{self.email}] ✓ OAuth flow completed, redirected to auth.openai.com")
async def _step4_get_sentinel_token(self): async def _step4_get_sentinel_token(self):
""" """Step 4: Sentinel 握手"""
Step 4: Sentinel 握手
获取 Sentinel Token 用于提交注册信息
✅ 已集成完整的 Sentinel 解决方案
"""
logger.info(f"[{self.email}] Step 4: Getting Sentinel token") logger.info(f"[{self.email}] Step 4: Getting Sentinel token")
try: try:
self.sentinel_token = await self.sentinel.get_token( self.sentinel_token = await self.sentinel.get_token(flow="username_password_create")
flow="username_password_create" logger.info(f"[{self.email}] ✓ Sentinel token obtained")
)
logger.info(f"[{self.email}] ✓ Sentinel token obtained: {str(self.sentinel_token)[:50]}...")
except (NotImplementedError, ImportError) as e: except (NotImplementedError, ImportError) as e:
logger.error( logger.error(f"[{self.email}] Sentinel solver not available: {e}")
f"[{self.email}] ❌ Sentinel solver not available: {e}"
)
# 重新抛出异常,让调用方知道需要修复
raise raise
async def _step5_submit_registration(self): async def _step5_submit_registration(self):
""" """Step 5: 提交注册信息"""
Step 5: 提交注册信息
POST /api/accounts/user/register
提交用户名(邮箱)、密码Sentinel Token 放在 Header 中
"""
logger.info(f"[{self.email}] Step 5: Submitting registration") logger.info(f"[{self.email}] Step 5: Submitting registration")
# 请求 Bodyusername 和 password payload = {"username": self.email, "password": self.password}
payload = {
"username": self.email, # ✅ 改为 username
"password": self.password
}
# Sentinel Token 作为 Header 传递JSON 字符串格式)
headers = { headers = {
"Content-Type": "application/json", "Content-Type": "application/json",
"Accept": "application/json", "Accept": "application/json",
"Origin": "https://auth.openai.com", "Origin": "https://auth.openai.com",
"Referer": self.AUTH_CREATE_ACCOUNT, "Referer": self.AUTH_CREATE_ACCOUNT,
"Openai-Sentinel-Token": json.dumps(self.sentinel_token), # ✅ 注意大小写 "Openai-Sentinel-Token": json.dumps(self.sentinel_token),
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "same-origin",
"Priority": "u=1, i",
}
# 添加 Datadog tracing headers模拟真实浏览器
headers.update({
"X-Datadog-Trace-Id": str(secrets.randbits(63)), "X-Datadog-Trace-Id": str(secrets.randbits(63)),
"X-Datadog-Parent-Id": str(secrets.randbits(63)), "X-Datadog-Parent-Id": str(secrets.randbits(63)),
"X-Datadog-Sampling-Priority": "1", "X-Datadog-Sampling-Priority": "1",
"X-Datadog-Origin": "rum", "X-Datadog-Origin": "rum",
"Traceparent": f"00-0000000000000000{secrets.token_hex(8)}-{secrets.token_hex(8)}-01", "Traceparent": f"00-0000000000000000{secrets.token_hex(8)}-{secrets.token_hex(8)}-01",
"Tracestate": "dd=s:1;o:rum", "Tracestate": "dd=s:1;o:rum",
}) }
resp = self.s.post( resp = self.s.post(self.AUTH_REGISTER, json=payload, headers=headers)
self.AUTH_REGISTER,
json=payload,
headers=headers
)
if resp.status_code != 200: if resp.status_code != 200:
error_msg = resp.text[:300]
logger.error(f" - Registration failed: {resp.status_code} - {error_msg}")
# 检查常见错误
if "email already exists" in resp.text.lower(): if "email already exists" in resp.text.lower():
raise ValueError(f"Email already registered: {self.email}") raise ValueError(f"Email already registered: {self.email}")
elif "invalid token" in resp.text.lower(): raise RuntimeError(f"Registration failed: {resp.status_code}")
raise ValueError("Invalid Sentinel token. Check your Sentinel solver.")
else:
raise RuntimeError(f"Registration submission failed: {resp.status_code} - {error_msg}")
logger.info(f"[{self.email}] ✓ Registration info submitted successfully") logger.info(f"[{self.email}] ✓ Registration submitted")
async def _step6_send_email_otp(self): async def _step6_send_email_otp(self):
""" """Step 6: 触发邮件验证"""
Step 6: 触发邮件验证
POST /api/accounts/email-otp/send
触发 OpenAI 发送 OTP 验证码到注册邮箱
⚠️ 此步骤最容易触发 Cloudflare 403
"""
logger.info(f"[{self.email}] Step 6: Sending email OTP") logger.info(f"[{self.email}] Step 6: Sending email OTP")
resp = self.s.post( resp = self.s.post(
self.AUTH_SEND_OTP, self.AUTH_SEND_OTP,
json={}, json={},
headers={ headers={"Content-Type": "application/json", "Referer": self.AUTH_CREATE_ACCOUNT}
"Content-Type": "application/json",
"Referer": self.AUTH_CREATE_ACCOUNT
}
) )
# 检查 Cloudflare 拦截 if resp.status_code == 403 and CloudflareSolver.detect_challenge(resp):
if resp.status_code == 403: raise CloudflareBlockError("Cloudflare challenge triggered")
if CloudflareSolver.detect_challenge(resp):
logger.error(f"[{self.email}] ⚠️ Cloudflare challenge detected at OTP send")
raise CloudflareBlockError(
"Cloudflare Turnstile challenge triggered when sending OTP. "
"Recommendations: use residential proxy or integrate captcha solver."
)
if resp.status_code != 200: if resp.status_code != 200:
raise RuntimeError( raise RuntimeError(f"OTP send failed: {resp.status_code}")
f"Email OTP send failed: {resp.status_code} - {resp.text[:200]}"
)
logger.info(f"[{self.email}] ✓ OTP email sent successfully") logger.info(f"[{self.email}] ✓ OTP email sent")
async def _step7_submit_otp(self): async def _step7_submit_otp(self):
""" """Step 7: 提交 OTP 验证码"""
Step 7: 提交 OTP 验证码
等待邮件接收 OTP然后提交验证
⚠️ 用户需要配置邮箱服务
"""
logger.info(f"[{self.email}] Step 7: Waiting for OTP") logger.info(f"[{self.email}] Step 7: Waiting for OTP")
try: try:
# 等待 OTP最多 5 分钟) self.otp = await self.mail.wait_for_otp(email=self.email, timeout=300)
self.otp = await self.mail.wait_for_otp(
email=self.email,
timeout=300
)
logger.info(f"[{self.email}] ✓ OTP received: {self.otp}") logger.info(f"[{self.email}] ✓ OTP received: {self.otp}")
except NotImplementedError: except NotImplementedError:
logger.warning( logger.warning(f"[{self.email}] Mail handler not configured")
f"[{self.email}] ⚠️ Mail handler not configured, cannot retrieve OTP"
)
# 重新抛出异常,让调用方知道需要手动输入 OTP
raise raise
except TimeoutError: except TimeoutError:
logger.error(f"[{self.email}] ⏱ Timeout waiting for OTP") raise TimeoutError(f"Timeout waiting for OTP: {self.email}")
raise TimeoutError(
f"Timeout waiting for OTP email (5 minutes). "
f"Please check email: {self.email}"
)
# 提交 OTP
payload = {"code": self.otp}
resp = self.s.post( resp = self.s.post(
self.AUTH_VALIDATE_OTP, self.AUTH_VALIDATE_OTP,
json=payload, json={"code": self.otp},
headers={ headers={"Content-Type": "application/json", "Referer": self.AUTH_CREATE_ACCOUNT}
"Content-Type": "application/json",
"Referer": self.AUTH_CREATE_ACCOUNT
}
) )
if resp.status_code != 200: if resp.status_code != 200:
error_msg = resp.text[:200] raise RuntimeError(f"OTP validation failed: {resp.status_code}")
logger.error(f" - OTP validation failed: {resp.status_code} - {error_msg}")
if "invalid code" in resp.text.lower(): logger.info(f"[{self.email}] ✓ OTP validated")
raise ValueError(f"Invalid OTP code: {self.otp}")
else:
raise RuntimeError(f"OTP validation failed: {resp.status_code} - {error_msg}")
logger.info(f"[{self.email}] ✓ OTP validated successfully")
async def _step8_complete_profile(self): async def _step8_complete_profile(self):
""" """Step 8: 完成用户信息"""
Step 8: 完成用户信息
POST /api/accounts/create_account
提交姓名和生日,完成注册
"""
logger.info(f"[{self.email}] Step 8: Completing profile") logger.info(f"[{self.email}] Step 8: Completing profile")
name = self._generate_name() name = self._generate_name()
birthdate = self._generate_birthdate() birthdate = self._generate_birthdate()
payload = {
"name": name,
"birthdate": birthdate
}
resp = self.s.post( resp = self.s.post(
self.AUTH_COMPLETE_PROFILE, self.AUTH_COMPLETE_PROFILE,
json=payload, json={"name": name, "birthdate": birthdate},
headers={ headers={"Content-Type": "application/json", "Referer": self.AUTH_CREATE_ACCOUNT}
"Content-Type": "application/json",
"Referer": self.AUTH_CREATE_ACCOUNT
}
) )
if resp.status_code != 200: if resp.status_code != 200:
raise RuntimeError( raise RuntimeError(f"Profile completion failed: {resp.status_code}")
f"Profile completion failed: {resp.status_code} - {resp.text[:200]}"
)
logger.info( logger.info(f"[{self.email}] ✓ Profile completed")
f"[{self.email}] ✓ Profile completed: name={name}, birthdate={birthdate}"
)
# ========== 辅助方法 ==========
def _generate_email(self) -> str: def _generate_email(self) -> str:
""" """生成随机邮箱"""
生成随机邮箱
如果启用了 CloudMail 且配置了域名,使用 CloudMail 域名
否则使用 example.com需要用户替换
返回:
邮箱地址
"""
random_part = secrets.token_hex(8) random_part = secrets.token_hex(8)
# 如果使用 CloudMail 且配置了域名,使用真实域名
if self.config.mail.enabled and self.config.mail.type == "cloudmail": if self.config.mail.enabled and self.config.mail.type == "cloudmail":
domain = self.config.mail.cloudmail_domain domain = self.config.mail.cloudmail_domain
if domain: if domain:
return f"user_{random_part}@{domain}" return f"user_{random_part}@{domain}"
# 默认使用 example.com用户应该替换为实际域名 logger.warning("Using example.com domain. Configure MAIL_CLOUDMAIL_DOMAIN.")
logger.warning(
f"Using example.com domain. Please configure MAIL_CLOUDMAIL_DOMAIN "
f"or replace with your actual domain."
)
return f"user_{random_part}@example.com" return f"user_{random_part}@example.com"
def _generate_name(self) -> str: def _generate_name(self) -> str:
""" """生成随机姓名"""
生成随机姓名 first_names = ["James", "John", "Robert", "Michael", "William", "David", "Mary", "Patricia", "Jennifer", "Linda"]
last_names = ["Smith", "Johnson", "Williams", "Brown", "Jones", "Garcia", "Miller", "Davis", "Wilson", "Anderson"]
返回: return f"{random.choice(first_names)} {random.choice(last_names)}"
姓名字符串
"""
first_names = [
"James", "John", "Robert", "Michael", "William",
"David", "Richard", "Joseph", "Thomas", "Charles",
"Mary", "Patricia", "Jennifer", "Linda", "Elizabeth",
"Barbara", "Susan", "Jessica", "Sarah", "Karen"
]
last_names = [
"Smith", "Johnson", "Williams", "Brown", "Jones",
"Garcia", "Miller", "Davis", "Rodriguez", "Martinez",
"Hernandez", "Lopez", "Gonzalez", "Wilson", "Anderson",
"Thomas", "Taylor", "Moore", "Jackson", "Martin"
]
first = random.choice(first_names)
last = random.choice(last_names)
return f"{first} {last}"
def _generate_birthdate(self) -> str: def _generate_birthdate(self) -> str:
""" """生成随机生日 (1980-2002)"""
生成随机生日1980-2002 年,确保满 18 岁)
返回:
日期字符串,格式: YYYY-MM-DD
"""
year = random.randint(1980, 2002) year = random.randint(1980, 2002)
month = random.randint(1, 12) month = random.randint(1, 12)
day = random.randint(1, 28) if month == 2 else random.randint(1, 30) if month in [4, 6, 9, 11] else random.randint(1, 31)
# 避免 2 月 29/30/31 日等无效日期
if month == 2:
day = random.randint(1, 28)
elif month in [4, 6, 9, 11]:
day = random.randint(1, 30)
else:
day = random.randint(1, 31)
return f"{year}-{month:02d}-{day:02d}" return f"{year}-{month:02d}-{day:02d}"
# 导出主要接口
__all__ = ["RegisterFlow"] __all__ = ["RegisterFlow"]

View File

@@ -1,225 +0,0 @@
"""
Sentinel 反爬机制处理器
Sentinel 是 OpenAI 用于防止自动化注册的安全机制,包括:
- Proof of Work (PoW) 挑战
- 设备指纹验证
- 行为分析
✅ 已集成完整的 Sentinel 解决方案(使用 reference/ 下的代码)
"""
from typing import Optional, Dict, Any
from utils.logger import logger
from utils.fingerprint import BrowserFingerprint
class SentinelHandler:
"""
Sentinel 反爬机制处理器
✅ 已集成用户的 Sentinel 解决方案
使用 reference/ 目录下的完整实现
"""
# Sentinel API 端点(从开发文档提取)
SENTINEL_API = "https://chatgpt.com/_next/static/chunks/sentinel.js"
SENTINEL_TOKEN_ENDPOINT = "https://api.openai.com/sentinel/token"
def __init__(self, session):
"""
初始化 Sentinel 处理器
参数:
session: OAISession 实例(需要使用其 Cookie 和代理)
"""
from core.session import OAISession
self.session: OAISession = session
self.oai_did = session.oai_did
# 初始化浏览器指纹
self.fingerprint = BrowserFingerprint(session_id=self.oai_did)
# 延迟导入 SentinelSolver避免循环导入
self._solver = None
logger.info("SentinelHandler initialized")
def _get_solver(self):
"""延迟初始化 Sentinel 求解器"""
if self._solver is None:
try:
from reference.sentinel_solver import SentinelSolver
self._solver = SentinelSolver(self.fingerprint)
logger.debug("SentinelSolver initialized successfully")
except ImportError as e:
logger.error(f"Failed to import SentinelSolver: {e}")
raise ImportError(
"Sentinel solver not found. Please check reference/ directory."
) from e
return self._solver
async def get_token(
self,
flow: str = "username_password_create",
**kwargs
) -> Dict[str, Any]:
"""
获取 Sentinel Token
✅ 已实现 - 使用 reference/ 下的完整解决方案
参数:
flow: 注册流程类型,常见值:
- "username_password_create" (注册流程)
- "username_password_create__auto" (自动注册)
**kwargs: 其他可能需要的参数
返回:
Sentinel Token 字典 {"p": "...", "t": "...", "c": "...", "id": "...", "flow": "..."}
实现说明:
使用 reference/sentinel_solver.py 生成 requirements token
返回完整的 JSON 对象(用于 Header
"""
logger.info(f"Generating Sentinel token for flow='{flow}'")
try:
# 获取求解器
solver = self._get_solver()
# 生成 requirements token
token_dict = solver.generate_requirements_token()
# 构建完整 token
# 格式: {"p": "gAAAAAC...", "t": "...", "c": "...", "id": "uuid", "flow": "..."}
token_dict["flow"] = flow
token_dict["id"] = self.oai_did
# 验证必需字段
if "p" not in token_dict or not token_dict["p"]:
raise ValueError("Generated token missing 'p' field")
logger.success(f"Sentinel token generated: {str(token_dict)[:50]}...")
return token_dict
except ImportError as e:
logger.error(f"Sentinel solver not available: {e}")
raise NotImplementedError(
"❌ Sentinel solver import failed.\n\n"
"Please ensure:\n"
"1. reference/ directory exists with sentinel_solver.py\n"
"2. sdk/sdk.js file exists\n"
"3. Node.js is installed and available in PATH\n\n"
f"Error: {e}"
) from e
except Exception as e:
logger.exception(f"Failed to generate Sentinel token: {e}")
raise RuntimeError(f"Sentinel token generation failed: {e}") from e
async def solve_enforcement(
self,
enforcement_config: Dict[str, Any]
) -> str:
"""
解决完整的 enforcement 挑战PoW + Turnstile
✅ 已实现 - 使用 reference/sentinel_solver.py
参数:
enforcement_config: 服务器返回的挑战配置
{
'proofofwork': {
'seed': '...',
'difficulty': '0003a',
'token': '...', # cached token
'turnstile': {
'dx': '...' # VM bytecode
}
}
}
返回:
完整的 Sentinel token (JSON string)
"""
logger.info("Solving enforcement challenge...")
try:
solver = self._get_solver()
token_json = solver.solve_enforcement(enforcement_config)
logger.success(f"Enforcement solved: {token_json[:50]}...")
return token_json
except Exception as e:
logger.exception(f"Failed to solve enforcement: {e}")
raise RuntimeError(f"Enforcement solving failed: {e}") from e
def _build_payload(self, flow: str) -> Dict[str, Any]:
"""
构建 Sentinel 请求 Payload
参考开发文档中的请求格式:
{
"p": "gAAAAAB...", # Proof of Work 答案
"id": "a1b2c3d4-...", # oai-did
"flow": "username_password_create__auto"
}
参数:
flow: 注册流程类型
返回:
Payload 字典
"""
return {
"p": "gAAAAAB_PLACEHOLDER_POW_ANSWER",
"id": self.oai_did,
"flow": flow
}
async def verify_token(self, token: str) -> bool:
"""
验证 Sentinel Token 是否有效(可选功能)
参数:
token: 待验证的 Sentinel Token
返回:
True 如果有效,否则 False
"""
if not token or token == "placeholder_token":
logger.warning("Received placeholder token, validation skipped")
return False
# Token 基本格式检查
if not token.startswith("gAAAAA"):
logger.warning(f"Invalid token format: {token[:20]}...")
return False
logger.info(f"Token validation: {token[:20]}... (length={len(token)})")
return True
def get_sentinel_script(self) -> Optional[str]:
"""
获取 Sentinel JavaScript 代码(可选,用于分析)
返回:
Sentinel JS 代码内容,失败则返回 None
"""
try:
response = self.session.get(self.SENTINEL_API)
if response.status_code == 200:
logger.info(f"Sentinel script fetched: {len(response.text)} bytes")
return response.text
else:
logger.error(f"Failed to fetch Sentinel script: {response.status_code}")
return None
except Exception as e:
logger.error(f"Error fetching Sentinel script: {e}")
return None
# 导出主要接口
__all__ = ["SentinelHandler"]

View File

@@ -0,0 +1,7 @@
"""
Sentinel 反爬机制解决方案
"""
from .solver import SentinelSolver
from .js_executor import JSExecutor
__all__ = ["SentinelSolver", "JSExecutor"]

View File

@@ -11,31 +11,43 @@ from __future__ import annotations
import json import json
import re import re
import subprocess import subprocess
import sys
from pathlib import Path from pathlib import Path
from typing import Any, Dict, Optional from typing import Any, Dict, Optional
from reference.config import DEBUG, SDK_JS_PATH from config import load_config
class JSExecutor: class JSExecutor:
"""通过 Node.js 执行 Sentinel SDK 内部逻辑(支持 async Turnstile VM""" """通过 Node.js 执行 Sentinel SDK 内部逻辑(支持 async Turnstile VM"""
def __init__(self): def __init__(self):
# 加载配置
config = load_config()
self.debug = config.sentinel_debug
# 解析 SDK 路径
sdk_path = Path(config.sentinel_sdk_path)
if not sdk_path.is_absolute():
# 相对路径:相对于项目根目录
project_root = Path(sys.argv[0]).parent.resolve()
sdk_path = project_root / sdk_path
self._sdk_path = sdk_path
self._sdk_code: str = "" self._sdk_code: str = ""
self._load_sdk() self._load_sdk()
def _load_sdk(self) -> None: def _load_sdk(self) -> None:
sdk_path = Path(SDK_JS_PATH) if not self._sdk_path.exists():
if not sdk_path.exists(): raise FileNotFoundError(f"SDK not found at {self._sdk_path}")
raise FileNotFoundError(f"SDK not found at {SDK_JS_PATH}")
sdk_code = sdk_path.read_text(encoding="utf-8") sdk_code = self._sdk_path.read_text(encoding="utf-8")
sdk_code = self._sanitize_sdk(sdk_code) sdk_code = self._sanitize_sdk(sdk_code)
sdk_code = self._inject_internal_exports(sdk_code) sdk_code = self._inject_internal_exports(sdk_code)
self._sdk_code = sdk_code self._sdk_code = sdk_code
if DEBUG: if self.debug:
print("[JSExecutor] SDK loaded successfully (sanitized)") print("[JSExecutor] SDK loaded successfully (sanitized)")
def _sanitize_sdk(self, sdk_code: str) -> str: def _sanitize_sdk(self, sdk_code: str) -> str:
@@ -169,7 +181,7 @@ async function __entry() {{
def _run_node(self, payload: Dict[str, Any], entry: str, timeout_s: int = 30) -> Any: def _run_node(self, payload: Dict[str, Any], entry: str, timeout_s: int = 30) -> Any:
script = self._node_script(payload, entry) script = self._node_script(payload, entry)
if DEBUG: if self.debug:
print("[JSExecutor] Running Node worker...") print("[JSExecutor] Running Node worker...")
try: try:
@@ -200,7 +212,7 @@ async function __entry() {{
return obj.get("result") return obj.get("result")
def solve_pow(self, seed: str, difficulty: str, config_array: list) -> str: def solve_pow(self, seed: str, difficulty: str, config_array: list) -> str:
if DEBUG: if self.debug:
print(f"[JSExecutor] Solving PoW: seed={seed[:10]}..., difficulty={difficulty}") print(f"[JSExecutor] Solving PoW: seed={seed[:10]}..., difficulty={difficulty}")
result = self._run_node( result = self._run_node(
@@ -209,7 +221,7 @@ async function __entry() {{
timeout_s=60, timeout_s=60,
) )
if DEBUG and isinstance(result, str): if self.debug and isinstance(result, str):
print(f"[JSExecutor] PoW solved: {result[:50]}...") print(f"[JSExecutor] PoW solved: {result[:50]}...")
return result return result
@@ -227,7 +239,7 @@ async function __entry() {{
return result return result
def execute_turnstile(self, dx_bytecode: str, xor_key: str) -> str: def execute_turnstile(self, dx_bytecode: str, xor_key: str) -> str:
if DEBUG: if self.debug:
print("[JSExecutor] Executing Turnstile VM...") print("[JSExecutor] Executing Turnstile VM...")
result = self._run_node( result = self._run_node(
@@ -239,7 +251,7 @@ async function __entry() {{
timeout_s=30, timeout_s=30,
) )
if DEBUG and isinstance(result, str): if self.debug and isinstance(result, str):
print(f"[JSExecutor] Turnstile result: {result[:50]}...") print(f"[JSExecutor] Turnstile result: {result[:50]}...")
return result return result

View File

@@ -5,9 +5,9 @@ import json
import uuid import uuid
from typing import Dict, Optional from typing import Dict, Optional
from reference.js_executor import JSExecutor from core.sentinel.js_executor import JSExecutor
from utils.fingerprint import BrowserFingerprint from utils.fingerprint import BrowserFingerprint
from reference.config import DEBUG from config import load_config
class SentinelSolver: class SentinelSolver:
@@ -17,6 +17,10 @@ class SentinelSolver:
self.fingerprint = fingerprint self.fingerprint = fingerprint
self.js_executor = JSExecutor() self.js_executor = JSExecutor()
# 加载配置
config = load_config()
self.debug = config.sentinel_debug
def generate_requirements_token(self) -> Dict[str, str]: def generate_requirements_token(self) -> Dict[str, str]:
""" """
生成 requirements token初始化时需要 生成 requirements token初始化时需要
@@ -24,7 +28,7 @@ class SentinelSolver:
Returns: Returns:
{'p': 'gAAAAAC...', 'id': 'uuid'} {'p': 'gAAAAAC...', 'id': 'uuid'}
""" """
if DEBUG: if self.debug:
print("[Solver] Generating requirements token...") print("[Solver] Generating requirements token...")
# 生成随机 seed # 生成随机 seed
@@ -41,7 +45,7 @@ class SentinelSolver:
'id': self.fingerprint.session_id, 'id': self.fingerprint.session_id,
} }
if DEBUG: if self.debug:
print(f"[Solver] Requirements token: {token['p'][:30]}...") print(f"[Solver] Requirements token: {token['p'][:30]}...")
return token return token
@@ -66,7 +70,7 @@ class SentinelSolver:
Returns: Returns:
完整的 Sentinel token (JSON string) 完整的 Sentinel token (JSON string)
""" """
if DEBUG: if self.debug:
print("[Solver] Solving enforcement challenge...") print("[Solver] Solving enforcement challenge...")
pow_data = enforcement_config.get('proofofwork', {}) pow_data = enforcement_config.get('proofofwork', {})
@@ -105,9 +109,7 @@ class SentinelSolver:
token_json = json.dumps(sentinel_token) token_json = json.dumps(sentinel_token)
if DEBUG: if self.debug:
print(f"[Solver] Sentinel token generated: {token_json[:80]}...") print(f"[Solver] Sentinel token generated: {token_json[:80]}...")
return token_json return token_json

95
core/sentinel_handler.py Normal file
View File

@@ -0,0 +1,95 @@
"""Sentinel 反爬机制处理器"""
from typing import Optional, Dict, Any
from utils.logger import logger
from utils.fingerprint import BrowserFingerprint
class SentinelHandler:
"""Sentinel 反爬机制处理器"""
SENTINEL_API = "https://chatgpt.com/_next/static/chunks/sentinel.js"
SENTINEL_TOKEN_ENDPOINT = "https://api.openai.com/sentinel/token"
def __init__(self, session):
from core.session import OAISession
self.session: OAISession = session
self.oai_did = session.oai_did
self.fingerprint = BrowserFingerprint(session_id=self.oai_did)
self._solver = None
logger.info("SentinelHandler initialized")
def _get_solver(self):
"""延迟初始化 Sentinel 求解器"""
if self._solver is None:
try:
from core.sentinel import SentinelSolver
self._solver = SentinelSolver(self.fingerprint)
logger.debug("SentinelSolver initialized")
except ImportError as e:
logger.error(f"Failed to import SentinelSolver: {e}")
raise ImportError("Sentinel solver not found. Check core/sentinel/ directory.") from e
return self._solver
async def get_token(self, flow: str = "username_password_create", **kwargs) -> Dict[str, Any]:
"""获取 Sentinel Token"""
logger.info(f"Generating Sentinel token for flow='{flow}'")
try:
solver = self._get_solver()
token_dict = solver.generate_requirements_token()
token_dict["flow"] = flow
token_dict["id"] = self.oai_did
if "p" not in token_dict or not token_dict["p"]:
raise ValueError("Generated token missing 'p' field")
logger.success(f"Sentinel token generated: {str(token_dict)[:50]}...")
return token_dict
except ImportError as e:
logger.error(f"Sentinel solver not available: {e}")
raise NotImplementedError(f"Sentinel solver import failed: {e}") from e
except Exception as e:
logger.exception(f"Failed to generate Sentinel token: {e}")
raise RuntimeError(f"Sentinel token generation failed: {e}") from e
async def solve_enforcement(self, enforcement_config: Dict[str, Any]) -> str:
"""解决 enforcement 挑战PoW + Turnstile"""
logger.info("Solving enforcement challenge...")
try:
solver = self._get_solver()
token_json = solver.solve_enforcement(enforcement_config)
logger.success(f"Enforcement solved: {token_json[:50]}...")
return token_json
except Exception as e:
logger.exception(f"Failed to solve enforcement: {e}")
raise RuntimeError(f"Enforcement solving failed: {e}") from e
async def verify_token(self, token: str) -> bool:
"""验证 Sentinel Token 格式"""
if not token or token == "placeholder_token":
return False
if not token.startswith("gAAAAA"):
logger.warning(f"Invalid token format: {token[:20]}...")
return False
return True
def get_sentinel_script(self) -> Optional[str]:
"""获取 Sentinel JavaScript 代码"""
try:
response = self.session.get(self.SENTINEL_API)
if response.status_code == 200:
logger.info(f"Sentinel script fetched: {len(response.text)} bytes")
return response.text
logger.error(f"Failed to fetch Sentinel script: {response.status_code}")
return None
except Exception as e:
logger.error(f"Error fetching Sentinel script: {e}")
return None
__all__ = ["SentinelHandler"]

View File

@@ -1,11 +0,0 @@
"""
Reference 模块 - Sentinel 解决方案
包含 Sentinel Token 生成的完整实现
"""
from .sentinel_solver import SentinelSolver
from .js_executor import JSExecutor
from .pow_solver import ProofOfWorkSolver
__all__ = ["SentinelSolver", "JSExecutor", "ProofOfWorkSolver"]

View File

@@ -1,14 +0,0 @@
"""
Reference 模块配置文件
供 Sentinel 解决器使用的配置项
"""
# 调试模式
DEBUG = False
# SDK JS 文件路径
SDK_JS_PATH = "/home/carry/myprj/gptAutoPlus/sdk/sdk.js"
# 导出
__all__ = ["DEBUG", "SDK_JS_PATH"]

View File

@@ -1,167 +0,0 @@
# modules/fingerprint.py
"""浏览器指纹生成器"""
import uuid
import random
import time
from datetime import datetime
from typing import Dict, List, Any
from config import FINGERPRINT_CONFIG
class BrowserFingerprint:
"""生成符合 SDK 期望的浏览器指纹"""
def __init__(self, session_id: str = None):
self.session_id = session_id or str(uuid.uuid4())
# 新增: 使用确定性方法从 session_id 派生 Stripe 指纹
import hashlib
seed = hashlib.sha256(self.session_id.encode()).hexdigest()
# seed 是64个hex字符我们需要确保切片正确
# 从 seed 生成一致的 guid/muid/sid
# UUID需要32个hex字符去掉连字符额外部分直接拼接
self.stripe_guid = seed[:8] + '-' + seed[8:12] + '-' + seed[12:16] + '-' + seed[16:20] + '-' + seed[20:32] + seed[32:40]
self.stripe_muid = seed[:8] + '-' + seed[8:12] + '-' + seed[12:16] + '-' + seed[16:20] + '-' + seed[20:32] + seed[40:46]
self.stripe_sid = seed[:8] + '-' + seed[8:12] + '-' + seed[12:16] + '-' + seed[16:20] + '-' + seed[20:32] + seed[46:52]
self.user_agent = FINGERPRINT_CONFIG['user_agent']
self.screen_width = FINGERPRINT_CONFIG['screen_width']
self.screen_height = FINGERPRINT_CONFIG['screen_height']
self.languages = FINGERPRINT_CONFIG['languages']
self.hardware_concurrency = FINGERPRINT_CONFIG['hardware_concurrency']
def get_config_array(self) -> List[Any]:
"""
生成 SDK getConfig() 函数返回的 18 元素数组
对应 SDK 源码:
[0]: screen.width + screen.height
[1]: new Date().toString()
[2]: performance.memory.jsHeapSizeLimit (可选)
[3]: nonce (PoW 填充)
[4]: navigator.userAgent
[5]: 随机 script.src
[6]: build ID
[7]: navigator.language
[8]: navigator.languages.join(',')
[9]: 运行时间 (PoW 填充)
[10]: 随机 navigator 属性
[11]: 随机 document key
[12]: 随机 window key
[13]: performance.now()
[14]: session UUID
[15]: URL search params
[16]: navigator.hardwareConcurrency
[17]: performance.timeOrigin
"""
# 模拟的 script sources
fake_scripts = [
"https://sentinel.openai.com/sentinel/97790f37/sdk.js",
"https://chatgpt.com/static/js/main.abc123.js",
"https://cdn.oaistatic.com/_next/static/chunks/main.js",
]
# 模拟的 navigator 属性名
navigator_props = [
'hardwareConcurrency', 'language', 'languages',
'platform', 'userAgent', 'vendor'
]
# 模拟的 document keys
document_keys = ['body', 'head', 'documentElement', 'scripts']
# 模拟的 window keys
window_keys = ['performance', 'navigator', 'document', 'location']
current_time = time.time() * 1000
return [
self.screen_width + self.screen_height, # [0]
str(datetime.now()), # [1]
None, # [2] memory
None, # [3] nonce (placeholder)
self.user_agent, # [4]
random.choice(fake_scripts), # [5]
"97790f37", # [6] build ID
self.languages[0], # [7]
",".join(self.languages), # [8]
None, # [9] runtime (placeholder)
f"{random.choice(navigator_props)}{random.randint(1, 16)}", # [10]
random.choice(document_keys), # [11]
random.choice(window_keys), # [12]
current_time, # [13]
self.session_id, # [14]
"", # [15] URL params
self.hardware_concurrency, # [16]
current_time - random.uniform(100, 1000), # [17] timeOrigin
]
def get_cookies(self) -> Dict[str, str]:
"""生成初始 cookies"""
return {
'oai-did': self.session_id,
}
def get_stripe_fingerprint(self) -> Dict[str, str]:
"""获取 Stripe 支付指纹(与 session_id 一致派生)"""
return {
'guid': self.stripe_guid,
'muid': self.stripe_muid,
'sid': self.stripe_sid,
}
def get_headers(self, with_sentinel: str = None, host: str = 'auth.openai.com') -> Dict[str, str]:
"""生成 HTTP headers支持多域名"""
# 基础 headers
headers = {
'User-Agent': self.user_agent,
'Accept': 'application/json',
'Accept-Language': f"{self.languages[0]},{self.languages[1]};q=0.5",
# Note: urllib3/requests only auto-decompress brotli/zstd when optional
# deps are installed; avoid advertising unsupported encodings.
'Accept-Encoding': 'gzip, deflate',
'Connection': 'keep-alive',
'Sec-Fetch-Dest': 'empty',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Site': 'same-origin',
'Priority': 'u=1, i',
'Pragma': 'no-cache',
'Cache-Control': 'no-cache',
}
# 根据域名设置特定 headers
if 'chatgpt.com' in host:
headers.update({
'Origin': 'https://chatgpt.com',
'Referer': 'https://chatgpt.com/',
})
else:
headers.update({
'Origin': 'https://auth.openai.com',
'Referer': 'https://auth.openai.com/create-account/password',
'Content-Type': 'application/json',
})
# Sentinel token
if with_sentinel:
headers['openai-sentinel-token'] = with_sentinel
# Datadog RUM tracing
trace_id = random.randint(10**18, 10**19 - 1)
parent_id = random.randint(10**18, 10**19 - 1)
headers.update({
'traceparent': f'00-0000000000000000{trace_id:016x}-{parent_id:016x}-01',
'tracestate': 'dd=s:1;o:rum',
'x-datadog-origin': 'rum',
'x-datadog-parent-id': str(parent_id),
'x-datadog-sampling-priority': '1',
'x-datadog-trace-id': str(trace_id),
})
return headers

View File

@@ -1,114 +0,0 @@
import time
import json
import base64
from typing import List
DEBUG = True
class ProofOfWorkSolver:
"""解决 OpenAI Sentinel 的 Proof of Work challenge"""
def __init__(self):
# FNV-1a 常量
self.FNV_OFFSET = 2166136261
self.FNV_PRIME = 16777619
def fnv1a_hash(self, data: str) -> str:
"""FNV-1a hash 算法"""
hash_value = self.FNV_OFFSET
for char in data:
hash_value ^= ord(char)
hash_value = (hash_value * self.FNV_PRIME) & 0xFFFFFFFF
# 额外的混合步骤(从 JS 代码复制)
hash_value ^= hash_value >> 16
hash_value = (hash_value * 2246822507) & 0xFFFFFFFF
hash_value ^= hash_value >> 13
hash_value = (hash_value * 3266489909) & 0xFFFFFFFF
hash_value ^= hash_value >> 16
# 转为 8 位十六进制字符串
return format(hash_value, '08x')
def serialize_array(self, arr: List) -> str:
"""模拟 JS 的 T() 函数JSON.stringify + Base64"""
json_str = json.dumps(arr, separators=(',', ':'))
return base64.b64encode(json_str.encode()).decode()
def build_fingerprint_array(self, nonce: int, elapsed_ms: int) -> List:
"""构建指纹数组(简化版)"""
return [
0, # [0] screen dimensions
"", # [1] timestamp
0, # [2] memory
nonce, # [3] nonce ← 关键
"", # [4] user agent
"", # [5] random element
"", # [6] script src
"", # [7] language
"", # [8] languages
elapsed_ms, # [9] elapsed time ← 关键
"", # [10] random function
"", # [11] keys
"", # [12] window keys
0, # [13] performance.now()
"", # [14] uuid
"", # [15] URL params
0, # [16] hardware concurrency
0 # [17] timeOrigin
]
def solve(self, seed: str, difficulty: str, max_iterations: int = 10000000) -> str:
"""
解决 PoW challenge
Args:
seed: Challenge seed
difficulty: 目标难度(十六进制字符串)
max_iterations: 最大尝试次数
Returns:
序列化的答案(包含 nonce
"""
if DEBUG:
print(f"[PoW] Solving challenge:")
print(f" Seed: {seed}")
print(f" Difficulty: {difficulty}")
start_time = time.time()
for nonce in range(max_iterations):
elapsed_ms = int((time.time() - start_time) * 1000)
# 构建指纹数组
fingerprint = self.build_fingerprint_array(nonce, elapsed_ms)
# 序列化
serialized = self.serialize_array(fingerprint)
# 计算 hash(seed + serialized)
hash_input = seed + serialized
hash_result = self.fnv1a_hash(hash_input)
# 检查是否满足难度要求
# 比较方式hash 的前 N 位(作为整数)<= difficulty作为整数
difficulty_len = len(difficulty)
hash_prefix = hash_result[:difficulty_len]
if hash_prefix <= difficulty:
elapsed = time.time() - start_time
if DEBUG:
print(f"[PoW] ✓ Found solution in {elapsed:.2f}s")
print(f" Nonce: {nonce}")
print(f" Hash: {hash_result}")
print(f" Serialized: {serialized[:100]}...")
# 返回 serialized + "~S" (表示成功)
return serialized + "~S"
# 每 100k 次迭代打印进度
if DEBUG and nonce > 0 and nonce % 100000 == 0:
print(f"[PoW] Tried {nonce:,} iterations...")
raise Exception(f"Failed to solve PoW after {max_iterations:,} iterations")

File diff suppressed because it is too large Load Diff

View File

@@ -1,183 +0,0 @@
#!/usr/bin/env python3
"""
Cloud Mail API 独立测试脚本
使用方法:
1. 配置 .env 文件中的 Cloud Mail 参数
2. 运行: python test_cloudmail_standalone.py
"""
import asyncio
import time
from config import load_config
from utils.mail_box import CloudMailHandler
from utils.logger import logger
async def test_email_query(handler: CloudMailHandler, test_email: str):
"""测试邮件查询功能"""
logger.info("=" * 60)
logger.info("测试 1: 查询最近的邮件")
logger.info("=" * 60)
try:
emails = await handler._query_emails(
to_email=test_email,
time_sort="desc",
size=5
)
logger.success(f"✓ 查询到 {len(emails)} 封邮件")
if emails:
for i, email in enumerate(emails, 1):
logger.info(
f" 邮件 {i}:\n"
f" 发件人: {email.get('sendEmail')}\n"
f" 主题: {email.get('subject')}\n"
f" 时间: {email.get('createTime')}"
)
else:
logger.warning(" (邮箱为空)")
return len(emails) > 0
except Exception as e:
logger.error(f"✗ 测试失败: {e}")
return False
async def test_otp_waiting(handler: CloudMailHandler, test_email: str):
"""测试 OTP 等待功能"""
logger.info("")
logger.info("=" * 60)
logger.info("测试 2: OTP 等待功能")
logger.info("=" * 60)
logger.warning(f"请在 60 秒内向 {test_email} 发送测试 OTP 邮件")
logger.warning(f"发件人应为: {handler.OTP_SENDER}")
try:
otp = await handler.wait_for_otp(test_email, timeout=60)
logger.success(f"✓ OTP 接收成功: {otp}")
return True
except TimeoutError:
logger.error("✗ 超时未收到 OTP")
return False
except Exception as e:
logger.error(f"✗ 测试失败: {e}")
return False
async def test_add_user(handler: CloudMailHandler):
"""测试添加用户功能"""
logger.info("")
logger.info("=" * 60)
logger.info("测试 3: 添加测试用户")
logger.info("=" * 60)
# 生成测试邮箱
test_users = [
{"email": f"test_{int(time.time())}@example.com"}
]
try:
result = await handler.add_users(test_users)
logger.success(f"✓ 用户创建请求已发送")
logger.info(f" 响应: {result}")
return True
except Exception as e:
logger.error(f"✗ 测试失败: {e}")
return False
async def main():
"""主测试流程"""
logger.info("=" * 60)
logger.info("Cloud Mail API 测试开始")
logger.info("=" * 60)
# 加载配置
try:
config = load_config()
except Exception as e:
logger.error(f"配置加载失败: {e}")
return
# 验证配置
if not config.mail.enabled or config.mail.type != "cloudmail":
logger.error("")
logger.error("请在 .env 中配置 Cloud Mail 参数:")
logger.error(" MAIL_ENABLED=true")
logger.error(" MAIL_TYPE=cloudmail")
logger.error(" MAIL_CLOUDMAIL_API_URL=https://your-domain.com")
logger.error(" MAIL_CLOUDMAIL_TOKEN=your_token")
return
# 初始化 handler
try:
handler = CloudMailHandler(config.mail.to_dict())
except Exception as e:
logger.error(f"CloudMailHandler 初始化失败: {e}")
return
# 获取测试邮箱
logger.info("")
test_email = input("请输入测试邮箱地址: ").strip()
if not test_email:
logger.error("未输入邮箱地址,退出测试")
return
# 运行测试
results = {}
try:
# 测试 1: 邮件查询
results["email_query"] = await test_email_query(handler, test_email)
# 测试 2: OTP 等待(可选)
if input("\n是否测试 OTP 等待功能? (y/N): ").lower() == 'y':
results["otp_waiting"] = await test_otp_waiting(handler, test_email)
# 测试 3: 添加用户(可选)
if input("\n是否测试添加用户功能? (y/N): ").lower() == 'y':
results["add_user"] = await test_add_user(handler)
finally:
# 清理资源
await handler.close()
# 测试总结
logger.info("")
logger.info("=" * 60)
logger.info("测试结果总结")
logger.info("=" * 60)
if results:
for name, passed in results.items():
status = "✓ 通过" if passed else "✗ 失败"
logger.info(f" {name}: {status}")
# 总体结果
total = len(results)
passed = sum(1 for v in results.values() if v)
logger.info("")
logger.info(f"总计: {passed}/{total} 测试通过")
if passed == total:
logger.success("所有测试通过!✅")
else:
logger.warning(f"部分测试失败 ({total - passed} 个)")
else:
logger.warning("未执行任何测试")
logger.info("=" * 60)
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
logger.warning("\n测试被用户中断")
except Exception as e:
logger.exception(f"测试过程中发生未捕获的异常: {e}")

View File

@@ -1,277 +0,0 @@
#!/usr/bin/env python3
"""
Sentinel 集成测试脚本
验证 Sentinel 解决方案是否正确集成
"""
import sys
import asyncio
from pathlib import Path
def test_imports():
"""测试所有必要的模块导入"""
print("=" * 60)
print("测试 1: 模块导入")
print("=" * 60)
try:
print("✓ 导入 utils.logger...")
from utils.logger import logger
print("✓ 导入 utils.crypto...")
from utils.crypto import generate_oai_did, generate_random_password
print("✓ 导入 utils.fingerprint...")
from utils.fingerprint import BrowserFingerprint
print("✓ 导入 core.session...")
from core.session import OAISession
print("✓ 导入 core.sentinel...")
from core.sentinel import SentinelHandler
print("✓ 导入 reference.sentinel_solver...")
from reference.sentinel_solver import SentinelSolver
print("✓ 导入 reference.js_executor...")
from reference.js_executor import JSExecutor
print("\n✅ 所有模块导入成功!\n")
return True
except Exception as e:
print(f"\n❌ 导入失败: {e}\n")
import traceback
traceback.print_exc()
return False
def test_node_availability():
"""测试 Node.js 是否可用"""
print("=" * 60)
print("测试 2: Node.js 环境检查")
print("=" * 60)
import subprocess
try:
result = subprocess.run(
["node", "--version"],
capture_output=True,
text=True,
timeout=5
)
if result.returncode == 0:
version = result.stdout.strip()
print(f"✓ Node.js 已安装: {version}")
return True
else:
print(f"❌ Node.js 执行失败: {result.stderr}")
return False
except FileNotFoundError:
print("❌ Node.js 未安装或不在 PATH 中")
print(" 请安装 Node.js: https://nodejs.org/")
return False
except Exception as e:
print(f"❌ Node.js 检查失败: {e}")
return False
def test_sdk_file():
"""测试 SDK 文件是否存在"""
print("\n" + "=" * 60)
print("测试 3: SDK 文件检查")
print("=" * 60)
sdk_path = Path("/home/carry/myprj/gptAutoPlus/sdk/sdk.js")
if sdk_path.exists():
size = sdk_path.stat().st_size
print(f"✓ SDK 文件存在: {sdk_path}")
print(f" 文件大小: {size:,} bytes ({size/1024:.1f} KB)")
return True
else:
print(f"❌ SDK 文件不存在: {sdk_path}")
print(" 请确保 sdk/sdk.js 文件存在")
return False
def test_fingerprint():
"""测试浏览器指纹生成"""
print("\n" + "=" * 60)
print("测试 4: 浏览器指纹生成")
print("=" * 60)
try:
from utils.fingerprint import BrowserFingerprint
fp = BrowserFingerprint()
config_array = fp.get_config_array()
print(f"✓ 指纹生成成功")
print(f" Session ID: {fp.session_id}")
print(f" 配置数组长度: {len(config_array)}")
print(f" 配置数组前 3 项: {config_array[:3]}")
if len(config_array) == 18:
print("✓ 配置数组长度正确 (18 个元素)")
return True
else:
print(f"❌ 配置数组长度错误: {len(config_array)} (期望 18)")
return False
except Exception as e:
print(f"❌ 指纹生成失败: {e}")
import traceback
traceback.print_exc()
return False
async def test_sentinel_token():
"""测试 Sentinel Token 生成"""
print("\n" + "=" * 60)
print("测试 5: Sentinel Token 生成")
print("=" * 60)
try:
from core.session import OAISession
from core.sentinel import SentinelHandler
print("✓ 创建测试会话...")
session = OAISession()
print(f"✓ Session 创建成功oai-did: {session.oai_did}")
print("✓ 初始化 SentinelHandler...")
sentinel = SentinelHandler(session)
print("✓ 生成 Sentinel Token...")
print(" (这可能需要几秒钟,正在执行 PoW 计算...")
token = await sentinel.get_token()
print(f"\n✅ Sentinel Token 生成成功!")
print(f" Token 前缀: {token[:30]}...")
print(f" Token 长度: {len(token)}")
# 验证 token 格式
if token.startswith("gAAAAA"):
print("✓ Token 格式正确")
return True
else:
print(f"⚠️ Token 格式异常: {token[:20]}...")
return True # 仍然算成功,因为可能是格式变化
except Exception as e:
print(f"\n❌ Sentinel Token 生成失败: {e}")
import traceback
traceback.print_exc()
return False
def test_crypto_utils():
"""测试加密工具"""
print("\n" + "=" * 60)
print("测试 6: 加密工具")
print("=" * 60)
try:
from utils.crypto import (
generate_oai_did,
generate_random_password,
validate_oai_did,
validate_password
)
# 测试 oai-did 生成
oai_did = generate_oai_did()
print(f"✓ OAI-DID 生成: {oai_did}")
is_valid = validate_oai_did(oai_did)
print(f"✓ OAI-DID 验证: {is_valid}")
# 测试密码生成
password = generate_random_password()
print(f"✓ 密码生成: {password}")
is_valid, error = validate_password(password)
print(f"✓ 密码验证: {is_valid} {f'({error})' if error else ''}")
if is_valid:
print("\n✅ 加密工具测试通过!")
return True
else:
print(f"\n❌ 密码验证失败: {error}")
return False
except Exception as e:
print(f"\n❌ 加密工具测试失败: {e}")
import traceback
traceback.print_exc()
return False
async def main():
"""运行所有测试"""
print("\n" + "=" * 60)
print(" OpenAI 注册系统 - Sentinel 集成测试")
print("=" * 60)
print()
results = []
# 运行所有测试
results.append(("模块导入", test_imports()))
results.append(("Node.js 环境", test_node_availability()))
results.append(("SDK 文件", test_sdk_file()))
results.append(("浏览器指纹", test_fingerprint()))
results.append(("加密工具", test_crypto_utils()))
results.append(("Sentinel Token", await test_sentinel_token()))
# 打印总结
print("\n" + "=" * 60)
print(" 测试总结")
print("=" * 60)
for name, passed in results:
status = "✅ 通过" if passed else "❌ 失败"
print(f" {name:20s} {status}")
print("=" * 60)
total = len(results)
passed = sum(1 for _, p in results if p)
print(f"\n总计: {passed}/{total} 个测试通过")
if passed == total:
print("\n🎉 所有测试通过!系统已准备就绪。")
print("\n下一步:")
print(" 1. 配置邮箱(修改 .env 文件)")
print(" 2. 运行主程序: python main.py")
return 0
else:
print("\n⚠️ 部分测试失败,请检查上述错误信息。")
print("\n常见问题:")
print(" - Node.js 未安装: 请安装 Node.js v16+")
print(" - SDK 文件缺失: 确保 sdk/sdk.js 存在")
print(" - 依赖未安装: 运行 pip install -e .")
return 1
if __name__ == "__main__":
try:
exit_code = asyncio.run(main())
sys.exit(exit_code)
except KeyboardInterrupt:
print("\n\n⚠️ 测试被用户中断")
sys.exit(1)
except Exception as e:
print(f"\n\n❌ 测试程序异常: {e}")
import traceback
traceback.print_exc()
sys.exit(1)

View File

@@ -1,81 +1,175 @@
""" # modules/fingerprint.py
浏览器指纹生成模块 """浏览器指纹生成"""
用于生成符合 OpenAI Sentinel 要求的浏览器指纹配置数组
"""
import uuid import uuid
import random
import time import time
from typing import List from datetime import datetime
from typing import Dict, List, Any
from config import load_config
class BrowserFingerprint: class BrowserFingerprint:
""" """生成符合 SDK 期望的浏览器指纹"""
浏览器指纹生成器
生成 Sentinel SDK 所需的配置数组18 个元素)
"""
def __init__(self, session_id: str = None): def __init__(self, session_id: str = None):
"""
初始化浏览器指纹
参数:
session_id: 会话 IDoai-did如果不提供则自动生成
"""
self.session_id = session_id or str(uuid.uuid4()) self.session_id = session_id or str(uuid.uuid4())
self.start_time = time.time()
def get_config_array(self) -> List: # 新增: 使用确定性方法从 session_id 派生 Stripe 指纹
import hashlib
seed = hashlib.sha256(self.session_id.encode()).hexdigest()
# seed 是64个hex字符我们需要确保切片正确
# 从 seed 生成一致的 guid/muid/sid
# UUID需要32个hex字符去掉连字符额外部分直接拼接
self.stripe_guid = seed[:8] + '-' + seed[8:12] + '-' + seed[12:16] + '-' + seed[16:20] + '-' + seed[20:32] + seed[32:40]
self.stripe_muid = seed[:8] + '-' + seed[8:12] + '-' + seed[12:16] + '-' + seed[16:20] + '-' + seed[20:32] + seed[40:46]
self.stripe_sid = seed[:8] + '-' + seed[8:12] + '-' + seed[12:16] + '-' + seed[16:20] + '-' + seed[20:32] + seed[46:52]
# 从配置加载指纹参数
config = load_config()
fingerprint_cfg = config.fingerprint_config
self.user_agent = fingerprint_cfg['user_agent']
self.screen_width = fingerprint_cfg['screen_width']
self.screen_height = fingerprint_cfg['screen_height']
self.languages = fingerprint_cfg['languages']
self.hardware_concurrency = fingerprint_cfg['hardware_concurrency']
def get_config_array(self) -> List[Any]:
""" """
获取 Sentinel SDK 配置数组 生成 SDK getConfig() 函数返回的 18 元素数组
返回: 对应 SDK 源码:
包含 18 个元素的指纹数组 [0]: screen.width + screen.height
[1]: new Date().toString()
数组结构(从 JS 逆向): [2]: performance.memory.jsHeapSizeLimit (可选)
[0] screen dimensions (width*height) [3]: nonce (PoW 填充)
[1] timestamp [4]: navigator.userAgent
[2] memory (hardwareConcurrency) [5]: 随机 script.src
[3] nonce (动态值PoW 时会修改) [6]: build ID
[4] user agent [7]: navigator.language
[5] random element [8]: navigator.languages.join(',')
[6] script src [9]: 运行时间 (PoW 填充)
[7] language [10]: 随机 navigator 属性
[8] languages (joined) [11]: 随机 document key
[9] elapsed time (ms) [12]: 随机 window key
[10] random function test [13]: performance.now()
[11] keys [14]: session UUID
[12] window keys [15]: URL search params
[13] performance.now() [16]: navigator.hardwareConcurrency
[14] uuid (session_id) [17]: performance.timeOrigin
[15] URL params
[16] hardware concurrency
[17] timeOrigin
""" """
elapsed_ms = int((time.time() - self.start_time) * 1000)
# 模拟的 script sources
fake_scripts = [
"https://sentinel.openai.com/sentinel/97790f37/sdk.js",
"https://chatgpt.com/static/js/main.abc123.js",
"https://cdn.oaistatic.com/_next/static/chunks/main.js",
]
# 模拟的 navigator 属性名
navigator_props = [
'hardwareConcurrency', 'language', 'languages',
'platform', 'userAgent', 'vendor'
]
# 模拟的 document keys
document_keys = ['body', 'head', 'documentElement', 'scripts']
# 模拟的 window keys
window_keys = ['performance', 'navigator', 'document', 'location']
current_time = time.time() * 1000
return [ return [
1920 * 1080, # [0] screen dimensions self.screen_width + self.screen_height, # [0]
str(int(time.time() * 1000)), # [1] timestamp str(datetime.now()), # [1]
8, # [2] hardware concurrency None, # [2] memory
0, # [3] nonce (placeholder) None, # [3] nonce (placeholder)
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36", # [4] UA self.user_agent, # [4]
str(0.123456789), # [5] random element random.choice(fake_scripts), # [5]
"https://chatgpt.com/_next/static/chunks/sentinel.js", # [6] script src "97790f37", # [6] build ID
"en-US", # [7] language self.languages[0], # [7]
"en-US,en", # [8] languages ",".join(self.languages), # [8]
elapsed_ms, # [9] elapsed time None, # [9] runtime (placeholder)
"", # [10] random function f"{random.choice(navigator_props)}{random.randint(1, 16)}", # [10]
"", # [11] keys random.choice(document_keys), # [11]
"", # [12] window keys random.choice(window_keys), # [12]
elapsed_ms, # [13] performance.now() current_time, # [13]
self.session_id, # [14] uuid (oai-did) self.session_id, # [14]
"", # [15] URL params "", # [15] URL params
8, # [16] hardware concurrency self.hardware_concurrency, # [16]
int(time.time() * 1000) - elapsed_ms, # [17] timeOrigin current_time - random.uniform(100, 1000), # [17] timeOrigin
] ]
def get_cookies(self) -> Dict[str, str]:
"""生成初始 cookies"""
return {
'oai-did': self.session_id,
}
def get_stripe_fingerprint(self) -> Dict[str, str]:
"""获取 Stripe 支付指纹(与 session_id 一致派生)"""
return {
'guid': self.stripe_guid,
'muid': self.stripe_muid,
'sid': self.stripe_sid,
}
def get_headers(self, with_sentinel: str = None, host: str = 'auth.openai.com') -> Dict[str, str]:
"""生成 HTTP headers支持多域名"""
# 基础 headers
headers = {
'User-Agent': self.user_agent,
'Accept': 'application/json',
'Accept-Language': f"{self.languages[0]},{self.languages[1]};q=0.5",
# Note: urllib3/requests only auto-decompress brotli/zstd when optional
# deps are installed; avoid advertising unsupported encodings.
'Accept-Encoding': 'gzip, deflate',
'Connection': 'keep-alive',
'Sec-Fetch-Dest': 'empty',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Site': 'same-origin',
'Priority': 'u=1, i',
'Pragma': 'no-cache',
'Cache-Control': 'no-cache',
}
# 根据域名设置特定 headers
if 'chatgpt.com' in host:
headers.update({
'Origin': 'https://chatgpt.com',
'Referer': 'https://chatgpt.com/',
})
else:
headers.update({
'Origin': 'https://auth.openai.com',
'Referer': 'https://auth.openai.com/create-account/password',
'Content-Type': 'application/json',
})
# Sentinel token
if with_sentinel:
headers['openai-sentinel-token'] = with_sentinel
# Datadog RUM tracing
trace_id = random.randint(10**18, 10**19 - 1)
parent_id = random.randint(10**18, 10**19 - 1)
headers.update({
'traceparent': f'00-0000000000000000{trace_id:016x}-{parent_id:016x}-01',
'tracestate': 'dd=s:1;o:rum',
'x-datadog-origin': 'rum',
'x-datadog-parent-id': str(parent_id),
'x-datadog-sampling-priority': '1',
'x-datadog-trace-id': str(trace_id),
})
return headers
# 导出 # 导出
__all__ = ["BrowserFingerprint"] __all__ = ["BrowserFingerprint"]

View File

@@ -1,14 +1,5 @@
""" """
邮件接码处理器 邮件接码处理器 - 用于接收和解析 OpenAI 发送的验证码邮件
用于接收和解析 OpenAI 发送的验证码邮件
⚠️ 本模块提供预留接口,用户需要根据实际情况配置邮箱服务
支持的邮箱方案:
1. IMAP 收件 (Gmail, Outlook, 自建邮箱)
2. 临时邮箱 API (TempMail, Guerrilla Mail, etc.)
3. 邮件转发服务
""" """
from typing import Optional, Dict, Any, List from typing import Optional, Dict, Any, List
@@ -19,58 +10,23 @@ from utils.logger import logger
class MailHandler: class MailHandler:
""" """邮件接码处理器基类"""
邮件接码处理器
⚠️ 预留接口 - 用户需要配置实际的邮箱服务
使用场景:
- 接收 OpenAI 发送的 6 位数字 OTP 验证码
- 解析邮件内容提取验证码
- 支持超时和重试机制
"""
# OTP 邮件特征
OTP_SUBJECT_KEYWORDS = ["openai", "verification", "verify", "code"] OTP_SUBJECT_KEYWORDS = ["openai", "verification", "verify", "code"]
OTP_SENDER = "noreply@tm.openai.com" # OpenAI 发件人地址 OTP_SENDER = "noreply@tm.openai.com"
def __init__(self, config: Optional[Dict[str, Any]] = None): def __init__(self, config: Optional[Dict[str, Any]] = None):
"""
初始化邮件处理器
参数:
config: 邮箱配置字典,可能包含:
- type: "imap" | "tempmail" | "api" | "cloudmail"
- host: IMAP 服务器地址 (如果使用 IMAP)
- port: IMAP 端口 (默认 993)
- username: 邮箱用户名
- password: 邮箱密码
- api_key: 临时邮箱 API Key (如果使用 API)
"""
self.config = config or {} self.config = config or {}
self.mail_type = self.config.get("type", "not_configured") self.mail_type = self.config.get("type", "not_configured")
if not config: if not config:
logger.warning( logger.warning("MailHandler initialized without configuration.")
"MailHandler initialized without configuration. "
"OTP retrieval will fail until configured."
)
else: else:
logger.info(f"MailHandler initialized with type: {self.mail_type}") logger.info(f"MailHandler initialized with type: {self.mail_type}")
@staticmethod @staticmethod
def create(config: Optional[Dict[str, Any]]) -> "MailHandler": def create(config: Optional[Dict[str, Any]]) -> "MailHandler":
""" """工厂方法:根据配置创建对应的邮件处理器"""
工厂方法:创建合适的邮件处理器
根据配置中的 type 字段自动选择正确的 handler 实现
参数:
config: 邮箱配置字典
返回:
MailHandler 实例IMAPMailHandler 或 CloudMailHandler
"""
if not config: if not config:
return MailHandler(config) return MailHandler(config)
@@ -81,115 +37,22 @@ class MailHandler:
elif mail_type == "cloudmail": elif mail_type == "cloudmail":
return CloudMailHandler(config) return CloudMailHandler(config)
else: else:
# 默认处理器(会抛出 NotImplementedError
return MailHandler(config) return MailHandler(config)
async def wait_for_otp( async def wait_for_otp(self, email: str, timeout: int = 300, check_interval: int = 5) -> str:
self, """等待并提取 OTP 验证码(需子类实现)"""
email: str, logger.info(f"Waiting for OTP for {email} (timeout: {timeout}s)")
timeout: int = 300, raise NotImplementedError("Mail handler not configured.")
check_interval: int = 5
) -> str:
"""
等待并提取 OTP 验证码
⚠️ 预留接口 - 用户需要实现此方法
参数:
email: 注册邮箱地址
timeout: 超时时间(秒),默认 300 秒5 分钟)
check_interval: 检查间隔(秒),默认 5 秒
返回:
6 位数字验证码(例如 "123456"
抛出:
NotImplementedError: 用户需要实现此方法
TimeoutError: 超时未收到邮件
ValueError: 邮件格式错误,无法提取 OTP
集成示例:
```python
# 方案 1: 使用 IMAP (imap-tools 库)
from imap_tools import MailBox
with MailBox(self.config["host"]).login(
self.config["username"],
self.config["password"]
) as mailbox:
for msg in mailbox.fetch(AND(from_=self.OTP_SENDER, seen=False)):
otp = self._extract_otp(msg.text)
if otp:
return otp
# 方案 2: 使用临时邮箱 API
import httpx
async with httpx.AsyncClient() as client:
resp = await client.get(
f"https://tempmail.api/messages?email={email}",
headers={"Authorization": f"Bearer {self.config['api_key']}"}
)
messages = resp.json()
for msg in messages:
otp = self._extract_otp(msg["body"])
if otp:
return otp
# 方案 3: 手动输入(调试用)
print(f"Please enter OTP for {email}:")
return input().strip()
```
"""
logger.info(
f"Waiting for OTP for {email} "
f"(timeout: {timeout}s, check_interval: {check_interval}s)"
)
raise NotImplementedError(
"❌ Mail handler not configured.\n\n"
"User needs to configure email service for OTP retrieval.\n\n"
"Configuration options:\n\n"
"1. IMAP (Gmail, Outlook, custom):\n"
" config = {\n"
" 'type': 'imap',\n"
" 'host': 'imap.gmail.com',\n"
" 'port': 993,\n"
" 'username': 'your@email.com',\n"
" 'password': 'app_password'\n"
" }\n\n"
"2. Temporary email API:\n"
" config = {\n"
" 'type': 'tempmail',\n"
" 'api_key': 'YOUR_API_KEY',\n"
" 'api_endpoint': 'https://api.tempmail.com'\n"
" }\n\n"
"3. Manual input (for debugging):\n"
" config = {'type': 'manual'}\n\n"
"Example implementation location: utils/mail_box.py -> wait_for_otp()"
)
def _extract_otp(self, text: str) -> Optional[str]: def _extract_otp(self, text: str) -> Optional[str]:
""" """从邮件正文中提取 6 位 OTP 验证码"""
从邮件正文中提取 OTP 验证码
OpenAI 邮件格式示例:
"Your OpenAI verification code is: 123456"
"Enter this code: 123456"
参数:
text: 邮件正文(纯文本或 HTML
返回:
6 位数字验证码,未找到则返回 None
"""
# 清理 HTML 标签(如果有)
text = re.sub(r'<[^>]+>', ' ', text) text = re.sub(r'<[^>]+>', ' ', text)
# 常见的 OTP 模式
patterns = [ patterns = [
r'verification code is[:\s]+(\d{6})', # "verification code is: 123456" r'verification code is[:\s]+(\d{6})',
r'code[:\s]+(\d{6})', # "code: 123456" r'code[:\s]+(\d{6})',
r'enter[:\s]+(\d{6})', # "enter: 123456" r'enter[:\s]+(\d{6})',
r'(\d{6})', # 任意 6 位数字(最后尝试) r'(\d{6})',
] ]
for pattern in patterns: for pattern in patterns:
@@ -202,118 +65,27 @@ class MailHandler:
logger.warning("Failed to extract OTP from email text") logger.warning("Failed to extract OTP from email text")
return None return None
def _check_imap(self, email: str, timeout: int) -> Optional[str]:
"""
使用 IMAP 检查邮件(预留方法)
参数:
email: 注册邮箱
timeout: 超时时间(秒)
返回:
OTP 验证码,未找到则返回 None
"""
# TODO: 用户实现 IMAP 检查逻辑
# 需要安装: pip install imap-tools
raise NotImplementedError("IMAP checking not implemented")
def _check_tempmail_api(self, email: str, timeout: int) -> Optional[str]:
"""
使用临时邮箱 API 检查邮件(预留方法)
参数:
email: 注册邮箱
timeout: 超时时间(秒)
返回:
OTP 验证码,未找到则返回 None
"""
# TODO: 用户实现临时邮箱 API 调用逻辑
raise NotImplementedError("Temp mail API not implemented")
def generate_temp_email(self) -> str:
"""
生成临时邮箱地址(可选功能)
⚠️ 预留接口 - 如果使用临时邮箱服务,需要实现此方法
返回:
临时邮箱地址(例如 "random123@tempmail.com"
抛出:
NotImplementedError: 用户需要实现此方法
"""
raise NotImplementedError(
"Temp email generation not implemented. "
"Integrate a temp mail service API if needed."
)
def verify_email_deliverability(self, email: str) -> bool: def verify_email_deliverability(self, email: str) -> bool:
""" """验证邮箱地址格式是否有效"""
验证邮箱地址是否可以接收邮件(可选功能)
参数:
email: 邮箱地址
返回:
True 如果邮箱有效且可接收邮件,否则 False
"""
# 基本格式验证
email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
if not re.match(email_pattern, email): if not re.match(email_pattern, email):
logger.warning(f"Invalid email format: {email}") logger.warning(f"Invalid email format: {email}")
return False return False
# TODO: 用户可以添加更严格的验证逻辑
# 例如DNS MX 记录查询、SMTP 验证等
logger.info(f"Email format valid: {email}")
return True return True
class IMAPMailHandler(MailHandler): class IMAPMailHandler(MailHandler):
""" """基于 IMAP 的邮件处理器"""
基于 IMAP 的邮件处理器(完整实现示例)
⚠️ 这是一个参考实现,用户可以根据需要修改 async def wait_for_otp(self, email: str, timeout: int = 300, check_interval: int = 5) -> str:
"""使用 IMAP 等待 OTP 邮件"""
依赖:
pip install imap-tools
"""
async def wait_for_otp(
self,
email: str,
timeout: int = 300,
check_interval: int = 5
) -> str:
"""
使用 IMAP 等待 OTP 邮件
参数:
email: 注册邮箱
timeout: 超时时间(秒)
check_interval: 检查间隔(秒)
返回:
6 位数字验证码
抛出:
ImportError: 未安装 imap-tools
TimeoutError: 超时未收到邮件
ValueError: 配置错误或邮件格式错误
"""
try: try:
from imap_tools import MailBox, AND from imap_tools import MailBox, AND
except ImportError: except ImportError:
raise ImportError( raise ImportError("imap-tools not installed. Install with: pip install imap-tools")
"imap-tools not installed. Install with: pip install imap-tools"
)
if not all(k in self.config for k in ["host", "username", "password"]): if not all(k in self.config for k in ["host", "username", "password"]):
raise ValueError( raise ValueError("IMAP configuration incomplete. Required: host, username, password")
"IMAP configuration incomplete. Required: host, username, password"
)
start_time = time.time() start_time = time.time()
logger.info(f"Connecting to IMAP server: {self.config['host']}") logger.info(f"Connecting to IMAP server: {self.config['host']}")
@@ -324,86 +96,44 @@ class IMAPMailHandler(MailHandler):
self.config["username"], self.config["username"],
self.config["password"] self.config["password"]
) as mailbox: ) as mailbox:
# 查找未读邮件,来自 OpenAI
for msg in mailbox.fetch( for msg in mailbox.fetch(
AND(from_=self.OTP_SENDER, seen=False), AND(from_=self.OTP_SENDER, seen=False),
reverse=True, # 最新的邮件优先 reverse=True,
limit=10 limit=10
): ):
# 检查主题是否包含 OTP 关键词
if any(kw in msg.subject.lower() for kw in self.OTP_SUBJECT_KEYWORDS): if any(kw in msg.subject.lower() for kw in self.OTP_SUBJECT_KEYWORDS):
otp = self._extract_otp(msg.text or msg.html) otp = self._extract_otp(msg.text or msg.html)
if otp: if otp:
logger.success(f"OTP received: {otp}") logger.success(f"OTP received: {otp}")
# 标记为已读
mailbox.flag([msg.uid], ['\\Seen'], True) mailbox.flag([msg.uid], ['\\Seen'], True)
return otp return otp
except Exception as e: except Exception as e:
logger.warning(f"IMAP check failed: {e}") logger.warning(f"IMAP check failed: {e}")
# 等待下一次检查 logger.debug(f"No OTP found, waiting {check_interval}s")
elapsed = time.time() - start_time
remaining = timeout - elapsed
logger.debug(
f"No OTP found, waiting {check_interval}s "
f"(remaining: {int(remaining)}s)"
)
time.sleep(check_interval) time.sleep(check_interval)
raise TimeoutError( raise TimeoutError(f"Timeout waiting for OTP email ({timeout}s)")
f"Timeout waiting for OTP email (timeout: {timeout}s). "
f"Email: {email}, Sender: {self.OTP_SENDER}"
)
class CloudMailHandler(MailHandler): class CloudMailHandler(MailHandler):
""" """Cloud Mail API 邮件处理器"""
Cloud Mail API handler with external token management
使用外部预生成的 Token 管理邮件,不调用 genToken API
依赖:
pip install httpx
配置示例:
config = {
"type": "cloudmail",
"api_base_url": "https://your-cloudmail-domain.com",
"token": "9f4e298e-7431-4c76-bc15-4931c3a73984",
"target_email": "user@example.com" # 可选
}
"""
def __init__(self, config: Optional[Dict[str, Any]] = None): def __init__(self, config: Optional[Dict[str, Any]] = None):
"""
初始化 Cloud Mail handler
参数:
config: 配置字典,必填项:
- api_base_url: Cloud Mail API 基础 URL
- token: 预生成的身份令牌
- target_email: (可选) 指定监控的邮箱地址
"""
super().__init__(config) super().__init__(config)
# 验证必填配置
required = ["api_base_url", "token"] required = ["api_base_url", "token"]
missing = [key for key in required if not self.config.get(key)] missing = [key for key in required if not self.config.get(key)]
if missing: if missing:
raise ValueError( raise ValueError(f"CloudMail config incomplete. Missing: {', '.join(missing)}")
f"CloudMail configuration incomplete. Missing: {', '.join(missing)}\n"
f"Required: api_base_url, token"
)
self.api_base_url = self.config["api_base_url"].rstrip("/") self.api_base_url = self.config["api_base_url"].rstrip("/")
self.token = self.config["token"] self.token = self.config["token"]
self.target_email = self.config.get("target_email") self.target_email = self.config.get("target_email")
self.domain = self.config.get("domain") # 邮箱域名 self.domain = self.config.get("domain")
self._client: Optional[Any] = None self._client: Optional[Any] = None
logger.info(f"CloudMailHandler initialized (API: {self.api_base_url}, Domain: {self.domain or 'N/A'})") logger.info(f"CloudMailHandler initialized (API: {self.api_base_url})")
async def _get_client(self): async def _get_client(self):
"""懒加载 HTTP 客户端""" """懒加载 HTTP 客户端"""
@@ -411,9 +141,7 @@ class CloudMailHandler(MailHandler):
try: try:
import httpx import httpx
except ImportError: except ImportError:
raise ImportError( raise ImportError("httpx not installed. Install with: pip install httpx")
"httpx not installed. Install with: pip install httpx"
)
self._client = httpx.AsyncClient( self._client = httpx.AsyncClient(
timeout=30.0, timeout=30.0,
@@ -433,33 +161,19 @@ class CloudMailHandler(MailHandler):
num: int = 1, num: int = 1,
size: int = 20 size: int = 20
) -> List[Dict[str, Any]]: ) -> List[Dict[str, Any]]:
""" """查询邮件列表 (POST /api/public/emailList)"""
查询邮件列表 (POST /api/public/emailList)
参数:
to_email: 收件人邮箱
send_email: 发件人邮箱(可选,支持 % 通配符)
subject: 主题关键词(可选)
time_sort: 时间排序 (desc/asc)
num: 页码(从 1 开始)
size: 每页数量
返回:
邮件列表
"""
client = await self._get_client() client = await self._get_client()
url = f"{self.api_base_url}/api/public/emailList" url = f"{self.api_base_url}/api/public/emailList"
payload = { payload = {
"toEmail": to_email, "toEmail": to_email,
"type": 0, # 0=收件箱 "type": 0,
"isDel": 0, # 0=未删除 "isDel": 0,
"timeSort": time_sort, "timeSort": time_sort,
"num": num, "num": num,
"size": size "size": size
} }
# 可选参数
if send_email: if send_email:
payload["sendEmail"] = send_email payload["sendEmail"] = send_email
if subject: if subject:
@@ -468,38 +182,17 @@ class CloudMailHandler(MailHandler):
try: try:
resp = await client.post(url, json=payload) resp = await client.post(url, json=payload)
# 检查认证错误
if resp.status_code in [401, 403]: if resp.status_code in [401, 403]:
raise RuntimeError( raise RuntimeError("CloudMail token expired or invalid.")
"CloudMail token expired or invalid.\n"
"Please regenerate token and update MAIL_CLOUDMAIL_TOKEN in .env\n"
"Steps:\n"
"1. Login to Cloud Mail with admin account\n"
"2. Call POST /api/public/genToken to generate new token\n"
"3. Update MAIL_CLOUDMAIL_TOKEN in .env\n"
"4. Restart the program"
)
if resp.status_code != 200: if resp.status_code != 200:
raise RuntimeError( raise RuntimeError(f"CloudMail API error: {resp.status_code}")
f"CloudMail API error: {resp.status_code} - {resp.text[:200]}"
)
data = resp.json() data = resp.json()
# 检查业务逻辑错误
if data.get("code") != 200: if data.get("code") != 200:
error_msg = data.get("message", "Unknown error") raise RuntimeError(f"CloudMail API error: {data.get('message')}")
raise RuntimeError(
f"CloudMail API error: {error_msg} (code: {data.get('code')})"
)
# 返回邮件列表
result = data.get("data", {}) result = data.get("data", {})
# CloudMail API 可能返回两种格式:
# 1. {"data": {"list": [...]}} - 标准格式
# 2. {"data": [...]} - 直接列表格式
if isinstance(result, list): if isinstance(result, list):
emails = result emails = result
elif isinstance(result, dict): elif isinstance(result, dict):
@@ -512,174 +205,102 @@ class CloudMailHandler(MailHandler):
except Exception as e: except Exception as e:
if "httpx" in str(type(e).__module__): if "httpx" in str(type(e).__module__):
# httpx 网络错误
raise RuntimeError(f"CloudMail API network error: {e}") raise RuntimeError(f"CloudMail API network error: {e}")
else: raise
# 重新抛出其他错误
raise
async def ensure_email_exists(self, email: str) -> bool: async def ensure_email_exists(self, email: str) -> bool:
""" """确保邮箱账户存在(如果不存在则创建)"""
确保邮箱账户存在(如果不存在则创建) logger.info(f"CloudMail: Creating email account {email}...")
用于在注册流程开始前自动创建 Cloud Mail 邮箱账户
参数:
email: 邮箱地址
返回:
True 如果邮箱已存在或成功创建
"""
try: try:
# 先尝试查询邮箱(检查是否存在) result = await self.add_users([{"email": email}])
logger.debug(f"CloudMail: Checking if {email} exists...") logger.success(f"CloudMail: Email {email} created successfully")
emails = await self._query_emails( logger.debug(f"CloudMail: API response: {result}")
to_email=email,
size=1
)
# 如果能查询到,说明邮箱存在
logger.debug(f"CloudMail: Email {email} already exists")
return True return True
except Exception as e: except RuntimeError as e:
# 查询失败可能是邮箱不存在,尝试创建 error_msg = str(e).lower()
logger.info(f"CloudMail: Creating email account {email}...") if "already" in error_msg or "exist" in error_msg or "duplicate" in error_msg:
logger.debug(f"CloudMail: Email {email} already exists")
try:
await self.add_users([{"email": email}])
logger.success(f"CloudMail: Email {email} created successfully")
return True return True
except Exception as create_error: logger.error(f"CloudMail: Failed to create email {email}: {e}")
logger.error(f"CloudMail: Failed to create email {email}: {create_error}") raise
raise
async def wait_for_otp( except Exception as e:
self, logger.error(f"CloudMail: Failed to create email {email}: {e}")
email: str, raise
timeout: int = 300,
check_interval: int = 5
) -> str:
"""
等待 OTP 邮件(轮询实现)
参数: async def wait_for_otp(self, email: str, timeout: int = 300, check_interval: int = 5) -> str:
email: 注册邮箱 """等待 OTP 邮件(轮询实现)"""
timeout: 超时时间(秒)
check_interval: 检查间隔(秒)
返回:
6 位数字验证码
抛出:
TimeoutError: 超时未收到邮件
ValueError: 邮件格式错误,无法提取 OTP
"""
start_time = time.time() start_time = time.time()
logger.info( logger.info(f"CloudMail: Waiting for OTP for {email} (timeout: {timeout}s)")
f"CloudMail: Waiting for OTP for {email} "
f"(timeout: {timeout}s, interval: {check_interval}s)"
)
while time.time() - start_time < timeout: while time.time() - start_time < timeout:
try: try:
# 查询最近的邮件 # 模糊匹配 openai 发件人
emails = await self._query_emails( emails = await self._query_emails(
to_email=email, to_email=email,
send_email=self.OTP_SENDER, send_email="%openai%",
time_sort="desc", time_sort="desc",
size=10 size=10
) )
# 检查每封邮件 # 备选:不过滤发件人
if not emails:
emails = await self._query_emails(
to_email=email,
time_sort="desc",
size=10
)
for msg in emails: for msg in emails:
subject = msg.get("subject", "").lower() subject = msg.get("subject", "").lower()
sender = msg.get("sendEmail", "").lower()
is_from_openai = "openai" in sender or "noreply" in sender
# 检查主题是否包含 OTP 关键词 if any(kw in subject for kw in self.OTP_SUBJECT_KEYWORDS) or is_from_openai:
if any(kw in subject for kw in self.OTP_SUBJECT_KEYWORDS):
# 尝试从邮件内容提取 OTP
content = msg.get("text") or msg.get("content") or "" content = msg.get("text") or msg.get("content") or ""
otp = self._extract_otp(content) otp = self._extract_otp(content)
if otp: if otp:
logger.success(f"CloudMail: OTP received: {otp}") logger.success(f"CloudMail: OTP received: {otp} (from: {sender})")
return otp return otp
# 等待下一次检查 remaining = timeout - (time.time() - start_time)
elapsed = time.time() - start_time logger.debug(f"CloudMail: No OTP found, waiting {check_interval}s (remaining: {int(remaining)}s)")
remaining = timeout - elapsed
logger.debug(
f"CloudMail: No OTP found, waiting {check_interval}s "
f"(remaining: {int(remaining)}s)"
)
await asyncio.sleep(check_interval) await asyncio.sleep(check_interval)
except Exception as e: except Exception as e:
logger.warning(f"CloudMail: Query error: {e}") logger.warning(f"CloudMail: Query error: {e}")
await asyncio.sleep(check_interval) await asyncio.sleep(check_interval)
raise TimeoutError( raise TimeoutError(f"Timeout waiting for OTP email ({timeout}s)")
f"Timeout waiting for OTP email (timeout: {timeout}s). "
f"Email: {email}, Sender: {self.OTP_SENDER}"
)
async def add_users( async def add_users(self, users: List[Dict[str, str]]) -> Dict[str, Any]:
self, """添加用户 (POST /api/public/addUser)"""
users: List[Dict[str, str]]
) -> Dict[str, Any]:
"""
添加用户 (POST /api/public/addUser)
参数:
users: 用户列表,格式:
[
{
"email": "test@example.com",
"password": "optional", # 可选
"roleName": "optional" # 可选
}
]
返回:
API 响应数据
"""
client = await self._get_client() client = await self._get_client()
url = f"{self.api_base_url}/api/public/addUser" url = f"{self.api_base_url}/api/public/addUser"
payload = {"list": users} payload = {"list": users}
try: try:
resp = await client.post(url, json=payload) resp = await client.post(url, json=payload)
# 检查认证错误
if resp.status_code in [401, 403]: if resp.status_code in [401, 403]:
raise RuntimeError( raise RuntimeError("CloudMail token expired or invalid.")
"CloudMail token expired or invalid. "
"Please regenerate token and update MAIL_CLOUDMAIL_TOKEN in .env"
)
if resp.status_code != 200: if resp.status_code != 200:
raise RuntimeError( raise RuntimeError(f"CloudMail addUser error: {resp.status_code}")
f"CloudMail addUser API error: {resp.status_code} - {resp.text[:200]}"
)
data = resp.json() data = resp.json()
# 检查业务逻辑错误
if data.get("code") != 200: if data.get("code") != 200:
error_msg = data.get("message", "Unknown error") raise RuntimeError(f"CloudMail addUser error: {data.get('message')}")
raise RuntimeError(
f"CloudMail addUser error: {error_msg} (code: {data.get('code')})"
)
logger.info(f"CloudMail: Users added successfully: {len(users)} users") logger.info(f"CloudMail: Users added: {len(users)}")
return data return data
except Exception as e: except Exception as e:
if "httpx" in str(type(e).__module__): if "httpx" in str(type(e).__module__):
raise RuntimeError(f"CloudMail API network error: {e}") raise RuntimeError(f"CloudMail API network error: {e}")
else: raise
raise
async def close(self): async def close(self):
"""清理资源""" """清理资源"""
@@ -688,9 +309,4 @@ class CloudMailHandler(MailHandler):
logger.debug("CloudMail: HTTP client closed") logger.debug("CloudMail: HTTP client closed")
# 导出主要接口 __all__ = ["MailHandler", "IMAPMailHandler", "CloudMailHandler"]
__all__ = [
"MailHandler",
"IMAPMailHandler",
"CloudMailHandler",
]