diff --git a/.env.example b/.env.example index af73811..ff534c5 100644 --- a/.env.example +++ b/.env.example @@ -73,6 +73,10 @@ SENTINEL_API_KEY= # Python 模块名称(如果使用 module) SENTINEL_MODULE_NAME= +# Sentinel 内部配置 +SENTINEL_DEBUG=false +SENTINEL_SDK_PATH=sdk/sdk.js + # ========== TLS 指纹配置 ========== # 模拟的浏览器版本(chrome110, chrome120, chrome124) TLS_IMPERSONATE=chrome124 diff --git a/.gitignore b/.gitignore index 3b29a30..7b01424 100644 --- a/.gitignore +++ b/.gitignore @@ -31,3 +31,8 @@ results_*.json # OS .DS_Store Thumbs.db + +/docs + +/.claude + diff --git a/config.py b/config.py index efe0cfd..fe4e475 100644 --- a/config.py +++ b/config.py @@ -262,6 +262,13 @@ class AppConfig(BaseSettings): sentinel_api_key: Optional[str] = Field(default=None, description="API Key") sentinel_module_name: Optional[str] = Field(default=None, description="模块名称") + # Sentinel 内部配置 + sentinel_debug: bool = Field(default=False, description="Sentinel 调试模式") + sentinel_sdk_path: str = Field( + default="sdk/sdk.js", + description="Sentinel SDK JS 文件路径(相对于项目根目录)" + ) + # ========== TLS 指纹配置 ========== tls_impersonate: Literal["chrome110", "chrome120", "chrome124"] = Field( default="chrome124", @@ -337,6 +344,17 @@ class AppConfig(BaseSettings): module_name=self.sentinel_module_name, ) + @property + def fingerprint_config(self) -> Dict[str, Any]: + """获取指纹配置""" + return { + "user_agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36", + "screen_width": 1920, + "screen_height": 1080, + "languages": ["en-US", "en"], + "hardware_concurrency": 8, + } + def validate_config(self) -> List[str]: """ 验证配置完整性,返回警告列表 diff --git a/core/flow.py b/core/flow.py index 49ae69e..16efd2d 100644 --- a/core/flow.py +++ b/core/flow.py @@ -1,32 +1,13 @@ -""" -OpenAI 账号注册流程编排模块 - -完整的注册流程实现,包含以下步骤: -1. 初始化会话(访问主页 + API providers) -2. 获取 CSRF Token -3. OAuth 流程(跳转到 auth.openai.com) -4. Sentinel 握手(获取 Token) -5. 提交注册信息(邮箱 + 密码) -6. 触发邮件验证(可能遇到 Cloudflare 403) -7. 提交 OTP 验证码 -8. 完成用户信息(姓名 + 生日) - -参考文档: /home/carry/myprj/gptAutoPlus/docs/开发文档.md -""" +"""OpenAI 账号注册流程编排模块""" from typing import Dict, Any, Optional import secrets import random -import asyncio import json +import uuid -from core.session import ( - OAISession, - CloudflareBlockError, - SessionInvalidError, - RateLimitError -) -from core.sentinel import SentinelHandler +from core.session import OAISession, CloudflareBlockError, SessionInvalidError, RateLimitError +from core.sentinel_handler import SentinelHandler from core.challenge import CloudflareSolver from utils.mail_box import MailHandler from utils.crypto import generate_random_password @@ -34,13 +15,8 @@ from utils.logger import logger class RegisterFlow: - """ - OpenAI 账号注册流程编排器 + """OpenAI 账号注册流程编排器""" - 负责协调各个组件,按照正确的顺序执行注册流程 - """ - - # OpenAI 相关 URL CHATGPT_HOME = "https://chatgpt.com/" CHATGPT_PROVIDERS = "https://chatgpt.com/api/auth/providers" CHATGPT_CSRF = "https://chatgpt.com/api/auth/csrf" @@ -52,96 +28,47 @@ class RegisterFlow: AUTH_VALIDATE_OTP = "https://auth.openai.com/api/accounts/email-otp/validate" AUTH_COMPLETE_PROFILE = "https://auth.openai.com/api/accounts/create_account" - def __init__( - self, - session: OAISession, - config, - email: Optional[str] = None, - password: Optional[str] = None - ): - """ - 初始化注册流程 - - 参数: - session: OAISession 实例(已配置 TLS 指纹和代理) - config: AppConfig 配置对象 - email: 注册邮箱(可选,不提供则自动生成) - password: 密码(可选,不提供则自动生成) - """ + def __init__(self, session: OAISession, config, email: Optional[str] = None, password: Optional[str] = None): self.s = session self.config = config self.email = email or self._generate_email() self.password = password or generate_random_password() - # 初始化子模块 self.sentinel = SentinelHandler(session) - self.mail = MailHandler.create( - config.mail.to_dict() if config.mail.enabled else None - ) + self.mail = MailHandler.create(config.mail.to_dict() if config.mail.enabled else None) self.cloudflare_solver = CloudflareSolver() - # 流程状态 self.csrf_token: Optional[str] = None self.sentinel_token: Optional[Dict[str, Any]] = None self.otp: Optional[str] = None - logger.info( - f"RegisterFlow initialized for {self.email} " - f"(oai-did: {self.s.oai_did})" - ) + logger.info(f"RegisterFlow initialized for {self.email} (oai-did: {self.s.oai_did})") async def run(self) -> Dict[str, Any]: - """ - 执行完整注册流程 - - 返回: - 注册结果字典,包含: - - email: 注册邮箱 - - password: 密码 - - status: 状态 ("success", "failed", "pending_otp", etc.) - - error: 错误信息(如果失败) - - message: 额外信息 - """ + """执行完整注册流程""" try: logger.info(f"[{self.email}] Starting registration flow") - # Step 0: 如果使用 CloudMail,确保邮箱账户存在 + # Step 0: CloudMail 邮箱账户 if self.config.mail.enabled and self.config.mail.type == "cloudmail": try: from utils.mail_box import CloudMailHandler if isinstance(self.mail, CloudMailHandler): - logger.info(f"[{self.email}] Step 0: Ensuring email account exists in CloudMail") + logger.info(f"[{self.email}] Step 0: Ensuring email account exists") await self.mail.ensure_email_exists(self.email) logger.info(f"[{self.email}] ✓ Email account ready") except Exception as e: logger.warning(f"[{self.email}] Failed to create CloudMail account: {e}") - # 继续执行,可能邮箱已经存在 - # Step 1: 初始化会话 await self._step1_init_session() - - # Step 2: 获取 CSRF Token await self._step2_get_csrf_token() - - # Step 3: OAuth 流程 await self._step3_oauth_signin() - - # Step 4: Sentinel 握手 await self._step4_get_sentinel_token() - - # Step 5: 提交注册信息 await self._step5_submit_registration() - - # Step 6: 触发邮件验证 await self._step6_send_email_otp() - - # Step 7: 提交 OTP await self._step7_submit_otp() - - # Step 8: 完成用户信息 await self._step8_complete_profile() - # 注册成功 logger.success(f"[{self.email}] Registration completed successfully! ✅") return { "email": self.email, @@ -153,437 +80,197 @@ class RegisterFlow: except CloudflareBlockError as e: logger.error(f"[{self.email}] Cloudflare blocked: {e}") - return { - "email": self.email, - "password": self.password, - "status": "cloudflare_blocked", - "error": str(e), - "message": "Encountered Cloudflare challenge. Consider using residential proxy or solver." - } + return {"email": self.email, "password": self.password, "status": "cloudflare_blocked", "error": str(e)} except SessionInvalidError as e: - logger.error(f"[{self.email}] Session invalid (409): {e}") - return { - "email": self.email, - "password": self.password, - "status": "session_invalid", - "error": str(e), - "message": "Session conflict. CSRF token expired. Retry recommended." - } + logger.error(f"[{self.email}] Session invalid: {e}") + return {"email": self.email, "password": self.password, "status": "session_invalid", "error": str(e)} except RateLimitError as e: - logger.error(f"[{self.email}] Rate limited (429): {e}") - return { - "email": self.email, - "password": self.password, - "status": "rate_limited", - "error": str(e), - "message": "Rate limit exceeded. Wait and retry with different IP." - } + logger.error(f"[{self.email}] Rate limited: {e}") + return {"email": self.email, "password": self.password, "status": "rate_limited", "error": str(e)} except NotImplementedError as e: logger.warning(f"[{self.email}] Feature not implemented: {e}") - return { - "email": self.email, - "password": self.password, - "status": "pending_manual", - "error": str(e), - "message": "Partial success. User needs to complete manual steps (Sentinel or OTP)." - } + return {"email": self.email, "password": self.password, "status": "pending_manual", "error": str(e)} except Exception as e: - logger.exception(f"[{self.email}] Unexpected error during registration") - return { - "email": self.email, - "password": self.password, - "status": "failed", - "error": str(e), - "message": f"Registration failed: {type(e).__name__}" - } + logger.exception(f"[{self.email}] Unexpected error") + return {"email": self.email, "password": self.password, "status": "failed", "error": str(e)} async def _step1_init_session(self): - """ - Step 1: 初始化会话 - - 访问 ChatGPT 主页和 API providers 端点,建立基础会话 - """ + """Step 1: 初始化会话""" logger.info(f"[{self.email}] Step 1: Initializing session") - - # 访问主页 - resp = self.s.get(self.CHATGPT_HOME) - logger.debug(f" - GET {self.CHATGPT_HOME}: {resp.status_code}") - - # 获取 auth providers - resp = self.s.get(self.CHATGPT_PROVIDERS) - logger.debug(f" - GET {self.CHATGPT_PROVIDERS}: {resp.status_code}") - + self.s.get(self.CHATGPT_HOME) + self.s.get(self.CHATGPT_PROVIDERS) logger.info(f"[{self.email}] ✓ Session initialized") async def _step2_get_csrf_token(self): - """ - Step 2: 获取 CSRF Token - - CSRF Token 用于后续的 OAuth 登录流程 - """ + """Step 2: 获取 CSRF Token""" logger.info(f"[{self.email}] Step 2: Getting CSRF token") - resp = self.s.get(self.CHATGPT_CSRF) - if resp.status_code != 200: raise RuntimeError(f"Failed to get CSRF token: {resp.status_code}") - - data = resp.json() - self.csrf_token = data.get("csrfToken") - + self.csrf_token = resp.json().get("csrfToken") if not self.csrf_token: - raise RuntimeError(f"CSRF token not found in response: {data}") - - logger.info(f"[{self.email}] ✓ CSRF token obtained: {self.csrf_token[:20]}...") + raise RuntimeError("CSRF token not found") + logger.info(f"[{self.email}] ✓ CSRF token obtained") async def _step3_oauth_signin(self): - """ - Step 3: OAuth 登录流程 - - 启动 OAuth 流程,跳转到 auth.openai.com - 确保获取所有必要的 session cookies - """ + """Step 3: OAuth 登录流程""" logger.info(f"[{self.email}] Step 3: Starting OAuth flow") - # 生成 auth_session_logging_id - import uuid - auth_session_logging_id = str(uuid.uuid4()) - - # 发起 OAuth signin 请求(添加关键参数) signin_params = { 'prompt': 'login', 'ext-oai-did': self.s.oai_did, - 'auth_session_logging_id': auth_session_logging_id, - 'screen_hint': 'signup', # 🔥 明确指定注册 - 'login_hint': self.email, # 🔥 传入邮箱 + 'auth_session_logging_id': str(uuid.uuid4()), + 'screen_hint': 'signup', + 'login_hint': self.email, } - payload = { - "callbackUrl": "/", - "csrfToken": self.csrf_token, - "json": "true" - } + payload = {"callbackUrl": "/", "csrfToken": self.csrf_token, "json": "true"} resp = self.s.post( self.CHATGPT_SIGNIN, - params=signin_params, # ✅ 添加 URL 参数 + params=signin_params, data=payload, headers={"Content-Type": "application/x-www-form-urlencoded"} ) if resp.status_code != 200: - raise RuntimeError(f"OAuth signin failed: {resp.status_code} - {resp.text[:200]}") - - data = resp.json() - auth_url = data.get("url") + raise RuntimeError(f"OAuth signin failed: {resp.status_code}") + auth_url = resp.json().get("url") if not auth_url: - raise RuntimeError(f"OAuth URL not found in response: {data}") + raise RuntimeError("OAuth URL not found") - logger.debug(f" - OAuth URL: {auth_url[:100]}...") - - # 访问 OAuth 跳转链接(建立 auth.openai.com 会话) - # 这一步会设置关键的 cookies: login_session, oai-client-auth-session, auth_provider 等 - resp = self.s.get(auth_url, allow_redirects=True) - logger.debug(f" - GET OAuth URL: {resp.status_code}") - logger.debug(f" - Final URL after redirects: {resp.url}") - - # 检查关键 cookies 是否已设置 - important_cookies = ["login_session", "oai-client-auth-session"] - cookies_status = { - cookie: cookie in self.s.client.cookies - for cookie in important_cookies - } - logger.debug(f" - Cookies status: {cookies_status}") - - # 访问注册页面 - resp = self.s.get(self.AUTH_CREATE_ACCOUNT) - logger.debug(f" - GET create-account page: {resp.status_code}") - - logger.info(f"[{self.email}] ✓ OAuth flow completed, redirected to auth.openai.com") + self.s.get(auth_url, allow_redirects=True) + self.s.get(self.AUTH_CREATE_ACCOUNT) + logger.info(f"[{self.email}] ✓ OAuth flow completed") async def _step4_get_sentinel_token(self): - """ - Step 4: Sentinel 握手 - - 获取 Sentinel Token 用于提交注册信息 - ✅ 已集成完整的 Sentinel 解决方案 - """ + """Step 4: Sentinel 握手""" logger.info(f"[{self.email}] Step 4: Getting Sentinel token") - try: - self.sentinel_token = await self.sentinel.get_token( - flow="username_password_create" - ) - logger.info(f"[{self.email}] ✓ Sentinel token obtained: {str(self.sentinel_token)[:50]}...") - + self.sentinel_token = await self.sentinel.get_token(flow="username_password_create") + logger.info(f"[{self.email}] ✓ Sentinel token obtained") except (NotImplementedError, ImportError) as e: - logger.error( - f"[{self.email}] ❌ Sentinel solver not available: {e}" - ) - # 重新抛出异常,让调用方知道需要修复 + logger.error(f"[{self.email}] Sentinel solver not available: {e}") raise async def _step5_submit_registration(self): - """ - Step 5: 提交注册信息 - - POST /api/accounts/user/register - 提交用户名(邮箱)、密码,Sentinel Token 放在 Header 中 - """ + """Step 5: 提交注册信息""" logger.info(f"[{self.email}] Step 5: Submitting registration") - # 请求 Body:username 和 password - payload = { - "username": self.email, # ✅ 改为 username - "password": self.password - } + payload = {"username": self.email, "password": self.password} - # Sentinel Token 作为 Header 传递(JSON 字符串格式) headers = { "Content-Type": "application/json", "Accept": "application/json", "Origin": "https://auth.openai.com", "Referer": self.AUTH_CREATE_ACCOUNT, - "Openai-Sentinel-Token": json.dumps(self.sentinel_token), # ✅ 注意大小写 - "Sec-Fetch-Dest": "empty", - "Sec-Fetch-Mode": "cors", - "Sec-Fetch-Site": "same-origin", - "Priority": "u=1, i", - } - - # 添加 Datadog tracing headers(模拟真实浏览器) - headers.update({ + "Openai-Sentinel-Token": json.dumps(self.sentinel_token), "X-Datadog-Trace-Id": str(secrets.randbits(63)), "X-Datadog-Parent-Id": str(secrets.randbits(63)), "X-Datadog-Sampling-Priority": "1", "X-Datadog-Origin": "rum", "Traceparent": f"00-0000000000000000{secrets.token_hex(8)}-{secrets.token_hex(8)}-01", "Tracestate": "dd=s:1;o:rum", - }) + } - resp = self.s.post( - self.AUTH_REGISTER, - json=payload, - headers=headers - ) + resp = self.s.post(self.AUTH_REGISTER, json=payload, headers=headers) if resp.status_code != 200: - error_msg = resp.text[:300] - logger.error(f" - Registration failed: {resp.status_code} - {error_msg}") - - # 检查常见错误 if "email already exists" in resp.text.lower(): raise ValueError(f"Email already registered: {self.email}") - elif "invalid token" in resp.text.lower(): - raise ValueError("Invalid Sentinel token. Check your Sentinel solver.") - else: - raise RuntimeError(f"Registration submission failed: {resp.status_code} - {error_msg}") + raise RuntimeError(f"Registration failed: {resp.status_code}") - logger.info(f"[{self.email}] ✓ Registration info submitted successfully") + logger.info(f"[{self.email}] ✓ Registration submitted") async def _step6_send_email_otp(self): - """ - Step 6: 触发邮件验证 - - POST /api/accounts/email-otp/send - 触发 OpenAI 发送 OTP 验证码到注册邮箱 - - ⚠️ 此步骤最容易触发 Cloudflare 403 - """ + """Step 6: 触发邮件验证""" logger.info(f"[{self.email}] Step 6: Sending email OTP") resp = self.s.post( self.AUTH_SEND_OTP, json={}, - headers={ - "Content-Type": "application/json", - "Referer": self.AUTH_CREATE_ACCOUNT - } + headers={"Content-Type": "application/json", "Referer": self.AUTH_CREATE_ACCOUNT} ) - # 检查 Cloudflare 拦截 - if resp.status_code == 403: - if CloudflareSolver.detect_challenge(resp): - logger.error(f"[{self.email}] ⚠️ Cloudflare challenge detected at OTP send") - raise CloudflareBlockError( - "Cloudflare Turnstile challenge triggered when sending OTP. " - "Recommendations: use residential proxy or integrate captcha solver." - ) + if resp.status_code == 403 and CloudflareSolver.detect_challenge(resp): + raise CloudflareBlockError("Cloudflare challenge triggered") if resp.status_code != 200: - raise RuntimeError( - f"Email OTP send failed: {resp.status_code} - {resp.text[:200]}" - ) + raise RuntimeError(f"OTP send failed: {resp.status_code}") - logger.info(f"[{self.email}] ✓ OTP email sent successfully") + logger.info(f"[{self.email}] ✓ OTP email sent") async def _step7_submit_otp(self): - """ - Step 7: 提交 OTP 验证码 - - 等待邮件接收 OTP,然后提交验证 - - ⚠️ 用户需要配置邮箱服务 - """ + """Step 7: 提交 OTP 验证码""" logger.info(f"[{self.email}] Step 7: Waiting for OTP") try: - # 等待 OTP(最多 5 分钟) - self.otp = await self.mail.wait_for_otp( - email=self.email, - timeout=300 - ) + self.otp = await self.mail.wait_for_otp(email=self.email, timeout=300) logger.info(f"[{self.email}] ✓ OTP received: {self.otp}") - except NotImplementedError: - logger.warning( - f"[{self.email}] ⚠️ Mail handler not configured, cannot retrieve OTP" - ) - # 重新抛出异常,让调用方知道需要手动输入 OTP + logger.warning(f"[{self.email}] Mail handler not configured") raise - except TimeoutError: - logger.error(f"[{self.email}] ⏱ Timeout waiting for OTP") - raise TimeoutError( - f"Timeout waiting for OTP email (5 minutes). " - f"Please check email: {self.email}" - ) + raise TimeoutError(f"Timeout waiting for OTP: {self.email}") - # 提交 OTP - payload = {"code": self.otp} resp = self.s.post( self.AUTH_VALIDATE_OTP, - json=payload, - headers={ - "Content-Type": "application/json", - "Referer": self.AUTH_CREATE_ACCOUNT - } + json={"code": self.otp}, + headers={"Content-Type": "application/json", "Referer": self.AUTH_CREATE_ACCOUNT} ) if resp.status_code != 200: - error_msg = resp.text[:200] - logger.error(f" - OTP validation failed: {resp.status_code} - {error_msg}") + raise RuntimeError(f"OTP validation failed: {resp.status_code}") - if "invalid code" in resp.text.lower(): - raise ValueError(f"Invalid OTP code: {self.otp}") - else: - raise RuntimeError(f"OTP validation failed: {resp.status_code} - {error_msg}") - - logger.info(f"[{self.email}] ✓ OTP validated successfully") + logger.info(f"[{self.email}] ✓ OTP validated") async def _step8_complete_profile(self): - """ - Step 8: 完成用户信息 - - POST /api/accounts/create_account - 提交姓名和生日,完成注册 - """ + """Step 8: 完成用户信息""" logger.info(f"[{self.email}] Step 8: Completing profile") name = self._generate_name() birthdate = self._generate_birthdate() - payload = { - "name": name, - "birthdate": birthdate - } - resp = self.s.post( self.AUTH_COMPLETE_PROFILE, - json=payload, - headers={ - "Content-Type": "application/json", - "Referer": self.AUTH_CREATE_ACCOUNT - } + json={"name": name, "birthdate": birthdate}, + headers={"Content-Type": "application/json", "Referer": self.AUTH_CREATE_ACCOUNT} ) if resp.status_code != 200: - raise RuntimeError( - f"Profile completion failed: {resp.status_code} - {resp.text[:200]}" - ) + raise RuntimeError(f"Profile completion failed: {resp.status_code}") - logger.info( - f"[{self.email}] ✓ Profile completed: name={name}, birthdate={birthdate}" - ) - - # ========== 辅助方法 ========== + logger.info(f"[{self.email}] ✓ Profile completed") def _generate_email(self) -> str: - """ - 生成随机邮箱 - - 如果启用了 CloudMail 且配置了域名,使用 CloudMail 域名 - 否则使用 example.com(需要用户替换) - - 返回: - 邮箱地址 - """ + """生成随机邮箱""" random_part = secrets.token_hex(8) - # 如果使用 CloudMail 且配置了域名,使用真实域名 if self.config.mail.enabled and self.config.mail.type == "cloudmail": domain = self.config.mail.cloudmail_domain if domain: return f"user_{random_part}@{domain}" - # 默认使用 example.com(用户应该替换为实际域名) - logger.warning( - f"Using example.com domain. Please configure MAIL_CLOUDMAIL_DOMAIN " - f"or replace with your actual domain." - ) + logger.warning("Using example.com domain. Configure MAIL_CLOUDMAIL_DOMAIN.") return f"user_{random_part}@example.com" def _generate_name(self) -> str: - """ - 生成随机姓名 - - 返回: - 姓名字符串 - """ - first_names = [ - "James", "John", "Robert", "Michael", "William", - "David", "Richard", "Joseph", "Thomas", "Charles", - "Mary", "Patricia", "Jennifer", "Linda", "Elizabeth", - "Barbara", "Susan", "Jessica", "Sarah", "Karen" - ] - - last_names = [ - "Smith", "Johnson", "Williams", "Brown", "Jones", - "Garcia", "Miller", "Davis", "Rodriguez", "Martinez", - "Hernandez", "Lopez", "Gonzalez", "Wilson", "Anderson", - "Thomas", "Taylor", "Moore", "Jackson", "Martin" - ] - - first = random.choice(first_names) - last = random.choice(last_names) - - return f"{first} {last}" + """生成随机姓名""" + first_names = ["James", "John", "Robert", "Michael", "William", "David", "Mary", "Patricia", "Jennifer", "Linda"] + last_names = ["Smith", "Johnson", "Williams", "Brown", "Jones", "Garcia", "Miller", "Davis", "Wilson", "Anderson"] + return f"{random.choice(first_names)} {random.choice(last_names)}" def _generate_birthdate(self) -> str: - """ - 生成随机生日(1980-2002 年,确保满 18 岁) - - 返回: - 日期字符串,格式: YYYY-MM-DD - """ + """生成随机生日 (1980-2002)""" year = random.randint(1980, 2002) month = random.randint(1, 12) - - # 避免 2 月 29/30/31 日等无效日期 - if month == 2: - day = random.randint(1, 28) - elif month in [4, 6, 9, 11]: - day = random.randint(1, 30) - else: - day = random.randint(1, 31) - + day = random.randint(1, 28) if month == 2 else random.randint(1, 30) if month in [4, 6, 9, 11] else random.randint(1, 31) return f"{year}-{month:02d}-{day:02d}" -# 导出主要接口 __all__ = ["RegisterFlow"] diff --git a/core/sentinel.py b/core/sentinel.py deleted file mode 100644 index cbc8d20..0000000 --- a/core/sentinel.py +++ /dev/null @@ -1,225 +0,0 @@ -""" -Sentinel 反爬机制处理器 - -Sentinel 是 OpenAI 用于防止自动化注册的安全机制,包括: -- Proof of Work (PoW) 挑战 -- 设备指纹验证 -- 行为分析 - -✅ 已集成完整的 Sentinel 解决方案(使用 reference/ 下的代码) -""" - -from typing import Optional, Dict, Any -from utils.logger import logger -from utils.fingerprint import BrowserFingerprint - - -class SentinelHandler: - """ - Sentinel 反爬机制处理器 - - ✅ 已集成用户的 Sentinel 解决方案 - 使用 reference/ 目录下的完整实现 - """ - - # Sentinel API 端点(从开发文档提取) - SENTINEL_API = "https://chatgpt.com/_next/static/chunks/sentinel.js" - SENTINEL_TOKEN_ENDPOINT = "https://api.openai.com/sentinel/token" - - def __init__(self, session): - """ - 初始化 Sentinel 处理器 - - 参数: - session: OAISession 实例(需要使用其 Cookie 和代理) - """ - from core.session import OAISession - self.session: OAISession = session - self.oai_did = session.oai_did - - # 初始化浏览器指纹 - self.fingerprint = BrowserFingerprint(session_id=self.oai_did) - - # 延迟导入 SentinelSolver(避免循环导入) - self._solver = None - - logger.info("SentinelHandler initialized") - - def _get_solver(self): - """延迟初始化 Sentinel 求解器""" - if self._solver is None: - try: - from reference.sentinel_solver import SentinelSolver - self._solver = SentinelSolver(self.fingerprint) - logger.debug("SentinelSolver initialized successfully") - except ImportError as e: - logger.error(f"Failed to import SentinelSolver: {e}") - raise ImportError( - "Sentinel solver not found. Please check reference/ directory." - ) from e - return self._solver - - async def get_token( - self, - flow: str = "username_password_create", - **kwargs - ) -> Dict[str, Any]: - """ - 获取 Sentinel Token - - ✅ 已实现 - 使用 reference/ 下的完整解决方案 - - 参数: - flow: 注册流程类型,常见值: - - "username_password_create" (注册流程) - - "username_password_create__auto" (自动注册) - **kwargs: 其他可能需要的参数 - - 返回: - Sentinel Token 字典 {"p": "...", "t": "...", "c": "...", "id": "...", "flow": "..."} - - 实现说明: - 使用 reference/sentinel_solver.py 生成 requirements token - 返回完整的 JSON 对象(用于 Header) - """ - logger.info(f"Generating Sentinel token for flow='{flow}'") - - try: - # 获取求解器 - solver = self._get_solver() - - # 生成 requirements token - token_dict = solver.generate_requirements_token() - - # 构建完整 token - # 格式: {"p": "gAAAAAC...", "t": "...", "c": "...", "id": "uuid", "flow": "..."} - token_dict["flow"] = flow - token_dict["id"] = self.oai_did - - # 验证必需字段 - if "p" not in token_dict or not token_dict["p"]: - raise ValueError("Generated token missing 'p' field") - - logger.success(f"Sentinel token generated: {str(token_dict)[:50]}...") - return token_dict - - except ImportError as e: - logger.error(f"Sentinel solver not available: {e}") - raise NotImplementedError( - "❌ Sentinel solver import failed.\n\n" - "Please ensure:\n" - "1. reference/ directory exists with sentinel_solver.py\n" - "2. sdk/sdk.js file exists\n" - "3. Node.js is installed and available in PATH\n\n" - f"Error: {e}" - ) from e - - except Exception as e: - logger.exception(f"Failed to generate Sentinel token: {e}") - raise RuntimeError(f"Sentinel token generation failed: {e}") from e - - async def solve_enforcement( - self, - enforcement_config: Dict[str, Any] - ) -> str: - """ - 解决完整的 enforcement 挑战(PoW + Turnstile) - - ✅ 已实现 - 使用 reference/sentinel_solver.py - - 参数: - enforcement_config: 服务器返回的挑战配置 - { - 'proofofwork': { - 'seed': '...', - 'difficulty': '0003a', - 'token': '...', # cached token - 'turnstile': { - 'dx': '...' # VM bytecode - } - } - } - - 返回: - 完整的 Sentinel token (JSON string) - """ - logger.info("Solving enforcement challenge...") - - try: - solver = self._get_solver() - token_json = solver.solve_enforcement(enforcement_config) - - logger.success(f"Enforcement solved: {token_json[:50]}...") - return token_json - - except Exception as e: - logger.exception(f"Failed to solve enforcement: {e}") - raise RuntimeError(f"Enforcement solving failed: {e}") from e - - def _build_payload(self, flow: str) -> Dict[str, Any]: - """ - 构建 Sentinel 请求 Payload - - 参考开发文档中的请求格式: - { - "p": "gAAAAAB...", # Proof of Work 答案 - "id": "a1b2c3d4-...", # oai-did - "flow": "username_password_create__auto" - } - - 参数: - flow: 注册流程类型 - - 返回: - Payload 字典 - """ - return { - "p": "gAAAAAB_PLACEHOLDER_POW_ANSWER", - "id": self.oai_did, - "flow": flow - } - - async def verify_token(self, token: str) -> bool: - """ - 验证 Sentinel Token 是否有效(可选功能) - - 参数: - token: 待验证的 Sentinel Token - - 返回: - True 如果有效,否则 False - """ - if not token or token == "placeholder_token": - logger.warning("Received placeholder token, validation skipped") - return False - - # Token 基本格式检查 - if not token.startswith("gAAAAA"): - logger.warning(f"Invalid token format: {token[:20]}...") - return False - - logger.info(f"Token validation: {token[:20]}... (length={len(token)})") - return True - - def get_sentinel_script(self) -> Optional[str]: - """ - 获取 Sentinel JavaScript 代码(可选,用于分析) - - 返回: - Sentinel JS 代码内容,失败则返回 None - """ - try: - response = self.session.get(self.SENTINEL_API) - if response.status_code == 200: - logger.info(f"Sentinel script fetched: {len(response.text)} bytes") - return response.text - else: - logger.error(f"Failed to fetch Sentinel script: {response.status_code}") - return None - except Exception as e: - logger.error(f"Error fetching Sentinel script: {e}") - return None - - -# 导出主要接口 -__all__ = ["SentinelHandler"] diff --git a/core/sentinel/__init__.py b/core/sentinel/__init__.py new file mode 100644 index 0000000..d60c97a --- /dev/null +++ b/core/sentinel/__init__.py @@ -0,0 +1,7 @@ +""" +Sentinel 反爬机制解决方案 +""" +from .solver import SentinelSolver +from .js_executor import JSExecutor + +__all__ = ["SentinelSolver", "JSExecutor"] diff --git a/reference/js_executor.py b/core/sentinel/js_executor.py similarity index 91% rename from reference/js_executor.py rename to core/sentinel/js_executor.py index 3a16f26..64eca7b 100644 --- a/reference/js_executor.py +++ b/core/sentinel/js_executor.py @@ -11,31 +11,43 @@ from __future__ import annotations import json import re import subprocess +import sys from pathlib import Path from typing import Any, Dict, Optional -from reference.config import DEBUG, SDK_JS_PATH +from config import load_config class JSExecutor: """通过 Node.js 执行 Sentinel SDK 内部逻辑(支持 async Turnstile VM)""" def __init__(self): + # 加载配置 + config = load_config() + self.debug = config.sentinel_debug + + # 解析 SDK 路径 + sdk_path = Path(config.sentinel_sdk_path) + if not sdk_path.is_absolute(): + # 相对路径:相对于项目根目录 + project_root = Path(sys.argv[0]).parent.resolve() + sdk_path = project_root / sdk_path + + self._sdk_path = sdk_path self._sdk_code: str = "" self._load_sdk() def _load_sdk(self) -> None: - sdk_path = Path(SDK_JS_PATH) - if not sdk_path.exists(): - raise FileNotFoundError(f"SDK not found at {SDK_JS_PATH}") + if not self._sdk_path.exists(): + raise FileNotFoundError(f"SDK not found at {self._sdk_path}") - sdk_code = sdk_path.read_text(encoding="utf-8") + sdk_code = self._sdk_path.read_text(encoding="utf-8") sdk_code = self._sanitize_sdk(sdk_code) sdk_code = self._inject_internal_exports(sdk_code) self._sdk_code = sdk_code - if DEBUG: + if self.debug: print("[JSExecutor] SDK loaded successfully (sanitized)") def _sanitize_sdk(self, sdk_code: str) -> str: @@ -169,7 +181,7 @@ async function __entry() {{ def _run_node(self, payload: Dict[str, Any], entry: str, timeout_s: int = 30) -> Any: script = self._node_script(payload, entry) - if DEBUG: + if self.debug: print("[JSExecutor] Running Node worker...") try: @@ -200,7 +212,7 @@ async function __entry() {{ return obj.get("result") def solve_pow(self, seed: str, difficulty: str, config_array: list) -> str: - if DEBUG: + if self.debug: print(f"[JSExecutor] Solving PoW: seed={seed[:10]}..., difficulty={difficulty}") result = self._run_node( @@ -209,7 +221,7 @@ async function __entry() {{ timeout_s=60, ) - if DEBUG and isinstance(result, str): + if self.debug and isinstance(result, str): print(f"[JSExecutor] PoW solved: {result[:50]}...") return result @@ -227,7 +239,7 @@ async function __entry() {{ return result def execute_turnstile(self, dx_bytecode: str, xor_key: str) -> str: - if DEBUG: + if self.debug: print("[JSExecutor] Executing Turnstile VM...") result = self._run_node( @@ -239,7 +251,7 @@ async function __entry() {{ timeout_s=30, ) - if DEBUG and isinstance(result, str): + if self.debug and isinstance(result, str): print(f"[JSExecutor] Turnstile result: {result[:50]}...") return result diff --git a/reference/sentinel_solver.py b/core/sentinel/solver.py similarity index 90% rename from reference/sentinel_solver.py rename to core/sentinel/solver.py index 992a246..4d8a31b 100644 --- a/reference/sentinel_solver.py +++ b/core/sentinel/solver.py @@ -5,51 +5,55 @@ import json import uuid from typing import Dict, Optional -from reference.js_executor import JSExecutor +from core.sentinel.js_executor import JSExecutor from utils.fingerprint import BrowserFingerprint -from reference.config import DEBUG +from config import load_config class SentinelSolver: """协调指纹生成和 JS 执行,生成完整的 Sentinel tokens""" - + def __init__(self, fingerprint: BrowserFingerprint): self.fingerprint = fingerprint self.js_executor = JSExecutor() - + + # 加载配置 + config = load_config() + self.debug = config.sentinel_debug + def generate_requirements_token(self) -> Dict[str, str]: """ 生成 requirements token(初始化时需要) - + Returns: {'p': 'gAAAAAC...', 'id': 'uuid'} """ - if DEBUG: + if self.debug: print("[Solver] Generating requirements token...") - + # 生成随机 seed req_seed = str(uuid.uuid4()) - + # 获取指纹配置 config_array = self.fingerprint.get_config_array() - + # 调用 JS 求解 answer = self.js_executor.generate_requirements(req_seed, config_array) - + token = { 'p': f'gAAAAAC{answer}', 'id': self.fingerprint.session_id, } - if DEBUG: + if self.debug: print(f"[Solver] Requirements token: {token['p'][:30]}...") return token - + def solve_enforcement(self, enforcement_config: Dict) -> str: """ 解决完整的 enforcement 挑战(PoW + Turnstile) - + Args: enforcement_config: 服务器返回的挑战配置 { @@ -62,32 +66,32 @@ class SentinelSolver: } } } - + Returns: 完整的 Sentinel token (JSON string) """ - if DEBUG: + if self.debug: print("[Solver] Solving enforcement challenge...") - + pow_data = enforcement_config.get('proofofwork', {}) - + # 1. 解决 PoW seed = pow_data['seed'] difficulty = pow_data['difficulty'] - + config_array = self.fingerprint.get_config_array() pow_answer = self.js_executor.solve_pow(seed, difficulty, config_array) - + # 2. 执行 Turnstile(如果有) turnstile_result = None turnstile_data = pow_data.get('turnstile') - + if turnstile_data and turnstile_data.get('dx'): dx_bytecode = turnstile_data['dx'] xor_key = self.fingerprint.session_id # 通常用 session ID 作为密钥 - + turnstile_result = self.js_executor.execute_turnstile(dx_bytecode, xor_key) - + # 3. 构建最终 token sentinel_token = { # enforcement token 前缀为 gAAAAAB(requirements 为 gAAAAAC) @@ -95,19 +99,17 @@ class SentinelSolver: 'id': self.fingerprint.session_id, 'flow': 'username_password_create', } - + # 添加可选字段 if turnstile_result: sentinel_token['t'] = turnstile_result - + if pow_data.get('token'): sentinel_token['c'] = pow_data['token'] - - token_json = json.dumps(sentinel_token) - - if DEBUG: - print(f"[Solver] Sentinel token generated: {token_json[:80]}...") - - return token_json - + token_json = json.dumps(sentinel_token) + + if self.debug: + print(f"[Solver] Sentinel token generated: {token_json[:80]}...") + + return token_json diff --git a/core/sentinel_handler.py b/core/sentinel_handler.py new file mode 100644 index 0000000..5d8e8a0 --- /dev/null +++ b/core/sentinel_handler.py @@ -0,0 +1,95 @@ +"""Sentinel 反爬机制处理器""" + +from typing import Optional, Dict, Any +from utils.logger import logger +from utils.fingerprint import BrowserFingerprint + + +class SentinelHandler: + """Sentinel 反爬机制处理器""" + + SENTINEL_API = "https://chatgpt.com/_next/static/chunks/sentinel.js" + SENTINEL_TOKEN_ENDPOINT = "https://api.openai.com/sentinel/token" + + def __init__(self, session): + from core.session import OAISession + self.session: OAISession = session + self.oai_did = session.oai_did + self.fingerprint = BrowserFingerprint(session_id=self.oai_did) + self._solver = None + logger.info("SentinelHandler initialized") + + def _get_solver(self): + """延迟初始化 Sentinel 求解器""" + if self._solver is None: + try: + from core.sentinel import SentinelSolver + self._solver = SentinelSolver(self.fingerprint) + logger.debug("SentinelSolver initialized") + except ImportError as e: + logger.error(f"Failed to import SentinelSolver: {e}") + raise ImportError("Sentinel solver not found. Check core/sentinel/ directory.") from e + return self._solver + + async def get_token(self, flow: str = "username_password_create", **kwargs) -> Dict[str, Any]: + """获取 Sentinel Token""" + logger.info(f"Generating Sentinel token for flow='{flow}'") + + try: + solver = self._get_solver() + token_dict = solver.generate_requirements_token() + token_dict["flow"] = flow + token_dict["id"] = self.oai_did + + if "p" not in token_dict or not token_dict["p"]: + raise ValueError("Generated token missing 'p' field") + + logger.success(f"Sentinel token generated: {str(token_dict)[:50]}...") + return token_dict + + except ImportError as e: + logger.error(f"Sentinel solver not available: {e}") + raise NotImplementedError(f"Sentinel solver import failed: {e}") from e + + except Exception as e: + logger.exception(f"Failed to generate Sentinel token: {e}") + raise RuntimeError(f"Sentinel token generation failed: {e}") from e + + async def solve_enforcement(self, enforcement_config: Dict[str, Any]) -> str: + """解决 enforcement 挑战(PoW + Turnstile)""" + logger.info("Solving enforcement challenge...") + + try: + solver = self._get_solver() + token_json = solver.solve_enforcement(enforcement_config) + logger.success(f"Enforcement solved: {token_json[:50]}...") + return token_json + + except Exception as e: + logger.exception(f"Failed to solve enforcement: {e}") + raise RuntimeError(f"Enforcement solving failed: {e}") from e + + async def verify_token(self, token: str) -> bool: + """验证 Sentinel Token 格式""" + if not token or token == "placeholder_token": + return False + if not token.startswith("gAAAAA"): + logger.warning(f"Invalid token format: {token[:20]}...") + return False + return True + + def get_sentinel_script(self) -> Optional[str]: + """获取 Sentinel JavaScript 代码""" + try: + response = self.session.get(self.SENTINEL_API) + if response.status_code == 200: + logger.info(f"Sentinel script fetched: {len(response.text)} bytes") + return response.text + logger.error(f"Failed to fetch Sentinel script: {response.status_code}") + return None + except Exception as e: + logger.error(f"Error fetching Sentinel script: {e}") + return None + + +__all__ = ["SentinelHandler"] diff --git a/reference/__init__.py b/reference/__init__.py deleted file mode 100644 index b8991b0..0000000 --- a/reference/__init__.py +++ /dev/null @@ -1,11 +0,0 @@ -""" -Reference 模块 - Sentinel 解决方案 - -包含 Sentinel Token 生成的完整实现 -""" - -from .sentinel_solver import SentinelSolver -from .js_executor import JSExecutor -from .pow_solver import ProofOfWorkSolver - -__all__ = ["SentinelSolver", "JSExecutor", "ProofOfWorkSolver"] diff --git a/reference/config.py b/reference/config.py deleted file mode 100644 index f183c6a..0000000 --- a/reference/config.py +++ /dev/null @@ -1,14 +0,0 @@ -""" -Reference 模块配置文件 - -供 Sentinel 解决器使用的配置项 -""" - -# 调试模式 -DEBUG = False - -# SDK JS 文件路径 -SDK_JS_PATH = "/home/carry/myprj/gptAutoPlus/sdk/sdk.js" - -# 导出 -__all__ = ["DEBUG", "SDK_JS_PATH"] diff --git a/reference/fingerprint.py b/reference/fingerprint.py deleted file mode 100644 index 5921d24..0000000 --- a/reference/fingerprint.py +++ /dev/null @@ -1,167 +0,0 @@ -# modules/fingerprint.py -"""浏览器指纹生成器""" - -import uuid -import random -import time -from datetime import datetime -from typing import Dict, List, Any - -from config import FINGERPRINT_CONFIG - - -class BrowserFingerprint: - """生成符合 SDK 期望的浏览器指纹""" - - def __init__(self, session_id: str = None): - self.session_id = session_id or str(uuid.uuid4()) - - # 新增: 使用确定性方法从 session_id 派生 Stripe 指纹 - import hashlib - seed = hashlib.sha256(self.session_id.encode()).hexdigest() - # seed 是64个hex字符,我们需要确保切片正确 - - # 从 seed 生成一致的 guid/muid/sid - # UUID需要32个hex字符(去掉连字符),额外部分直接拼接 - self.stripe_guid = seed[:8] + '-' + seed[8:12] + '-' + seed[12:16] + '-' + seed[16:20] + '-' + seed[20:32] + seed[32:40] - self.stripe_muid = seed[:8] + '-' + seed[8:12] + '-' + seed[12:16] + '-' + seed[16:20] + '-' + seed[20:32] + seed[40:46] - self.stripe_sid = seed[:8] + '-' + seed[8:12] + '-' + seed[12:16] + '-' + seed[16:20] + '-' + seed[20:32] + seed[46:52] - - self.user_agent = FINGERPRINT_CONFIG['user_agent'] - self.screen_width = FINGERPRINT_CONFIG['screen_width'] - self.screen_height = FINGERPRINT_CONFIG['screen_height'] - self.languages = FINGERPRINT_CONFIG['languages'] - self.hardware_concurrency = FINGERPRINT_CONFIG['hardware_concurrency'] - - def get_config_array(self) -> List[Any]: - """ - 生成 SDK getConfig() 函数返回的 18 元素数组 - - 对应 SDK 源码: - [0]: screen.width + screen.height - [1]: new Date().toString() - [2]: performance.memory.jsHeapSizeLimit (可选) - [3]: nonce (PoW 填充) - [4]: navigator.userAgent - [5]: 随机 script.src - [6]: build ID - [7]: navigator.language - [8]: navigator.languages.join(',') - [9]: 运行时间 (PoW 填充) - [10]: 随机 navigator 属性 - [11]: 随机 document key - [12]: 随机 window key - [13]: performance.now() - [14]: session UUID - [15]: URL search params - [16]: navigator.hardwareConcurrency - [17]: performance.timeOrigin - """ - - # 模拟的 script sources - fake_scripts = [ - "https://sentinel.openai.com/sentinel/97790f37/sdk.js", - "https://chatgpt.com/static/js/main.abc123.js", - "https://cdn.oaistatic.com/_next/static/chunks/main.js", - ] - - # 模拟的 navigator 属性名 - navigator_props = [ - 'hardwareConcurrency', 'language', 'languages', - 'platform', 'userAgent', 'vendor' - ] - - # 模拟的 document keys - document_keys = ['body', 'head', 'documentElement', 'scripts'] - - # 模拟的 window keys - window_keys = ['performance', 'navigator', 'document', 'location'] - - current_time = time.time() * 1000 - - return [ - self.screen_width + self.screen_height, # [0] - str(datetime.now()), # [1] - None, # [2] memory - None, # [3] nonce (placeholder) - self.user_agent, # [4] - random.choice(fake_scripts), # [5] - "97790f37", # [6] build ID - self.languages[0], # [7] - ",".join(self.languages), # [8] - None, # [9] runtime (placeholder) - f"{random.choice(navigator_props)}−{random.randint(1, 16)}", # [10] - random.choice(document_keys), # [11] - random.choice(window_keys), # [12] - current_time, # [13] - self.session_id, # [14] - "", # [15] URL params - self.hardware_concurrency, # [16] - current_time - random.uniform(100, 1000), # [17] timeOrigin - ] - - def get_cookies(self) -> Dict[str, str]: - """生成初始 cookies""" - return { - 'oai-did': self.session_id, - } - - def get_stripe_fingerprint(self) -> Dict[str, str]: - """获取 Stripe 支付指纹(与 session_id 一致派生)""" - return { - 'guid': self.stripe_guid, - 'muid': self.stripe_muid, - 'sid': self.stripe_sid, - } - - def get_headers(self, with_sentinel: str = None, host: str = 'auth.openai.com') -> Dict[str, str]: - """生成 HTTP headers(支持多域名)""" - - # 基础 headers - headers = { - 'User-Agent': self.user_agent, - 'Accept': 'application/json', - 'Accept-Language': f"{self.languages[0]},{self.languages[1]};q=0.5", - # Note: urllib3/requests only auto-decompress brotli/zstd when optional - # deps are installed; avoid advertising unsupported encodings. - 'Accept-Encoding': 'gzip, deflate', - 'Connection': 'keep-alive', - 'Sec-Fetch-Dest': 'empty', - 'Sec-Fetch-Mode': 'cors', - 'Sec-Fetch-Site': 'same-origin', - 'Priority': 'u=1, i', - 'Pragma': 'no-cache', - 'Cache-Control': 'no-cache', - } - - # 根据域名设置特定 headers - if 'chatgpt.com' in host: - headers.update({ - 'Origin': 'https://chatgpt.com', - 'Referer': 'https://chatgpt.com/', - }) - else: - headers.update({ - 'Origin': 'https://auth.openai.com', - 'Referer': 'https://auth.openai.com/create-account/password', - 'Content-Type': 'application/json', - }) - - # Sentinel token - if with_sentinel: - headers['openai-sentinel-token'] = with_sentinel - - # Datadog RUM tracing - trace_id = random.randint(10**18, 10**19 - 1) - parent_id = random.randint(10**18, 10**19 - 1) - - headers.update({ - 'traceparent': f'00-0000000000000000{trace_id:016x}-{parent_id:016x}-01', - 'tracestate': 'dd=s:1;o:rum', - 'x-datadog-origin': 'rum', - 'x-datadog-parent-id': str(parent_id), - 'x-datadog-sampling-priority': '1', - 'x-datadog-trace-id': str(trace_id), - }) - - return headers diff --git a/reference/pow_solver.py b/reference/pow_solver.py deleted file mode 100644 index 976c527..0000000 --- a/reference/pow_solver.py +++ /dev/null @@ -1,114 +0,0 @@ -import time -import json -import base64 -from typing import List - -DEBUG = True - -class ProofOfWorkSolver: - """解决 OpenAI Sentinel 的 Proof of Work challenge""" - - def __init__(self): - # FNV-1a 常量 - self.FNV_OFFSET = 2166136261 - self.FNV_PRIME = 16777619 - - def fnv1a_hash(self, data: str) -> str: - """FNV-1a hash 算法""" - hash_value = self.FNV_OFFSET - - for char in data: - hash_value ^= ord(char) - hash_value = (hash_value * self.FNV_PRIME) & 0xFFFFFFFF - - # 额外的混合步骤(从 JS 代码复制) - hash_value ^= hash_value >> 16 - hash_value = (hash_value * 2246822507) & 0xFFFFFFFF - hash_value ^= hash_value >> 13 - hash_value = (hash_value * 3266489909) & 0xFFFFFFFF - hash_value ^= hash_value >> 16 - - # 转为 8 位十六进制字符串 - return format(hash_value, '08x') - - def serialize_array(self, arr: List) -> str: - """模拟 JS 的 T() 函数:JSON.stringify + Base64""" - json_str = json.dumps(arr, separators=(',', ':')) - return base64.b64encode(json_str.encode()).decode() - - def build_fingerprint_array(self, nonce: int, elapsed_ms: int) -> List: - """构建指纹数组(简化版)""" - return [ - 0, # [0] screen dimensions - "", # [1] timestamp - 0, # [2] memory - nonce, # [3] nonce ← 关键 - "", # [4] user agent - "", # [5] random element - "", # [6] script src - "", # [7] language - "", # [8] languages - elapsed_ms, # [9] elapsed time ← 关键 - "", # [10] random function - "", # [11] keys - "", # [12] window keys - 0, # [13] performance.now() - "", # [14] uuid - "", # [15] URL params - 0, # [16] hardware concurrency - 0 # [17] timeOrigin - ] - - def solve(self, seed: str, difficulty: str, max_iterations: int = 10000000) -> str: - """ - 解决 PoW challenge - - Args: - seed: Challenge seed - difficulty: 目标难度(十六进制字符串) - max_iterations: 最大尝试次数 - - Returns: - 序列化的答案(包含 nonce) - """ - if DEBUG: - print(f"[PoW] Solving challenge:") - print(f" Seed: {seed}") - print(f" Difficulty: {difficulty}") - - start_time = time.time() - - for nonce in range(max_iterations): - elapsed_ms = int((time.time() - start_time) * 1000) - - # 构建指纹数组 - fingerprint = self.build_fingerprint_array(nonce, elapsed_ms) - - # 序列化 - serialized = self.serialize_array(fingerprint) - - # 计算 hash(seed + serialized) - hash_input = seed + serialized - hash_result = self.fnv1a_hash(hash_input) - - # 检查是否满足难度要求 - # 比较方式:hash 的前 N 位(作为整数)<= difficulty(作为整数) - difficulty_len = len(difficulty) - hash_prefix = hash_result[:difficulty_len] - - if hash_prefix <= difficulty: - elapsed = time.time() - start_time - if DEBUG: - print(f"[PoW] ✓ Found solution in {elapsed:.2f}s") - print(f" Nonce: {nonce}") - print(f" Hash: {hash_result}") - print(f" Serialized: {serialized[:100]}...") - - # 返回 serialized + "~S" (表示成功) - return serialized + "~S" - - # 每 100k 次迭代打印进度 - if DEBUG and nonce > 0 and nonce % 100000 == 0: - print(f"[PoW] Tried {nonce:,} iterations...") - - raise Exception(f"Failed to solve PoW after {max_iterations:,} iterations") diff --git a/reference/register.py b/reference/register.py deleted file mode 100644 index ee905fa..0000000 --- a/reference/register.py +++ /dev/null @@ -1,1129 +0,0 @@ -# modules/registrar.py (更新版) -"""OpenAI 注册流程控制器""" - -import json -from typing import Dict, Optional -import secrets -import random -import uuid -import requests -from urllib.parse import urlparse -from .fingerprint import BrowserFingerprint -from .sentinel_solver import SentinelSolver -from .http_client import HTTPClient -from .stripe_payment import StripePaymentHandler -from config import AUTH_BASE_URL, DEBUG, TEMPMAIL_CONFIG -from modules.pow_solver import ProofOfWorkSolver -from modules.tempmail import TempMailClient - -class OpenAIRegistrar: - """完整的 OpenAI 注册流程""" - - def __init__(self, session_id: Optional[str] = None, tempmail_client: Optional[TempMailClient] = None): - self.fingerprint = BrowserFingerprint(session_id) - self.solver = SentinelSolver(self.fingerprint) - self.http_client = HTTPClient(self.fingerprint) - self.pow_solver = ProofOfWorkSolver() # 新增 - self.tempmail_client = tempmail_client # 临时邮箱客户端(可选) - - def _step1_init_through_chatgpt(self, email: str): - """通过 ChatGPT web 初始化注册流程""" - - # 1.1 访问 ChatGPT 首页 - chatgpt_url = "https://chatgpt.com/" - headers = self.http_client.fingerprint.get_headers(host='chatgpt.com') - headers['Accept'] = 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8' - headers['Sec-Fetch-Dest'] = 'document' - headers['Sec-Fetch-Mode'] = 'navigate' - headers['Sec-Fetch-Site'] = 'none' - - resp = self.http_client.session.get( - chatgpt_url, - headers=headers, - timeout=30 - ) - - if DEBUG: - print(f"✅ [1.1] Visited ChatGPT ({resp.status_code})") - - # 1.2 获取 CSRF token - csrf_url = "https://chatgpt.com/api/auth/csrf" - - csrf_headers = headers.copy() - csrf_headers['Accept'] = 'application/json, text/plain, */*' - csrf_headers['Sec-Fetch-Dest'] = 'empty' - csrf_headers['Referer'] = 'https://chatgpt.com/' - - resp = self.http_client.session.get( - csrf_url, - headers=csrf_headers, - timeout=30 - ) - - if DEBUG: - print(f"✅ [1.2] CSRF API ({resp.status_code})") - - # 优先从 cookie 提取 CSRF token - csrf_token = None - csrf_cookie = self.http_client.cookies.get('__Host-next-auth.csrf-token', '') - - if csrf_cookie: - from urllib.parse import unquote - csrf_cookie = unquote(csrf_cookie) - - if '|' in csrf_cookie: - csrf_token = csrf_cookie.split('|')[0] - if DEBUG: - print(f"✅ [1.2] CSRF token extracted") - - # 备选:从响应 JSON 提取 - if not csrf_token and resp.status_code == 200: - try: - data = resp.json() - csrf_token = data.get('csrfToken', '') - if csrf_token and DEBUG: - print(f"✅ [1.2] CSRF token from JSON") - except Exception as e: - if DEBUG: - print(f"⚠️ [1.2] Failed to parse JSON") - - if not csrf_token: - if DEBUG: - print(f"❌ [1.2] Failed to obtain CSRF token") - raise Exception(f"Failed to obtain CSRF token") - - # 1.3 初始化注册(通过 NextAuth) - import uuid - auth_session_logging_id = str(uuid.uuid4()) - - signin_url = "https://chatgpt.com/api/auth/signin/openai" - signin_params = { - 'prompt': 'login', - 'ext-oai-did': self.fingerprint.session_id, - 'auth_session_logging_id': auth_session_logging_id, - 'screen_hint': 'signup', # 明确指定注册 - 'login_hint': email, # 🔥 关键:传入邮箱 - } - - signin_data = { - 'callbackUrl': 'https://chatgpt.com/', - 'csrfToken': csrf_token, - 'json': 'true', - } - - signin_headers = headers.copy() - signin_headers['Content-Type'] = 'application/x-www-form-urlencoded' - signin_headers['Accept'] = '*/*' - signin_headers['Referer'] = 'https://chatgpt.com/' - signin_headers['Sec-Fetch-Mode'] = 'cors' - signin_headers['Sec-Fetch-Dest'] = 'empty' - - resp = self.http_client.session.post( - signin_url, - params=signin_params, - data=signin_data, - headers=signin_headers, - allow_redirects=False, - timeout=30 - ) - - - if DEBUG: - print(f"✅ [1.3] NextAuth response ({resp.status_code})") - - - # 1.4 提取 OAuth URL(从 JSON 响应) - if resp.status_code == 200: - try: - try: - data = resp.json() - except Exception: - data = json.loads(resp.text or "{}") - - oauth_url = data.get('url') - - if not oauth_url: - raise Exception(f"No 'url' in NextAuth response: {data}") - - if DEBUG: - print(f"✅ [1.4] Got OAuth URL") - except Exception as e: - raise Exception(f"Failed to parse NextAuth response: {e}") - - elif resp.status_code in [301, 302, 303, 307, 308]: - # 旧版流程:直接重定向 - oauth_url = resp.headers.get('Location') - if not oauth_url: - if DEBUG: - print(f"❌ [1.4] Got redirect but no Location header") - raise Exception("Got redirect but no Location header") - - if DEBUG: - print(f"✅ [1.4] Got OAuth redirect") - - else: - raise Exception(f"Unexpected NextAuth response: {resp.status_code}") - - # 1.5 访问 OAuth authorize endpoint - auth_headers = self.http_client.fingerprint.get_headers(host='auth.openai.com') - auth_headers['Accept'] = 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8' - auth_headers['Referer'] = 'https://chatgpt.com/' - auth_headers['Sec-Fetch-Dest'] = 'document' - auth_headers['Sec-Fetch-Mode'] = 'navigate' - auth_headers['Sec-Fetch-Site'] = 'cross-site' - - resp = self.http_client.session.get( - oauth_url, - headers=auth_headers, - allow_redirects=True, # 自动跟随重定向 - timeout=30 - ) - - if DEBUG: - print(f"✅ [1.5] OAuth flow completed ({resp.status_code})") - - # 检查必需的 cookies - required_cookies = ['login_session', 'oai-did'] - missing = [c for c in required_cookies if c not in self.http_client.cookies] - - if missing: - if DEBUG: - print(f"⚠️ Missing cookies: {', '.join(missing)}") - else: - if DEBUG: - print(f"✅ All required cookies present") - - # 1.6 确保获取 auth.openai.com 的 CSRF(用于后续 /register 请求) - try: - auth_csrf = self.http_client.get_csrf_token(domain="auth.openai.com") - if DEBUG and auth_csrf: - print(f"✅ [1.6] Auth CSRF obtained") - except Exception as e: - if DEBUG: - print(f"⚠️ [1.6] Failed to get auth CSRF") - - - - - def _step2_init_sentinel(self): - """初始化 Sentinel(生成 token)""" - - try: - token_data = self.solver.generate_requirements_token() - self.sentinel_token = json.dumps(token_data) - - if DEBUG: - print(f"✅ [2] Sentinel token generated") - - except Exception as e: - if DEBUG: - print(f"❌ [2] Sentinel initialization failed: {e}") - raise - def _step2_5_submit_sentinel(self): - """Step 2.5: 提交 Sentinel token 到 OpenAI""" - - # 如果 sentinel_token 是字符串,先解析 - if isinstance(self.sentinel_token, str): - token_data = json.loads(self.sentinel_token) - else: - token_data = self.sentinel_token - - url = "https://sentinel.openai.com/backend-api/sentinel/req" - - payload = { - "p": token_data['p'], - "id": self.fingerprint.session_id, - "flow": "username_password_create" - } - - headers = self.http_client.fingerprint.get_headers(host='sentinel.openai.com') - headers['Content-Type'] = 'text/plain;charset=UTF-8' # 注意:text/plain - headers['Origin'] = 'https://sentinel.openai.com' - headers['Referer'] = 'https://sentinel.openai.com/backend-api/sentinel/frame.html' - headers['Accept'] = '*/*' - headers['Sec-Fetch-Site'] = 'same-origin' - headers['Sec-Fetch-Mode'] = 'cors' - headers['Sec-Fetch-Dest'] = 'empty' - - resp = self.http_client.session.post( - url, - json=payload, - headers=headers, - timeout=30 - ) - - if resp.status_code != 200: - if DEBUG: - print(f"❌ [2.5] Failed to submit Sentinel ({resp.status_code})") - raise Exception(f"Failed to submit Sentinel token: {resp.status_code} {resp.text}") - - try: - data = resp.json() - if DEBUG: - print(f"✅ [2.5] Sentinel accepted (persona: {data.get('persona')})") - - # 检查是否需要额外验证 - if data.get('turnstile', {}).get('required'): - print(f"⚠️ Turnstile required") - if data.get('proofofwork', {}).get('required'): - print(f"⚠️ Proof of Work required (difficulty: {data.get('proofofwork', {}).get('difficulty')})") - - # 保存 token(可能后续需要) - self.sentinel_response = data - return data - - except Exception as e: - if DEBUG: - print(f"❌ [2.5] Failed to parse response: {e}") - raise - - def _step2_6_solve_pow(self): - """Step 2.6: 解 Proof of Work""" - - pow_data = self.sentinel_response.get('proofofwork', {}) - - if not pow_data.get('required'): - if DEBUG: - print("⏭️ [2.6] PoW not required, skipping") - return None - - seed = pow_data.get('seed') - difficulty = pow_data.get('difficulty') - - if not seed or not difficulty: - raise Exception(f"Missing PoW parameters") - - # 解 PoW - self.pow_answer = self.pow_solver.solve(seed, difficulty) - - if DEBUG: - print(f"✅ [2.6] PoW solved (difficulty: {difficulty})") - - return self.pow_answer - def _step2_7_submit_pow(self): - """Step 2.7: 提交 PoW 答案到 Sentinel""" - - url = "https://sentinel.openai.com/backend-api/sentinel/req" - - # 再次生成 requirements token(或重用之前的) - requirements_token = self.sentinel_token.split('"p": "')[1].split('"')[0] - - payload = { - "p": requirements_token, - "id": self.fingerprint.session_id, - "answer": self.pow_answer - } - - headers = self.http_client.fingerprint.get_headers(host='sentinel.openai.com') - headers['Content-Type'] = 'text/plain;charset=UTF-8' - headers['Origin'] = 'https://sentinel.openai.com' - headers['Referer'] = 'https://sentinel.openai.com/backend-api/sentinel/frame.html' - - # 补全 Sec-Fetch-* headers - headers['Sec-Fetch-Dest'] = 'empty' - headers['Sec-Fetch-Mode'] = 'cors' - headers['Sec-Fetch-Site'] = 'same-origin' - - resp = self.http_client.session.post( - url, - json=payload, - headers=headers, - timeout=30 - ) - - - if resp.status_code != 200: - if DEBUG: - print(f"❌ [2.7] Failed to submit PoW ({resp.status_code})") - raise Exception(f"Failed to submit PoW: {resp.status_code}") - - # 解析响应 - result = resp.json() - - if DEBUG: - print(f"✅ [2.7] PoW accepted") - if 'turnstile' in result: - print(f"⚠️ Turnstile still required") - - # 保存最终响应 - self.sentinel_response = result - - return result - - - - def _step3_attempt_register(self, email: str, password: str) -> Optional[Dict]: - """尝试注册(不验证邮箱)""" - - # 3.2 准备注册请求(正确的 payload) - url = "https://auth.openai.com/api/accounts/user/register" - - payload = { - "username": email, # 不是 "email"! - "password": password, - } - - # 3.3 准备 headers(完全匹配真实请求) - headers = self.http_client.fingerprint.get_headers(host='auth.openai.com') - - # 添加 Sentinel token - if hasattr(self, 'sentinel_token') and self.sentinel_token: - headers['Openai-Sentinel-Token'] = self.sentinel_token # 注意大小写 - - headers.update({ - 'Content-Type': 'application/json', - 'Accept': 'application/json', - 'Accept-Language': 'zh-CN,zh;q=0.9', - 'Origin': 'https://auth.openai.com', - 'Referer': 'https://auth.openai.com/create-account/password', # 注意是 /password - 'Sec-Fetch-Dest': 'empty', - 'Sec-Fetch-Mode': 'cors', - 'Sec-Fetch-Site': 'same-origin', - 'Priority': 'u=1, i', - }) - - # 添加 Datadog tracing headers(匹配真实请求) - import secrets - headers.update({ - 'X-Datadog-Trace-Id': str(secrets.randbits(63)), - 'X-Datadog-Parent-Id': str(secrets.randbits(63)), - 'X-Datadog-Sampling-Priority': '1', - 'X-Datadog-Origin': 'rum', - 'Traceparent': f'00-0000000000000000{secrets.token_hex(8)}-{secrets.token_hex(8)}-01', - 'Tracestate': 'dd=s:1;o:rum', - }) - - # 3.4 准备 cookies(使用所有 auth.openai.com 的 cookies) - # 不要手动过滤,让 requests 自动处理域名匹配 - - - # 3.5 发送注册请求 - resp = self.http_client.session.post( - url, - json=payload, - headers=headers, - # 不手动指定 cookies,让 session 自动处理 - timeout=30 - ) - - - # 3.6 处理响应 - try: - data = resp.json() - - if resp.status_code == 200: - if DEBUG: - print(f"✅ [3] Registration successful!") - - # 检查是否需要邮箱验证 - continue_url = data.get('continue_url', '') - if 'email-otp' in continue_url: - if DEBUG: - print(f"⏭️ [3] Email OTP verification required") - return { - 'success': True, - 'requires_verification': True, - 'continue_url': continue_url, - 'method': data.get('method', 'GET'), - 'data': data - } - else: - return {'success': True, 'data': data} - - elif resp.status_code == 409: - error = data.get('error', {}) - error_code = error.get('code', '') - - if error_code == 'invalid_state': - # Session 无效 - if DEBUG: - print(f"❌ [3] Invalid session") - raise Exception(f"Invalid session: {error}") - - elif error_code == 'email_taken' or 'already' in str(error).lower(): - # 邮箱已被使用 - if DEBUG: - print(f"⚠️ [3] Email already registered") - return None - - else: - if DEBUG: - print(f"❌ [3] Registration conflict: {error_code}") - raise Exception(f"Registration conflict: {data}") - - elif resp.status_code == 400: - if DEBUG: - print(f"❌ [3] Bad Request: {data}") - raise Exception(f"Bad request: {data}") - - else: - if DEBUG: - print(f"❌ [3] Unexpected status: {resp.status_code}") - raise Exception(f"Registration failed with {resp.status_code}: {data}") - - except json.JSONDecodeError as e: - if DEBUG: - print(f"❌ [3] Failed to parse JSON response") - raise Exception(f"Invalid JSON response: {e}") - - except Exception as e: - raise - - def _step5_get_access_token(self) -> str: - """Step 5: Retrieve access token from authenticated session - - This method leverages the existing session cookies (login_session, oai-did, - __Host-next-auth.csrf-token) obtained after successful registration and - email verification. - - Returns: - Access token string - - Raises: - Exception: If access token retrieval fails - """ - - url = "https://chatgpt.com/api/auth/session" - - headers = self.http_client.fingerprint.get_headers(host='chatgpt.com') - headers.update({ - 'Accept': 'application/json', - 'Referer': 'https://chatgpt.com/', - 'Sec-Fetch-Dest': 'empty', - 'Sec-Fetch-Mode': 'cors', - 'Sec-Fetch-Site': 'same-origin', - }) - - try: - resp = self.http_client.session.get( - url, - headers=headers, - timeout=30 - ) - - if resp.status_code != 200: - error_msg = f"Failed to retrieve session: HTTP {resp.status_code}" - if DEBUG: - print(f"❌ [5] {error_msg}") - raise Exception(error_msg) - - data = resp.json() - access_token = data.get('accessToken') - - if not access_token: - error_msg = "No accessToken in session response" - if DEBUG: - print(f"❌ [5] {error_msg}") - raise Exception(error_msg) - - if DEBUG: - print(f"✅ [5] Access token retrieved") - print(f" Token: {access_token[:50]}...") - - return access_token - - except requests.RequestException as e: - error_msg = f"Network error retrieving access token: {e}" - if DEBUG: - print(f"❌ [5] {error_msg}") - raise Exception(error_msg) - - except Exception as e: - if DEBUG: - print(f"❌ [5] Unexpected error: {e}") - raise - - def _step4_verify_email(self, mailbox: str, continue_url: str) -> Dict: - """Step 4: 验证邮箱(通过 OTP 验证码) - - Args: - mailbox: 邮箱地址 - continue_url: Step 3 返回的 continue_url(例如: /email-otp/send) - - Returns: - 验证结果 - """ - - # 4.0 触发发送验证邮件(访问 continue_url) - if continue_url: - # 构造完整 URL - if not continue_url.startswith('http'): - send_url = f"https://auth.openai.com{continue_url}" - else: - send_url = continue_url - - - # 准备 headers - headers = self.http_client.fingerprint.get_headers(host='auth.openai.com') - headers.update({ - 'Accept': 'application/json', - 'Referer': 'https://auth.openai.com/create-account/password', - 'Sec-Fetch-Dest': 'empty', - 'Sec-Fetch-Mode': 'cors', - 'Sec-Fetch-Site': 'same-origin', - }) - - # 发送请求(根据 Step 3 返回的 method) - resp = self.http_client.session.get( - send_url, - headers=headers, - timeout=30 - ) - - if resp.status_code != 200: - if DEBUG: - print(f"❌ [4.0] Failed to trigger email send ({resp.status_code})") - raise Exception(f"Failed to trigger email send: {resp.status_code} {resp.text}") - elif DEBUG: - print(f"✅ [4.0] Email send triggered") - - # 4.1 检查是否有 tempmail_client - if not self.tempmail_client: - raise Exception("No TempMailClient configured. Cannot receive verification emails.") - - # 4.2 等待验证邮件 - if DEBUG: - print(f"⏳ [4.1] Waiting for verification email...") - - email_data = self.tempmail_client.wait_for_email( - mailbox=mailbox, - from_filter=None, # 不过滤发件人(因为临时邮箱可能不返回 from 字段) - subject_filter="chatgpt", # 主题包含 "chatgpt"(匹配 "Your ChatGPT code is...") - timeout=120, - interval=5 - ) - - if not email_data: - if DEBUG: - print(f"❌ [4.1] Timeout: No verification email received") - raise Exception("Timeout: No verification email received") - - # 4.3 提取验证码 - verification_code = self.tempmail_client.extract_verification_code(email_data) - - if not verification_code: - # 尝试提取验证链接(备选方案) - verification_link = self.tempmail_client.extract_verification_link(email_data) - if verification_link: - if DEBUG: - print(f"❌ [4.3] Link-based verification not implemented") - raise NotImplementedError("Link-based verification not implemented yet") - else: - if DEBUG: - print(f"❌ [4.3] Failed to extract verification code") - raise Exception("Failed to extract verification code or link from email") - - if DEBUG: - print(f"✅ [4.3] Got verification code: {verification_code}") - - # 4.4 提交验证码 - # 使用正确的验证接口 - verify_url = "https://auth.openai.com/api/accounts/email-otp/validate" - - # 准备 payload - payload = { - "code": verification_code, - } - - # 准备 headers - headers = self.http_client.fingerprint.get_headers(host='auth.openai.com') - headers.update({ - 'Content-Type': 'application/json', - 'Accept': 'application/json', - 'Origin': 'https://auth.openai.com', - 'Referer': 'https://auth.openai.com/email-verification', - 'Sec-Fetch-Dest': 'empty', - 'Sec-Fetch-Mode': 'cors', - 'Sec-Fetch-Site': 'same-origin', - 'Priority': 'u=1, i', - }) - - # 添加 Datadog tracing headers - import secrets - headers.update({ - 'X-Datadog-Trace-Id': str(secrets.randbits(63)), - 'X-Datadog-Parent-Id': str(secrets.randbits(63)), - 'X-Datadog-Sampling-Priority': '1', - 'X-Datadog-Origin': 'rum', - 'Traceparent': f'00-0000000000000000{secrets.token_hex(8)}-{secrets.token_hex(8)}-01', - 'Tracestate': 'dd=s:1;o:rum', - }) - - - # 发送验证请求 - resp = self.http_client.session.post( - verify_url, - json=payload, - headers=headers, - timeout=30 - ) - - # 处理响应 - try: - data = resp.json() - - if resp.status_code == 200: - if DEBUG: - print(f"✅ [4.4] Email verified successfully!") - print(f"📋 [4.4] Response data: {json.dumps(data, indent=2)}") - - return { - 'success': True, - 'verified': True, - 'data': data - } - - else: - if DEBUG: - print(f"❌ [4.4] Verification failed: {data}") - - return { - 'success': False, - 'error': data - } - - except Exception as e: - if DEBUG: - print(f"❌ [4.4] Exception: {e}") - raise - - def _step4_5_submit_personal_info(self) -> Dict: - """Step 4.5: 提交个人信息(姓名、生日)完成账号设置 - - Returns: - 包含 OAuth callback URL 的字典 - """ - - url = "https://auth.openai.com/api/accounts/create_account" - - # 生成随机姓名和生日 - from modules.data_generator import NameGenerator - random_name = NameGenerator.generate_full_name() - - # 生成随机生日(1980-2000年之间) - year = random.randint(1980, 2000) - month = random.randint(1, 12) - day = random.randint(1, 28) # 保险起见,避免2月29日等边界情况 - birthdate = f"{year}-{month:02d}-{day:02d}" - - payload = { - "name": random_name, - "birthdate": birthdate - } - - # 准备 headers - headers = self.http_client.fingerprint.get_headers(host='auth.openai.com') - headers.update({ - 'Content-Type': 'application/json', - 'Accept': 'application/json', - 'Origin': 'https://auth.openai.com', - 'Referer': 'https://auth.openai.com/about-you', - 'Sec-Fetch-Dest': 'empty', - 'Sec-Fetch-Mode': 'cors', - 'Sec-Fetch-Site': 'same-origin', - 'Priority': 'u=1, i', - }) - - # 添加 Datadog tracing headers - import secrets - headers.update({ - 'X-Datadog-Trace-Id': str(secrets.randbits(63)), - 'X-Datadog-Parent-Id': str(secrets.randbits(63)), - 'X-Datadog-Sampling-Priority': '1', - 'X-Datadog-Origin': 'rum', - 'Traceparent': f'00-0000000000000000{secrets.token_hex(8)}-{secrets.token_hex(8)}-01', - 'Tracestate': 'dd=s:1;o:rum', - }) - - # 发送请求 - resp = self.http_client.session.post( - url, - json=payload, - headers=headers, - timeout=30 - ) - - # 处理响应 - try: - data = resp.json() - - if resp.status_code == 200: - oauth_url = data.get('continue_url') - - if DEBUG: - print(f"✅ [4.5] Personal info submitted") - print(f" Name: {random_name}") - print(f" Birthdate: {birthdate}") - - if oauth_url: - return { - 'success': True, - 'oauth_url': oauth_url, - 'method': data.get('method', 'GET') - } - else: - raise Exception(f"No continue_url in response: {data}") - - else: - if DEBUG: - print(f"❌ [4.5] Failed to submit personal info: {data}") - raise Exception(f"Failed to submit personal info: {resp.status_code} {data}") - - except Exception as e: - if DEBUG: - print(f"❌ [4.5] Exception: {e}") - raise - - def _step4_6_complete_oauth_flow(self, oauth_url: str): - """Step 4.6: 完成 OAuth 回调流程,获取 session-token - - 这一步会自动跟随重定向链: - 1. GET oauth_url → 302 to /api/accounts/consent - 2. GET /api/accounts/consent → 302 to chatgpt.com/api/auth/callback/openai - 3. GET chatgpt.com/api/auth/callback/openai → 最终生成 session-token - - Args: - oauth_url: Step 4.5 返回的 OAuth URL - """ - - # 确保是完整 URL - if not oauth_url.startswith('http'): - oauth_url = f"https://auth.openai.com{oauth_url}" - - # 准备 headers - headers = self.http_client.fingerprint.get_headers(host='auth.openai.com') - headers.update({ - 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', - 'Referer': 'https://auth.openai.com/about-you', - 'Sec-Fetch-Dest': 'document', - 'Sec-Fetch-Mode': 'navigate', - 'Sec-Fetch-Site': 'same-origin', - 'Upgrade-Insecure-Requests': '1', - }) - - # 发送请求,允许自动跟随重定向 - # requests.Session 会自动处理跨域重定向(auth.openai.com → chatgpt.com) - resp = self.http_client.session.get( - oauth_url, - headers=headers, - allow_redirects=True, # 自动跟随所有重定向 - timeout=30 - ) - - # 检查是否获取到 session-token - session_token = None - - # curl_cffi 的 cookies 是字典格式,requests 的是 CookieJar - cookies = self.http_client.session.cookies - - # 尝试字典访问(curl_cffi) - if hasattr(cookies, 'get'): - session_token = cookies.get('__Secure-next-auth.session-token') - - if session_token: - if DEBUG: - print(f"✅ [4.6] OAuth flow completed") - print(f" Got session-token: {session_token[:50]}...") - return True - else: - if DEBUG: - print(f"❌ [4.6] OAuth flow completed but no session-token found") - print(f" Final URL: {resp.url}") - print(f" Status: {resp.status_code}") - raise Exception("OAuth flow completed but no session-token cookie was set") - - - def _step5_solve_challenge(self, challenge: Dict) -> str: - """解决 enforcement 挑战""" - - sentinel_token = self.solver.solve_enforcement(challenge) - - return sentinel_token - - def _step6_register_with_token(self, email: str, password: str, - sentinel_token: str) -> Dict: - """带 Sentinel token 重新注册""" - - url = f"{AUTH_BASE_URL}/api/accounts/user/register" - - payload = { - 'username': email, - 'password': password, - 'client_id': 'sentinel' - } - - resp = self.http_client.post( - url, - json_data=payload, - sentinel_token=sentinel_token - ) - - if resp.status_code == 200: - if DEBUG: - print("✅ [6] Registration successful!") - return {'success': True, 'data': resp.json()} - - else: - if DEBUG: - print(f"❌ [6] Registration failed: {resp.status_code}") - - return { - 'success': False, - 'status_code': resp.status_code, - 'error': resp.text - } - - def register(self, email: str, password: str) -> Dict: - """完整注册流程""" - - try: - # Step 1: 通过 ChatGPT web 初始化(获取正确的 session) - self._step1_init_through_chatgpt(email) - - # Step 2: 初始化 Sentinel - self._step2_init_sentinel() - self._step2_5_submit_sentinel() - self._step2_6_solve_pow() - self._step2_7_submit_pow() - - # Step 3: 尝试注册(提交邮箱和密码) - result = self._step3_attempt_register(email, password) - - # 检查是否需要邮箱验证 - if result and result.get('success') and result.get('requires_verification'): - - # Step 4: 验证邮箱 - verify_result = self._step4_verify_email( - mailbox=email, - continue_url=result['continue_url'] - ) - - if verify_result.get('success'): - # Step 4.5: 提交个人信息 - personal_info_result = self._step4_5_submit_personal_info() - - if personal_info_result.get('success'): - oauth_url = personal_info_result['oauth_url'] - - # Step 4.6: 完成 OAuth 回调流程,获取 session-token - self._step4_6_complete_oauth_flow(oauth_url) - - if DEBUG: - print(f"\n✅ Registration completed successfully!") - - return { - 'success': True, - 'verified': True, - 'email': email, - 'data': verify_result.get('data') - } - else: - raise Exception(f"Failed to submit personal info: {personal_info_result}") - - else: - raise Exception(f"Email verification failed: {verify_result.get('error')}") - - # 如果直接成功(无需验证) - elif result and result.get('success'): - if DEBUG: - print(f"\n✅ Registration completed (no verification needed)!") - return result - - # 如果注册失败(邮箱已被使用等) - elif result is None: - if DEBUG: - print(f"\n❌ Registration failed: Email already taken or invalid") - return { - 'success': False, - 'error': 'Email already taken or registration rejected' - } - - # 其他情况:触发 enforcement challenge(旧逻辑) - else: - if DEBUG: - print(f"\n⚠️ Enforcement challenge triggered") - - # Step 5: 解决挑战 - sentinel_token = self._step5_solve_challenge(result) - - # Step 6: 带 token 重新注册 - final_result = self._step6_register_with_token(email, password, sentinel_token) - - return final_result - - except Exception as e: - if DEBUG: - import traceback - print(f"\n❌ Registration failed with exception:") - traceback.print_exc() - - return { - 'success': False, - 'error': str(e) - } - - def register_with_auto_email(self, password: str) -> Dict: - """自动生成临时邮箱并完成注册流程 - - Args: - password: 要设置的密码 - - Returns: - 注册结果(包含邮箱地址) - """ - - # 检查是否配置了 TempMailClient - if not self.tempmail_client: - return { - 'success': False, - 'error': 'TempMailClient not configured. Please initialize with tempmail_client parameter.' - } - - generated_email = None - - try: - # Step 0: 生成临时邮箱(domain_index 从配置读取) - domain_index = TEMPMAIL_CONFIG.get('domain_index', 0) # 默认使用第1个域名 - generated_email = self.tempmail_client.generate_mailbox(domain_index=domain_index) - - if DEBUG: - print(f"\n✅ Generated temp email: {generated_email}") - - # 调用正常的注册流程 - result = self.register(email=generated_email, password=password) - - # 检查注册结果 - if result.get('success'): - if DEBUG: - print(f"\n✅ AUTO REGISTRATION SUCCESS") - print(f" Email: {generated_email}") - print(f" Password: {password}") - - return { - 'success': True, - 'email': generated_email, - 'password': password, - 'verified': result.get('verified', False), - 'data': result.get('data') - } - - else: - # 注册失败 → 删除邮箱 - if DEBUG: - print(f"\n❌ AUTO REGISTRATION FAILED") - print(f" Email: {generated_email}") - print(f" Error: {result.get('error')}") - - # 删除失败的邮箱 - self.tempmail_client.delete_mailbox(generated_email) - - return { - 'success': False, - 'email': generated_email, - 'error': result.get('error'), - 'mailbox_deleted': True - } - - except Exception as e: - # 发生异常 → 清理邮箱 - if DEBUG: - import traceback - print(f"\n❌ AUTO REGISTRATION EXCEPTION") - if generated_email: - print(f" Email: {generated_email}") - print(f" Exception: {e}") - traceback.print_exc() - - # 清理邮箱(如果已生成) - if generated_email: - try: - self.tempmail_client.delete_mailbox(generated_email) - except: - pass - - return { - 'success': False, - 'email': generated_email, - 'error': str(e), - 'mailbox_deleted': True if generated_email else False - } - - def add_payment_method( - self, - checkout_session_url: str, - iban: str, - name: str, - email: str, - address_line1: str, - city: str, - postal_code: str, - state: str, - country: str = "US" - ) -> Dict: - """ - 为账户添加Stripe支付方式(SEPA) - - Args: - checkout_session_url: Stripe checkout session URL - iban: 德国IBAN账号 - name: 持卡人姓名 - email: 邮箱 - address_line1: 街道地址 - city: 城市 - postal_code: 邮编 - state: 州/省 - country: 国家代码 - - Returns: - 支付结果字典 - """ - try: - if DEBUG: - print(f"\n🔐 [Payment] Starting payment method setup...") - print(f" Session URL: {checkout_session_url[:60]}...") - - # 初始化Stripe支付处理器 - payment_handler = StripePaymentHandler( - checkout_session_url=checkout_session_url, - http_client=self.http_client - ) - - # 执行完整支付流程 - success = payment_handler.complete_payment( - iban=iban, - name=name, - email=email, - address_line1=address_line1, - city=city, - postal_code=postal_code, - state=state, - country=country - ) - - if success: - if DEBUG: - print(f"\n✅ [Payment] Payment method added successfully!") - - return { - 'success': True, - 'message': 'Payment method added' - } - else: - if DEBUG: - print(f"\n❌ [Payment] Failed to add payment method") - - return { - 'success': False, - 'error': 'Payment setup failed' - } - - except Exception as e: - if DEBUG: - import traceback - print(f"\n❌ [Payment] Exception occurred:") - traceback.print_exc() - - return { - 'success': False, - 'error': str(e) - } diff --git a/test_cloudmail_standalone.py b/test_cloudmail_standalone.py deleted file mode 100644 index 196c29a..0000000 --- a/test_cloudmail_standalone.py +++ /dev/null @@ -1,183 +0,0 @@ -#!/usr/bin/env python3 -""" -Cloud Mail API 独立测试脚本 - -使用方法: -1. 配置 .env 文件中的 Cloud Mail 参数 -2. 运行: python test_cloudmail_standalone.py -""" - -import asyncio -import time -from config import load_config -from utils.mail_box import CloudMailHandler -from utils.logger import logger - - -async def test_email_query(handler: CloudMailHandler, test_email: str): - """测试邮件查询功能""" - logger.info("=" * 60) - logger.info("测试 1: 查询最近的邮件") - logger.info("=" * 60) - - try: - emails = await handler._query_emails( - to_email=test_email, - time_sort="desc", - size=5 - ) - - logger.success(f"✓ 查询到 {len(emails)} 封邮件") - - if emails: - for i, email in enumerate(emails, 1): - logger.info( - f" 邮件 {i}:\n" - f" 发件人: {email.get('sendEmail')}\n" - f" 主题: {email.get('subject')}\n" - f" 时间: {email.get('createTime')}" - ) - else: - logger.warning(" (邮箱为空)") - - return len(emails) > 0 - - except Exception as e: - logger.error(f"✗ 测试失败: {e}") - return False - - -async def test_otp_waiting(handler: CloudMailHandler, test_email: str): - """测试 OTP 等待功能""" - logger.info("") - logger.info("=" * 60) - logger.info("测试 2: OTP 等待功能") - logger.info("=" * 60) - logger.warning(f"请在 60 秒内向 {test_email} 发送测试 OTP 邮件") - logger.warning(f"发件人应为: {handler.OTP_SENDER}") - - try: - otp = await handler.wait_for_otp(test_email, timeout=60) - logger.success(f"✓ OTP 接收成功: {otp}") - return True - except TimeoutError: - logger.error("✗ 超时未收到 OTP") - return False - except Exception as e: - logger.error(f"✗ 测试失败: {e}") - return False - - -async def test_add_user(handler: CloudMailHandler): - """测试添加用户功能""" - logger.info("") - logger.info("=" * 60) - logger.info("测试 3: 添加测试用户") - logger.info("=" * 60) - - # 生成测试邮箱 - test_users = [ - {"email": f"test_{int(time.time())}@example.com"} - ] - - try: - result = await handler.add_users(test_users) - logger.success(f"✓ 用户创建请求已发送") - logger.info(f" 响应: {result}") - return True - except Exception as e: - logger.error(f"✗ 测试失败: {e}") - return False - - -async def main(): - """主测试流程""" - logger.info("=" * 60) - logger.info("Cloud Mail API 测试开始") - logger.info("=" * 60) - - # 加载配置 - try: - config = load_config() - except Exception as e: - logger.error(f"配置加载失败: {e}") - return - - # 验证配置 - if not config.mail.enabled or config.mail.type != "cloudmail": - logger.error("") - logger.error("请在 .env 中配置 Cloud Mail 参数:") - logger.error(" MAIL_ENABLED=true") - logger.error(" MAIL_TYPE=cloudmail") - logger.error(" MAIL_CLOUDMAIL_API_URL=https://your-domain.com") - logger.error(" MAIL_CLOUDMAIL_TOKEN=your_token") - return - - # 初始化 handler - try: - handler = CloudMailHandler(config.mail.to_dict()) - except Exception as e: - logger.error(f"CloudMailHandler 初始化失败: {e}") - return - - # 获取测试邮箱 - logger.info("") - test_email = input("请输入测试邮箱地址: ").strip() - - if not test_email: - logger.error("未输入邮箱地址,退出测试") - return - - # 运行测试 - results = {} - - try: - # 测试 1: 邮件查询 - results["email_query"] = await test_email_query(handler, test_email) - - # 测试 2: OTP 等待(可选) - if input("\n是否测试 OTP 等待功能? (y/N): ").lower() == 'y': - results["otp_waiting"] = await test_otp_waiting(handler, test_email) - - # 测试 3: 添加用户(可选) - if input("\n是否测试添加用户功能? (y/N): ").lower() == 'y': - results["add_user"] = await test_add_user(handler) - - finally: - # 清理资源 - await handler.close() - - # 测试总结 - logger.info("") - logger.info("=" * 60) - logger.info("测试结果总结") - logger.info("=" * 60) - - if results: - for name, passed in results.items(): - status = "✓ 通过" if passed else "✗ 失败" - logger.info(f" {name}: {status}") - - # 总体结果 - total = len(results) - passed = sum(1 for v in results.values() if v) - logger.info("") - logger.info(f"总计: {passed}/{total} 测试通过") - - if passed == total: - logger.success("所有测试通过!✅") - else: - logger.warning(f"部分测试失败 ({total - passed} 个)") - else: - logger.warning("未执行任何测试") - - logger.info("=" * 60) - - -if __name__ == "__main__": - try: - asyncio.run(main()) - except KeyboardInterrupt: - logger.warning("\n测试被用户中断") - except Exception as e: - logger.exception(f"测试过程中发生未捕获的异常: {e}") diff --git a/test_sentinel.py b/test_sentinel.py deleted file mode 100644 index 4eb9c68..0000000 --- a/test_sentinel.py +++ /dev/null @@ -1,277 +0,0 @@ -#!/usr/bin/env python3 -""" -Sentinel 集成测试脚本 - -验证 Sentinel 解决方案是否正确集成 -""" - -import sys -import asyncio -from pathlib import Path - - -def test_imports(): - """测试所有必要的模块导入""" - print("=" * 60) - print("测试 1: 模块导入") - print("=" * 60) - - try: - print("✓ 导入 utils.logger...") - from utils.logger import logger - - print("✓ 导入 utils.crypto...") - from utils.crypto import generate_oai_did, generate_random_password - - print("✓ 导入 utils.fingerprint...") - from utils.fingerprint import BrowserFingerprint - - print("✓ 导入 core.session...") - from core.session import OAISession - - print("✓ 导入 core.sentinel...") - from core.sentinel import SentinelHandler - - print("✓ 导入 reference.sentinel_solver...") - from reference.sentinel_solver import SentinelSolver - - print("✓ 导入 reference.js_executor...") - from reference.js_executor import JSExecutor - - print("\n✅ 所有模块导入成功!\n") - return True - - except Exception as e: - print(f"\n❌ 导入失败: {e}\n") - import traceback - traceback.print_exc() - return False - - -def test_node_availability(): - """测试 Node.js 是否可用""" - print("=" * 60) - print("测试 2: Node.js 环境检查") - print("=" * 60) - - import subprocess - - try: - result = subprocess.run( - ["node", "--version"], - capture_output=True, - text=True, - timeout=5 - ) - - if result.returncode == 0: - version = result.stdout.strip() - print(f"✓ Node.js 已安装: {version}") - return True - else: - print(f"❌ Node.js 执行失败: {result.stderr}") - return False - - except FileNotFoundError: - print("❌ Node.js 未安装或不在 PATH 中") - print(" 请安装 Node.js: https://nodejs.org/") - return False - except Exception as e: - print(f"❌ Node.js 检查失败: {e}") - return False - - -def test_sdk_file(): - """测试 SDK 文件是否存在""" - print("\n" + "=" * 60) - print("测试 3: SDK 文件检查") - print("=" * 60) - - sdk_path = Path("/home/carry/myprj/gptAutoPlus/sdk/sdk.js") - - if sdk_path.exists(): - size = sdk_path.stat().st_size - print(f"✓ SDK 文件存在: {sdk_path}") - print(f" 文件大小: {size:,} bytes ({size/1024:.1f} KB)") - return True - else: - print(f"❌ SDK 文件不存在: {sdk_path}") - print(" 请确保 sdk/sdk.js 文件存在") - return False - - -def test_fingerprint(): - """测试浏览器指纹生成""" - print("\n" + "=" * 60) - print("测试 4: 浏览器指纹生成") - print("=" * 60) - - try: - from utils.fingerprint import BrowserFingerprint - - fp = BrowserFingerprint() - config_array = fp.get_config_array() - - print(f"✓ 指纹生成成功") - print(f" Session ID: {fp.session_id}") - print(f" 配置数组长度: {len(config_array)}") - print(f" 配置数组前 3 项: {config_array[:3]}") - - if len(config_array) == 18: - print("✓ 配置数组长度正确 (18 个元素)") - return True - else: - print(f"❌ 配置数组长度错误: {len(config_array)} (期望 18)") - return False - - except Exception as e: - print(f"❌ 指纹生成失败: {e}") - import traceback - traceback.print_exc() - return False - - -async def test_sentinel_token(): - """测试 Sentinel Token 生成""" - print("\n" + "=" * 60) - print("测试 5: Sentinel Token 生成") - print("=" * 60) - - try: - from core.session import OAISession - from core.sentinel import SentinelHandler - - print("✓ 创建测试会话...") - session = OAISession() - - print(f"✓ Session 创建成功,oai-did: {session.oai_did}") - - print("✓ 初始化 SentinelHandler...") - sentinel = SentinelHandler(session) - - print("✓ 生成 Sentinel Token...") - print(" (这可能需要几秒钟,正在执行 PoW 计算...)") - - token = await sentinel.get_token() - - print(f"\n✅ Sentinel Token 生成成功!") - print(f" Token 前缀: {token[:30]}...") - print(f" Token 长度: {len(token)}") - - # 验证 token 格式 - if token.startswith("gAAAAA"): - print("✓ Token 格式正确") - return True - else: - print(f"⚠️ Token 格式异常: {token[:20]}...") - return True # 仍然算成功,因为可能是格式变化 - - except Exception as e: - print(f"\n❌ Sentinel Token 生成失败: {e}") - import traceback - traceback.print_exc() - return False - - -def test_crypto_utils(): - """测试加密工具""" - print("\n" + "=" * 60) - print("测试 6: 加密工具") - print("=" * 60) - - try: - from utils.crypto import ( - generate_oai_did, - generate_random_password, - validate_oai_did, - validate_password - ) - - # 测试 oai-did 生成 - oai_did = generate_oai_did() - print(f"✓ OAI-DID 生成: {oai_did}") - - is_valid = validate_oai_did(oai_did) - print(f"✓ OAI-DID 验证: {is_valid}") - - # 测试密码生成 - password = generate_random_password() - print(f"✓ 密码生成: {password}") - - is_valid, error = validate_password(password) - print(f"✓ 密码验证: {is_valid} {f'({error})' if error else ''}") - - if is_valid: - print("\n✅ 加密工具测试通过!") - return True - else: - print(f"\n❌ 密码验证失败: {error}") - return False - - except Exception as e: - print(f"\n❌ 加密工具测试失败: {e}") - import traceback - traceback.print_exc() - return False - - -async def main(): - """运行所有测试""" - print("\n" + "=" * 60) - print(" OpenAI 注册系统 - Sentinel 集成测试") - print("=" * 60) - print() - - results = [] - - # 运行所有测试 - results.append(("模块导入", test_imports())) - results.append(("Node.js 环境", test_node_availability())) - results.append(("SDK 文件", test_sdk_file())) - results.append(("浏览器指纹", test_fingerprint())) - results.append(("加密工具", test_crypto_utils())) - results.append(("Sentinel Token", await test_sentinel_token())) - - # 打印总结 - print("\n" + "=" * 60) - print(" 测试总结") - print("=" * 60) - - for name, passed in results: - status = "✅ 通过" if passed else "❌ 失败" - print(f" {name:20s} {status}") - - print("=" * 60) - - total = len(results) - passed = sum(1 for _, p in results if p) - - print(f"\n总计: {passed}/{total} 个测试通过") - - if passed == total: - print("\n🎉 所有测试通过!系统已准备就绪。") - print("\n下一步:") - print(" 1. 配置邮箱(修改 .env 文件)") - print(" 2. 运行主程序: python main.py") - return 0 - else: - print("\n⚠️ 部分测试失败,请检查上述错误信息。") - print("\n常见问题:") - print(" - Node.js 未安装: 请安装 Node.js v16+") - print(" - SDK 文件缺失: 确保 sdk/sdk.js 存在") - print(" - 依赖未安装: 运行 pip install -e .") - return 1 - - -if __name__ == "__main__": - try: - exit_code = asyncio.run(main()) - sys.exit(exit_code) - except KeyboardInterrupt: - print("\n\n⚠️ 测试被用户中断") - sys.exit(1) - except Exception as e: - print(f"\n\n❌ 测试程序异常: {e}") - import traceback - traceback.print_exc() - sys.exit(1) diff --git a/utils/fingerprint.py b/utils/fingerprint.py index b0e4cdc..7ebc75d 100644 --- a/utils/fingerprint.py +++ b/utils/fingerprint.py @@ -1,81 +1,175 @@ -""" -浏览器指纹生成模块 - -用于生成符合 OpenAI Sentinel 要求的浏览器指纹配置数组 -""" +# modules/fingerprint.py +"""浏览器指纹生成器""" import uuid +import random import time -from typing import List +from datetime import datetime +from typing import Dict, List, Any + +from config import load_config class BrowserFingerprint: - """ - 浏览器指纹生成器 - - 生成 Sentinel SDK 所需的配置数组(18 个元素) - """ + """生成符合 SDK 期望的浏览器指纹""" def __init__(self, session_id: str = None): - """ - 初始化浏览器指纹 - - 参数: - session_id: 会话 ID(oai-did),如果不提供则自动生成 - """ self.session_id = session_id or str(uuid.uuid4()) - self.start_time = time.time() - def get_config_array(self) -> List: + # 新增: 使用确定性方法从 session_id 派生 Stripe 指纹 + import hashlib + seed = hashlib.sha256(self.session_id.encode()).hexdigest() + # seed 是64个hex字符,我们需要确保切片正确 + + # 从 seed 生成一致的 guid/muid/sid + # UUID需要32个hex字符(去掉连字符),额外部分直接拼接 + self.stripe_guid = seed[:8] + '-' + seed[8:12] + '-' + seed[12:16] + '-' + seed[16:20] + '-' + seed[20:32] + seed[32:40] + self.stripe_muid = seed[:8] + '-' + seed[8:12] + '-' + seed[12:16] + '-' + seed[16:20] + '-' + seed[20:32] + seed[40:46] + self.stripe_sid = seed[:8] + '-' + seed[8:12] + '-' + seed[12:16] + '-' + seed[16:20] + '-' + seed[20:32] + seed[46:52] + + # 从配置加载指纹参数 + config = load_config() + fingerprint_cfg = config.fingerprint_config + + self.user_agent = fingerprint_cfg['user_agent'] + self.screen_width = fingerprint_cfg['screen_width'] + self.screen_height = fingerprint_cfg['screen_height'] + self.languages = fingerprint_cfg['languages'] + self.hardware_concurrency = fingerprint_cfg['hardware_concurrency'] + + def get_config_array(self) -> List[Any]: """ - 获取 Sentinel SDK 配置数组 + 生成 SDK getConfig() 函数返回的 18 元素数组 - 返回: - 包含 18 个元素的指纹数组 - - 数组结构(从 JS 逆向): - [0] screen dimensions (width*height) - [1] timestamp - [2] memory (hardwareConcurrency) - [3] nonce (动态值,PoW 时会修改) - [4] user agent - [5] random element - [6] script src - [7] language - [8] languages (joined) - [9] elapsed time (ms) - [10] random function test - [11] keys - [12] window keys - [13] performance.now() - [14] uuid (session_id) - [15] URL params - [16] hardware concurrency - [17] timeOrigin + 对应 SDK 源码: + [0]: screen.width + screen.height + [1]: new Date().toString() + [2]: performance.memory.jsHeapSizeLimit (可选) + [3]: nonce (PoW 填充) + [4]: navigator.userAgent + [5]: 随机 script.src + [6]: build ID + [7]: navigator.language + [8]: navigator.languages.join(',') + [9]: 运行时间 (PoW 填充) + [10]: 随机 navigator 属性 + [11]: 随机 document key + [12]: 随机 window key + [13]: performance.now() + [14]: session UUID + [15]: URL search params + [16]: navigator.hardwareConcurrency + [17]: performance.timeOrigin """ - elapsed_ms = int((time.time() - self.start_time) * 1000) + + # 模拟的 script sources + fake_scripts = [ + "https://sentinel.openai.com/sentinel/97790f37/sdk.js", + "https://chatgpt.com/static/js/main.abc123.js", + "https://cdn.oaistatic.com/_next/static/chunks/main.js", + ] + + # 模拟的 navigator 属性名 + navigator_props = [ + 'hardwareConcurrency', 'language', 'languages', + 'platform', 'userAgent', 'vendor' + ] + + # 模拟的 document keys + document_keys = ['body', 'head', 'documentElement', 'scripts'] + + # 模拟的 window keys + window_keys = ['performance', 'navigator', 'document', 'location'] + + current_time = time.time() * 1000 return [ - 1920 * 1080, # [0] screen dimensions - str(int(time.time() * 1000)), # [1] timestamp - 8, # [2] hardware concurrency - 0, # [3] nonce (placeholder) - "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36", # [4] UA - str(0.123456789), # [5] random element - "https://chatgpt.com/_next/static/chunks/sentinel.js", # [6] script src - "en-US", # [7] language - "en-US,en", # [8] languages - elapsed_ms, # [9] elapsed time - "", # [10] random function - "", # [11] keys - "", # [12] window keys - elapsed_ms, # [13] performance.now() - self.session_id, # [14] uuid (oai-did) - "", # [15] URL params - 8, # [16] hardware concurrency - int(time.time() * 1000) - elapsed_ms, # [17] timeOrigin + self.screen_width + self.screen_height, # [0] + str(datetime.now()), # [1] + None, # [2] memory + None, # [3] nonce (placeholder) + self.user_agent, # [4] + random.choice(fake_scripts), # [5] + "97790f37", # [6] build ID + self.languages[0], # [7] + ",".join(self.languages), # [8] + None, # [9] runtime (placeholder) + f"{random.choice(navigator_props)}−{random.randint(1, 16)}", # [10] + random.choice(document_keys), # [11] + random.choice(window_keys), # [12] + current_time, # [13] + self.session_id, # [14] + "", # [15] URL params + self.hardware_concurrency, # [16] + current_time - random.uniform(100, 1000), # [17] timeOrigin ] + def get_cookies(self) -> Dict[str, str]: + """生成初始 cookies""" + return { + 'oai-did': self.session_id, + } + + def get_stripe_fingerprint(self) -> Dict[str, str]: + """获取 Stripe 支付指纹(与 session_id 一致派生)""" + return { + 'guid': self.stripe_guid, + 'muid': self.stripe_muid, + 'sid': self.stripe_sid, + } + + def get_headers(self, with_sentinel: str = None, host: str = 'auth.openai.com') -> Dict[str, str]: + """生成 HTTP headers(支持多域名)""" + + # 基础 headers + headers = { + 'User-Agent': self.user_agent, + 'Accept': 'application/json', + 'Accept-Language': f"{self.languages[0]},{self.languages[1]};q=0.5", + # Note: urllib3/requests only auto-decompress brotli/zstd when optional + # deps are installed; avoid advertising unsupported encodings. + 'Accept-Encoding': 'gzip, deflate', + 'Connection': 'keep-alive', + 'Sec-Fetch-Dest': 'empty', + 'Sec-Fetch-Mode': 'cors', + 'Sec-Fetch-Site': 'same-origin', + 'Priority': 'u=1, i', + 'Pragma': 'no-cache', + 'Cache-Control': 'no-cache', + } + + # 根据域名设置特定 headers + if 'chatgpt.com' in host: + headers.update({ + 'Origin': 'https://chatgpt.com', + 'Referer': 'https://chatgpt.com/', + }) + else: + headers.update({ + 'Origin': 'https://auth.openai.com', + 'Referer': 'https://auth.openai.com/create-account/password', + 'Content-Type': 'application/json', + }) + + # Sentinel token + if with_sentinel: + headers['openai-sentinel-token'] = with_sentinel + + # Datadog RUM tracing + trace_id = random.randint(10**18, 10**19 - 1) + parent_id = random.randint(10**18, 10**19 - 1) + + headers.update({ + 'traceparent': f'00-0000000000000000{trace_id:016x}-{parent_id:016x}-01', + 'tracestate': 'dd=s:1;o:rum', + 'x-datadog-origin': 'rum', + 'x-datadog-parent-id': str(parent_id), + 'x-datadog-sampling-priority': '1', + 'x-datadog-trace-id': str(trace_id), + }) + + return headers + # 导出 __all__ = ["BrowserFingerprint"] diff --git a/utils/mail_box.py b/utils/mail_box.py index 420ceff..f31ad73 100644 --- a/utils/mail_box.py +++ b/utils/mail_box.py @@ -1,14 +1,5 @@ """ -邮件接码处理器 - -用于接收和解析 OpenAI 发送的验证码邮件 - -⚠️ 本模块提供预留接口,用户需要根据实际情况配置邮箱服务 - -支持的邮箱方案: -1. IMAP 收件 (Gmail, Outlook, 自建邮箱) -2. 临时邮箱 API (TempMail, Guerrilla Mail, etc.) -3. 邮件转发服务 +邮件接码处理器 - 用于接收和解析 OpenAI 发送的验证码邮件 """ from typing import Optional, Dict, Any, List @@ -19,58 +10,23 @@ from utils.logger import logger class MailHandler: - """ - 邮件接码处理器 + """邮件接码处理器基类""" - ⚠️ 预留接口 - 用户需要配置实际的邮箱服务 - - 使用场景: - - 接收 OpenAI 发送的 6 位数字 OTP 验证码 - - 解析邮件内容提取验证码 - - 支持超时和重试机制 - """ - - # OTP 邮件特征 OTP_SUBJECT_KEYWORDS = ["openai", "verification", "verify", "code"] - OTP_SENDER = "noreply@tm.openai.com" # OpenAI 发件人地址 + OTP_SENDER = "noreply@tm.openai.com" def __init__(self, config: Optional[Dict[str, Any]] = None): - """ - 初始化邮件处理器 - - 参数: - config: 邮箱配置字典,可能包含: - - type: "imap" | "tempmail" | "api" | "cloudmail" - - host: IMAP 服务器地址 (如果使用 IMAP) - - port: IMAP 端口 (默认 993) - - username: 邮箱用户名 - - password: 邮箱密码 - - api_key: 临时邮箱 API Key (如果使用 API) - """ self.config = config or {} self.mail_type = self.config.get("type", "not_configured") if not config: - logger.warning( - "MailHandler initialized without configuration. " - "OTP retrieval will fail until configured." - ) + logger.warning("MailHandler initialized without configuration.") else: logger.info(f"MailHandler initialized with type: {self.mail_type}") @staticmethod def create(config: Optional[Dict[str, Any]]) -> "MailHandler": - """ - 工厂方法:创建合适的邮件处理器 - - 根据配置中的 type 字段自动选择正确的 handler 实现 - - 参数: - config: 邮箱配置字典 - - 返回: - MailHandler 实例(IMAPMailHandler 或 CloudMailHandler) - """ + """工厂方法:根据配置创建对应的邮件处理器""" if not config: return MailHandler(config) @@ -81,115 +37,22 @@ class MailHandler: elif mail_type == "cloudmail": return CloudMailHandler(config) else: - # 默认处理器(会抛出 NotImplementedError) return MailHandler(config) - async def wait_for_otp( - self, - email: str, - timeout: int = 300, - check_interval: int = 5 - ) -> str: - """ - 等待并提取 OTP 验证码 - - ⚠️ 预留接口 - 用户需要实现此方法 - - 参数: - email: 注册邮箱地址 - timeout: 超时时间(秒),默认 300 秒(5 分钟) - check_interval: 检查间隔(秒),默认 5 秒 - - 返回: - 6 位数字验证码(例如 "123456") - - 抛出: - NotImplementedError: 用户需要实现此方法 - TimeoutError: 超时未收到邮件 - ValueError: 邮件格式错误,无法提取 OTP - - 集成示例: - ```python - # 方案 1: 使用 IMAP (imap-tools 库) - from imap_tools import MailBox - with MailBox(self.config["host"]).login( - self.config["username"], - self.config["password"] - ) as mailbox: - for msg in mailbox.fetch(AND(from_=self.OTP_SENDER, seen=False)): - otp = self._extract_otp(msg.text) - if otp: - return otp - - # 方案 2: 使用临时邮箱 API - import httpx - async with httpx.AsyncClient() as client: - resp = await client.get( - f"https://tempmail.api/messages?email={email}", - headers={"Authorization": f"Bearer {self.config['api_key']}"} - ) - messages = resp.json() - for msg in messages: - otp = self._extract_otp(msg["body"]) - if otp: - return otp - - # 方案 3: 手动输入(调试用) - print(f"Please enter OTP for {email}:") - return input().strip() - ``` - """ - logger.info( - f"Waiting for OTP for {email} " - f"(timeout: {timeout}s, check_interval: {check_interval}s)" - ) - - raise NotImplementedError( - "❌ Mail handler not configured.\n\n" - "User needs to configure email service for OTP retrieval.\n\n" - "Configuration options:\n\n" - "1. IMAP (Gmail, Outlook, custom):\n" - " config = {\n" - " 'type': 'imap',\n" - " 'host': 'imap.gmail.com',\n" - " 'port': 993,\n" - " 'username': 'your@email.com',\n" - " 'password': 'app_password'\n" - " }\n\n" - "2. Temporary email API:\n" - " config = {\n" - " 'type': 'tempmail',\n" - " 'api_key': 'YOUR_API_KEY',\n" - " 'api_endpoint': 'https://api.tempmail.com'\n" - " }\n\n" - "3. Manual input (for debugging):\n" - " config = {'type': 'manual'}\n\n" - "Example implementation location: utils/mail_box.py -> wait_for_otp()" - ) + async def wait_for_otp(self, email: str, timeout: int = 300, check_interval: int = 5) -> str: + """等待并提取 OTP 验证码(需子类实现)""" + logger.info(f"Waiting for OTP for {email} (timeout: {timeout}s)") + raise NotImplementedError("Mail handler not configured.") def _extract_otp(self, text: str) -> Optional[str]: - """ - 从邮件正文中提取 OTP 验证码 - - OpenAI 邮件格式示例: - "Your OpenAI verification code is: 123456" - "Enter this code: 123456" - - 参数: - text: 邮件正文(纯文本或 HTML) - - 返回: - 6 位数字验证码,未找到则返回 None - """ - # 清理 HTML 标签(如果有) + """从邮件正文中提取 6 位 OTP 验证码""" text = re.sub(r'<[^>]+>', ' ', text) - # 常见的 OTP 模式 patterns = [ - r'verification code is[:\s]+(\d{6})', # "verification code is: 123456" - r'code[:\s]+(\d{6})', # "code: 123456" - r'enter[:\s]+(\d{6})', # "enter: 123456" - r'(\d{6})', # 任意 6 位数字(最后尝试) + r'verification code is[:\s]+(\d{6})', + r'code[:\s]+(\d{6})', + r'enter[:\s]+(\d{6})', + r'(\d{6})', ] for pattern in patterns: @@ -202,118 +65,27 @@ class MailHandler: logger.warning("Failed to extract OTP from email text") return None - def _check_imap(self, email: str, timeout: int) -> Optional[str]: - """ - 使用 IMAP 检查邮件(预留方法) - - 参数: - email: 注册邮箱 - timeout: 超时时间(秒) - - 返回: - OTP 验证码,未找到则返回 None - """ - # TODO: 用户实现 IMAP 检查逻辑 - # 需要安装: pip install imap-tools - raise NotImplementedError("IMAP checking not implemented") - - def _check_tempmail_api(self, email: str, timeout: int) -> Optional[str]: - """ - 使用临时邮箱 API 检查邮件(预留方法) - - 参数: - email: 注册邮箱 - timeout: 超时时间(秒) - - 返回: - OTP 验证码,未找到则返回 None - """ - # TODO: 用户实现临时邮箱 API 调用逻辑 - raise NotImplementedError("Temp mail API not implemented") - - def generate_temp_email(self) -> str: - """ - 生成临时邮箱地址(可选功能) - - ⚠️ 预留接口 - 如果使用临时邮箱服务,需要实现此方法 - - 返回: - 临时邮箱地址(例如 "random123@tempmail.com") - - 抛出: - NotImplementedError: 用户需要实现此方法 - """ - raise NotImplementedError( - "Temp email generation not implemented. " - "Integrate a temp mail service API if needed." - ) - def verify_email_deliverability(self, email: str) -> bool: - """ - 验证邮箱地址是否可以接收邮件(可选功能) - - 参数: - email: 邮箱地址 - - 返回: - True 如果邮箱有效且可接收邮件,否则 False - """ - # 基本格式验证 + """验证邮箱地址格式是否有效""" email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' if not re.match(email_pattern, email): logger.warning(f"Invalid email format: {email}") return False - - # TODO: 用户可以添加更严格的验证逻辑 - # 例如:DNS MX 记录查询、SMTP 验证等 - - logger.info(f"Email format valid: {email}") return True class IMAPMailHandler(MailHandler): - """ - 基于 IMAP 的邮件处理器(完整实现示例) + """基于 IMAP 的邮件处理器""" - ⚠️ 这是一个参考实现,用户可以根据需要修改 - - 依赖: - pip install imap-tools - """ - - async def wait_for_otp( - self, - email: str, - timeout: int = 300, - check_interval: int = 5 - ) -> str: - """ - 使用 IMAP 等待 OTP 邮件 - - 参数: - email: 注册邮箱 - timeout: 超时时间(秒) - check_interval: 检查间隔(秒) - - 返回: - 6 位数字验证码 - - 抛出: - ImportError: 未安装 imap-tools - TimeoutError: 超时未收到邮件 - ValueError: 配置错误或邮件格式错误 - """ + async def wait_for_otp(self, email: str, timeout: int = 300, check_interval: int = 5) -> str: + """使用 IMAP 等待 OTP 邮件""" try: from imap_tools import MailBox, AND except ImportError: - raise ImportError( - "imap-tools not installed. Install with: pip install imap-tools" - ) + raise ImportError("imap-tools not installed. Install with: pip install imap-tools") if not all(k in self.config for k in ["host", "username", "password"]): - raise ValueError( - "IMAP configuration incomplete. Required: host, username, password" - ) + raise ValueError("IMAP configuration incomplete. Required: host, username, password") start_time = time.time() logger.info(f"Connecting to IMAP server: {self.config['host']}") @@ -324,86 +96,44 @@ class IMAPMailHandler(MailHandler): self.config["username"], self.config["password"] ) as mailbox: - # 查找未读邮件,来自 OpenAI for msg in mailbox.fetch( AND(from_=self.OTP_SENDER, seen=False), - reverse=True, # 最新的邮件优先 + reverse=True, limit=10 ): - # 检查主题是否包含 OTP 关键词 if any(kw in msg.subject.lower() for kw in self.OTP_SUBJECT_KEYWORDS): otp = self._extract_otp(msg.text or msg.html) if otp: logger.success(f"OTP received: {otp}") - # 标记为已读 mailbox.flag([msg.uid], ['\\Seen'], True) return otp - except Exception as e: logger.warning(f"IMAP check failed: {e}") - # 等待下一次检查 - elapsed = time.time() - start_time - remaining = timeout - elapsed - logger.debug( - f"No OTP found, waiting {check_interval}s " - f"(remaining: {int(remaining)}s)" - ) + logger.debug(f"No OTP found, waiting {check_interval}s") time.sleep(check_interval) - raise TimeoutError( - f"Timeout waiting for OTP email (timeout: {timeout}s). " - f"Email: {email}, Sender: {self.OTP_SENDER}" - ) + raise TimeoutError(f"Timeout waiting for OTP email ({timeout}s)") class CloudMailHandler(MailHandler): - """ - Cloud Mail API handler with external token management - - 使用外部预生成的 Token 管理邮件,不调用 genToken API - - 依赖: - pip install httpx - - 配置示例: - config = { - "type": "cloudmail", - "api_base_url": "https://your-cloudmail-domain.com", - "token": "9f4e298e-7431-4c76-bc15-4931c3a73984", - "target_email": "user@example.com" # 可选 - } - """ + """Cloud Mail API 邮件处理器""" def __init__(self, config: Optional[Dict[str, Any]] = None): - """ - 初始化 Cloud Mail handler - - 参数: - config: 配置字典,必填项: - - api_base_url: Cloud Mail API 基础 URL - - token: 预生成的身份令牌 - - target_email: (可选) 指定监控的邮箱地址 - """ super().__init__(config) - # 验证必填配置 required = ["api_base_url", "token"] missing = [key for key in required if not self.config.get(key)] - if missing: - raise ValueError( - f"CloudMail configuration incomplete. Missing: {', '.join(missing)}\n" - f"Required: api_base_url, token" - ) + raise ValueError(f"CloudMail config incomplete. Missing: {', '.join(missing)}") self.api_base_url = self.config["api_base_url"].rstrip("/") self.token = self.config["token"] self.target_email = self.config.get("target_email") - self.domain = self.config.get("domain") # 邮箱域名 + self.domain = self.config.get("domain") self._client: Optional[Any] = None - logger.info(f"CloudMailHandler initialized (API: {self.api_base_url}, Domain: {self.domain or 'N/A'})") + logger.info(f"CloudMailHandler initialized (API: {self.api_base_url})") async def _get_client(self): """懒加载 HTTP 客户端""" @@ -411,9 +141,7 @@ class CloudMailHandler(MailHandler): try: import httpx except ImportError: - raise ImportError( - "httpx not installed. Install with: pip install httpx" - ) + raise ImportError("httpx not installed. Install with: pip install httpx") self._client = httpx.AsyncClient( timeout=30.0, @@ -433,33 +161,19 @@ class CloudMailHandler(MailHandler): num: int = 1, size: int = 20 ) -> List[Dict[str, Any]]: - """ - 查询邮件列表 (POST /api/public/emailList) - - 参数: - to_email: 收件人邮箱 - send_email: 发件人邮箱(可选,支持 % 通配符) - subject: 主题关键词(可选) - time_sort: 时间排序 (desc/asc) - num: 页码(从 1 开始) - size: 每页数量 - - 返回: - 邮件列表 - """ + """查询邮件列表 (POST /api/public/emailList)""" client = await self._get_client() url = f"{self.api_base_url}/api/public/emailList" payload = { "toEmail": to_email, - "type": 0, # 0=收件箱 - "isDel": 0, # 0=未删除 + "type": 0, + "isDel": 0, "timeSort": time_sort, "num": num, "size": size } - # 可选参数 if send_email: payload["sendEmail"] = send_email if subject: @@ -468,38 +182,17 @@ class CloudMailHandler(MailHandler): try: resp = await client.post(url, json=payload) - # 检查认证错误 if resp.status_code in [401, 403]: - raise RuntimeError( - "CloudMail token expired or invalid.\n" - "Please regenerate token and update MAIL_CLOUDMAIL_TOKEN in .env\n" - "Steps:\n" - "1. Login to Cloud Mail with admin account\n" - "2. Call POST /api/public/genToken to generate new token\n" - "3. Update MAIL_CLOUDMAIL_TOKEN in .env\n" - "4. Restart the program" - ) + raise RuntimeError("CloudMail token expired or invalid.") if resp.status_code != 200: - raise RuntimeError( - f"CloudMail API error: {resp.status_code} - {resp.text[:200]}" - ) + raise RuntimeError(f"CloudMail API error: {resp.status_code}") data = resp.json() - - # 检查业务逻辑错误 if data.get("code") != 200: - error_msg = data.get("message", "Unknown error") - raise RuntimeError( - f"CloudMail API error: {error_msg} (code: {data.get('code')})" - ) + raise RuntimeError(f"CloudMail API error: {data.get('message')}") - # 返回邮件列表 result = data.get("data", {}) - - # CloudMail API 可能返回两种格式: - # 1. {"data": {"list": [...]}} - 标准格式 - # 2. {"data": [...]} - 直接列表格式 if isinstance(result, list): emails = result elif isinstance(result, dict): @@ -512,174 +205,102 @@ class CloudMailHandler(MailHandler): except Exception as e: if "httpx" in str(type(e).__module__): - # httpx 网络错误 raise RuntimeError(f"CloudMail API network error: {e}") - else: - # 重新抛出其他错误 - raise + raise async def ensure_email_exists(self, email: str) -> bool: - """ - 确保邮箱账户存在(如果不存在则创建) + """确保邮箱账户存在(如果不存在则创建)""" + logger.info(f"CloudMail: Creating email account {email}...") - 用于在注册流程开始前自动创建 Cloud Mail 邮箱账户 - - 参数: - email: 邮箱地址 - - 返回: - True 如果邮箱已存在或成功创建 - """ try: - # 先尝试查询邮箱(检查是否存在) - logger.debug(f"CloudMail: Checking if {email} exists...") - emails = await self._query_emails( - to_email=email, - size=1 - ) - - # 如果能查询到,说明邮箱存在 - logger.debug(f"CloudMail: Email {email} already exists") + result = await self.add_users([{"email": email}]) + logger.success(f"CloudMail: Email {email} created successfully") + logger.debug(f"CloudMail: API response: {result}") return True - except Exception as e: - # 查询失败可能是邮箱不存在,尝试创建 - logger.info(f"CloudMail: Creating email account {email}...") - - try: - await self.add_users([{"email": email}]) - logger.success(f"CloudMail: Email {email} created successfully") + except RuntimeError as e: + error_msg = str(e).lower() + if "already" in error_msg or "exist" in error_msg or "duplicate" in error_msg: + logger.debug(f"CloudMail: Email {email} already exists") return True - except Exception as create_error: - logger.error(f"CloudMail: Failed to create email {email}: {create_error}") - raise + logger.error(f"CloudMail: Failed to create email {email}: {e}") + raise - async def wait_for_otp( - self, - email: str, - timeout: int = 300, - check_interval: int = 5 - ) -> str: - """ - 等待 OTP 邮件(轮询实现) + except Exception as e: + logger.error(f"CloudMail: Failed to create email {email}: {e}") + raise - 参数: - email: 注册邮箱 - timeout: 超时时间(秒) - check_interval: 检查间隔(秒) - - 返回: - 6 位数字验证码 - - 抛出: - TimeoutError: 超时未收到邮件 - ValueError: 邮件格式错误,无法提取 OTP - """ + async def wait_for_otp(self, email: str, timeout: int = 300, check_interval: int = 5) -> str: + """等待 OTP 邮件(轮询实现)""" start_time = time.time() - logger.info( - f"CloudMail: Waiting for OTP for {email} " - f"(timeout: {timeout}s, interval: {check_interval}s)" - ) + logger.info(f"CloudMail: Waiting for OTP for {email} (timeout: {timeout}s)") while time.time() - start_time < timeout: try: - # 查询最近的邮件 + # 模糊匹配 openai 发件人 emails = await self._query_emails( to_email=email, - send_email=self.OTP_SENDER, + send_email="%openai%", time_sort="desc", size=10 ) - # 检查每封邮件 + # 备选:不过滤发件人 + if not emails: + emails = await self._query_emails( + to_email=email, + time_sort="desc", + size=10 + ) + for msg in emails: subject = msg.get("subject", "").lower() + sender = msg.get("sendEmail", "").lower() + is_from_openai = "openai" in sender or "noreply" in sender - # 检查主题是否包含 OTP 关键词 - if any(kw in subject for kw in self.OTP_SUBJECT_KEYWORDS): - # 尝试从邮件内容提取 OTP + if any(kw in subject for kw in self.OTP_SUBJECT_KEYWORDS) or is_from_openai: content = msg.get("text") or msg.get("content") or "" otp = self._extract_otp(content) - if otp: - logger.success(f"CloudMail: OTP received: {otp}") + logger.success(f"CloudMail: OTP received: {otp} (from: {sender})") return otp - # 等待下一次检查 - elapsed = time.time() - start_time - remaining = timeout - elapsed - logger.debug( - f"CloudMail: No OTP found, waiting {check_interval}s " - f"(remaining: {int(remaining)}s)" - ) + remaining = timeout - (time.time() - start_time) + logger.debug(f"CloudMail: No OTP found, waiting {check_interval}s (remaining: {int(remaining)}s)") await asyncio.sleep(check_interval) except Exception as e: logger.warning(f"CloudMail: Query error: {e}") await asyncio.sleep(check_interval) - raise TimeoutError( - f"Timeout waiting for OTP email (timeout: {timeout}s). " - f"Email: {email}, Sender: {self.OTP_SENDER}" - ) + raise TimeoutError(f"Timeout waiting for OTP email ({timeout}s)") - async def add_users( - self, - users: List[Dict[str, str]] - ) -> Dict[str, Any]: - """ - 添加用户 (POST /api/public/addUser) - - 参数: - users: 用户列表,格式: - [ - { - "email": "test@example.com", - "password": "optional", # 可选 - "roleName": "optional" # 可选 - } - ] - - 返回: - API 响应数据 - """ + async def add_users(self, users: List[Dict[str, str]]) -> Dict[str, Any]: + """添加用户 (POST /api/public/addUser)""" client = await self._get_client() url = f"{self.api_base_url}/api/public/addUser" - payload = {"list": users} try: resp = await client.post(url, json=payload) - # 检查认证错误 if resp.status_code in [401, 403]: - raise RuntimeError( - "CloudMail token expired or invalid. " - "Please regenerate token and update MAIL_CLOUDMAIL_TOKEN in .env" - ) + raise RuntimeError("CloudMail token expired or invalid.") if resp.status_code != 200: - raise RuntimeError( - f"CloudMail addUser API error: {resp.status_code} - {resp.text[:200]}" - ) + raise RuntimeError(f"CloudMail addUser error: {resp.status_code}") data = resp.json() - - # 检查业务逻辑错误 if data.get("code") != 200: - error_msg = data.get("message", "Unknown error") - raise RuntimeError( - f"CloudMail addUser error: {error_msg} (code: {data.get('code')})" - ) + raise RuntimeError(f"CloudMail addUser error: {data.get('message')}") - logger.info(f"CloudMail: Users added successfully: {len(users)} users") + logger.info(f"CloudMail: Users added: {len(users)}") return data except Exception as e: if "httpx" in str(type(e).__module__): raise RuntimeError(f"CloudMail API network error: {e}") - else: - raise + raise async def close(self): """清理资源""" @@ -688,9 +309,4 @@ class CloudMailHandler(MailHandler): logger.debug("CloudMail: HTTP client closed") -# 导出主要接口 -__all__ = [ - "MailHandler", - "IMAPMailHandler", - "CloudMailHandler", -] +__all__ = ["MailHandler", "IMAPMailHandler", "CloudMailHandler"]