# modules/registrar.py (更新版) """OpenAI 注册流程控制器""" import json from typing import Dict, Optional import secrets import random import uuid import requests from urllib.parse import urlparse from .fingerprint import BrowserFingerprint from .sentinel_solver import SentinelSolver from .http_client import HTTPClient from .stripe_payment import StripePaymentHandler from config import AUTH_BASE_URL, DEBUG, TEMPMAIL_CONFIG from modules.pow_solver import ProofOfWorkSolver from modules.tempmail import TempMailClient class OpenAIRegistrar: """完整的 OpenAI 注册流程""" def __init__(self, session_id: Optional[str] = None, tempmail_client: Optional[TempMailClient] = None): self.fingerprint = BrowserFingerprint(session_id) self.solver = SentinelSolver(self.fingerprint) self.http_client = HTTPClient(self.fingerprint) self.pow_solver = ProofOfWorkSolver() # 新增 self.tempmail_client = tempmail_client # 临时邮箱客户端(可选) def _step1_init_through_chatgpt(self, email: str): """通过 ChatGPT web 初始化注册流程""" # 1.1 访问 ChatGPT 首页 chatgpt_url = "https://chatgpt.com/" headers = self.http_client.fingerprint.get_headers(host='chatgpt.com') headers['Accept'] = 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8' headers['Sec-Fetch-Dest'] = 'document' headers['Sec-Fetch-Mode'] = 'navigate' headers['Sec-Fetch-Site'] = 'none' resp = self.http_client.session.get( chatgpt_url, headers=headers, timeout=30 ) if DEBUG: print(f"✅ [1.1] Visited ChatGPT ({resp.status_code})") # 1.2 获取 CSRF token csrf_url = "https://chatgpt.com/api/auth/csrf" csrf_headers = headers.copy() csrf_headers['Accept'] = 'application/json, text/plain, */*' csrf_headers['Sec-Fetch-Dest'] = 'empty' csrf_headers['Referer'] = 'https://chatgpt.com/' resp = self.http_client.session.get( csrf_url, headers=csrf_headers, timeout=30 ) if DEBUG: print(f"✅ [1.2] CSRF API ({resp.status_code})") # 优先从 cookie 提取 CSRF token csrf_token = None csrf_cookie = self.http_client.cookies.get('__Host-next-auth.csrf-token', '') if csrf_cookie: from urllib.parse import unquote csrf_cookie = unquote(csrf_cookie) if '|' in csrf_cookie: csrf_token = csrf_cookie.split('|')[0] if DEBUG: print(f"✅ [1.2] CSRF token extracted") # 备选:从响应 JSON 提取 if not csrf_token and resp.status_code == 200: try: data = resp.json() csrf_token = data.get('csrfToken', '') if csrf_token and DEBUG: print(f"✅ [1.2] CSRF token from JSON") except Exception as e: if DEBUG: print(f"⚠️ [1.2] Failed to parse JSON") if not csrf_token: if DEBUG: print(f"❌ [1.2] Failed to obtain CSRF token") raise Exception(f"Failed to obtain CSRF token") # 1.3 初始化注册(通过 NextAuth) import uuid auth_session_logging_id = str(uuid.uuid4()) signin_url = "https://chatgpt.com/api/auth/signin/openai" signin_params = { 'prompt': 'login', 'ext-oai-did': self.fingerprint.session_id, 'auth_session_logging_id': auth_session_logging_id, 'screen_hint': 'signup', # 明确指定注册 'login_hint': email, # 🔥 关键:传入邮箱 } signin_data = { 'callbackUrl': 'https://chatgpt.com/', 'csrfToken': csrf_token, 'json': 'true', } signin_headers = headers.copy() signin_headers['Content-Type'] = 'application/x-www-form-urlencoded' signin_headers['Accept'] = '*/*' signin_headers['Referer'] = 'https://chatgpt.com/' signin_headers['Sec-Fetch-Mode'] = 'cors' signin_headers['Sec-Fetch-Dest'] = 'empty' resp = self.http_client.session.post( signin_url, params=signin_params, data=signin_data, headers=signin_headers, allow_redirects=False, timeout=30 ) if DEBUG: print(f"✅ [1.3] NextAuth response ({resp.status_code})") # 1.4 提取 OAuth URL(从 JSON 响应) if resp.status_code == 200: try: try: data = resp.json() except Exception: data = json.loads(resp.text or "{}") oauth_url = data.get('url') if not oauth_url: raise Exception(f"No 'url' in NextAuth response: {data}") if DEBUG: print(f"✅ [1.4] Got OAuth URL") except Exception as e: raise Exception(f"Failed to parse NextAuth response: {e}") elif resp.status_code in [301, 302, 303, 307, 308]: # 旧版流程:直接重定向 oauth_url = resp.headers.get('Location') if not oauth_url: if DEBUG: print(f"❌ [1.4] Got redirect but no Location header") raise Exception("Got redirect but no Location header") if DEBUG: print(f"✅ [1.4] Got OAuth redirect") else: raise Exception(f"Unexpected NextAuth response: {resp.status_code}") # 1.5 访问 OAuth authorize endpoint auth_headers = self.http_client.fingerprint.get_headers(host='auth.openai.com') auth_headers['Accept'] = 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8' auth_headers['Referer'] = 'https://chatgpt.com/' auth_headers['Sec-Fetch-Dest'] = 'document' auth_headers['Sec-Fetch-Mode'] = 'navigate' auth_headers['Sec-Fetch-Site'] = 'cross-site' resp = self.http_client.session.get( oauth_url, headers=auth_headers, allow_redirects=True, # 自动跟随重定向 timeout=30 ) if DEBUG: print(f"✅ [1.5] OAuth flow completed ({resp.status_code})") # 检查必需的 cookies required_cookies = ['login_session', 'oai-did'] missing = [c for c in required_cookies if c not in self.http_client.cookies] if missing: if DEBUG: print(f"⚠️ Missing cookies: {', '.join(missing)}") else: if DEBUG: print(f"✅ All required cookies present") # 1.6 确保获取 auth.openai.com 的 CSRF(用于后续 /register 请求) try: auth_csrf = self.http_client.get_csrf_token(domain="auth.openai.com") if DEBUG and auth_csrf: print(f"✅ [1.6] Auth CSRF obtained") except Exception as e: if DEBUG: print(f"⚠️ [1.6] Failed to get auth CSRF") def _step2_init_sentinel(self): """初始化 Sentinel(生成 token)""" try: token_data = self.solver.generate_requirements_token() self.sentinel_token = json.dumps(token_data) if DEBUG: print(f"✅ [2] Sentinel token generated") except Exception as e: if DEBUG: print(f"❌ [2] Sentinel initialization failed: {e}") raise def _step2_5_submit_sentinel(self): """Step 2.5: 提交 Sentinel token 到 OpenAI""" # 如果 sentinel_token 是字符串,先解析 if isinstance(self.sentinel_token, str): token_data = json.loads(self.sentinel_token) else: token_data = self.sentinel_token url = "https://sentinel.openai.com/backend-api/sentinel/req" payload = { "p": token_data['p'], "id": self.fingerprint.session_id, "flow": "username_password_create" } headers = self.http_client.fingerprint.get_headers(host='sentinel.openai.com') headers['Content-Type'] = 'text/plain;charset=UTF-8' # 注意:text/plain headers['Origin'] = 'https://sentinel.openai.com' headers['Referer'] = 'https://sentinel.openai.com/backend-api/sentinel/frame.html' headers['Accept'] = '*/*' headers['Sec-Fetch-Site'] = 'same-origin' headers['Sec-Fetch-Mode'] = 'cors' headers['Sec-Fetch-Dest'] = 'empty' resp = self.http_client.session.post( url, json=payload, headers=headers, timeout=30 ) if resp.status_code != 200: if DEBUG: print(f"❌ [2.5] Failed to submit Sentinel ({resp.status_code})") raise Exception(f"Failed to submit Sentinel token: {resp.status_code} {resp.text}") try: data = resp.json() if DEBUG: print(f"✅ [2.5] Sentinel accepted (persona: {data.get('persona')})") # 检查是否需要额外验证 if data.get('turnstile', {}).get('required'): print(f"⚠️ Turnstile required") if data.get('proofofwork', {}).get('required'): print(f"⚠️ Proof of Work required (difficulty: {data.get('proofofwork', {}).get('difficulty')})") # 保存 token(可能后续需要) self.sentinel_response = data return data except Exception as e: if DEBUG: print(f"❌ [2.5] Failed to parse response: {e}") raise def _step2_6_solve_pow(self): """Step 2.6: 解 Proof of Work""" pow_data = self.sentinel_response.get('proofofwork', {}) if not pow_data.get('required'): if DEBUG: print("⏭️ [2.6] PoW not required, skipping") return None seed = pow_data.get('seed') difficulty = pow_data.get('difficulty') if not seed or not difficulty: raise Exception(f"Missing PoW parameters") # 解 PoW self.pow_answer = self.pow_solver.solve(seed, difficulty) if DEBUG: print(f"✅ [2.6] PoW solved (difficulty: {difficulty})") return self.pow_answer def _step2_7_submit_pow(self): """Step 2.7: 提交 PoW 答案到 Sentinel""" url = "https://sentinel.openai.com/backend-api/sentinel/req" # 再次生成 requirements token(或重用之前的) requirements_token = self.sentinel_token.split('"p": "')[1].split('"')[0] payload = { "p": requirements_token, "id": self.fingerprint.session_id, "answer": self.pow_answer } headers = self.http_client.fingerprint.get_headers(host='sentinel.openai.com') headers['Content-Type'] = 'text/plain;charset=UTF-8' headers['Origin'] = 'https://sentinel.openai.com' headers['Referer'] = 'https://sentinel.openai.com/backend-api/sentinel/frame.html' # 补全 Sec-Fetch-* headers headers['Sec-Fetch-Dest'] = 'empty' headers['Sec-Fetch-Mode'] = 'cors' headers['Sec-Fetch-Site'] = 'same-origin' resp = self.http_client.session.post( url, json=payload, headers=headers, timeout=30 ) if resp.status_code != 200: if DEBUG: print(f"❌ [2.7] Failed to submit PoW ({resp.status_code})") raise Exception(f"Failed to submit PoW: {resp.status_code}") # 解析响应 result = resp.json() if DEBUG: print(f"✅ [2.7] PoW accepted") if 'turnstile' in result: print(f"⚠️ Turnstile still required") # 保存最终响应 self.sentinel_response = result return result def _step3_attempt_register(self, email: str, password: str) -> Optional[Dict]: """尝试注册(不验证邮箱)""" # 3.2 准备注册请求(正确的 payload) url = "https://auth.openai.com/api/accounts/user/register" payload = { "username": email, # 不是 "email"! "password": password, } # 3.3 准备 headers(完全匹配真实请求) headers = self.http_client.fingerprint.get_headers(host='auth.openai.com') # 添加 Sentinel token if hasattr(self, 'sentinel_token') and self.sentinel_token: headers['Openai-Sentinel-Token'] = self.sentinel_token # 注意大小写 headers.update({ 'Content-Type': 'application/json', 'Accept': 'application/json', 'Accept-Language': 'zh-CN,zh;q=0.9', 'Origin': 'https://auth.openai.com', 'Referer': 'https://auth.openai.com/create-account/password', # 注意是 /password 'Sec-Fetch-Dest': 'empty', 'Sec-Fetch-Mode': 'cors', 'Sec-Fetch-Site': 'same-origin', 'Priority': 'u=1, i', }) # 添加 Datadog tracing headers(匹配真实请求) import secrets headers.update({ 'X-Datadog-Trace-Id': str(secrets.randbits(63)), 'X-Datadog-Parent-Id': str(secrets.randbits(63)), 'X-Datadog-Sampling-Priority': '1', 'X-Datadog-Origin': 'rum', 'Traceparent': f'00-0000000000000000{secrets.token_hex(8)}-{secrets.token_hex(8)}-01', 'Tracestate': 'dd=s:1;o:rum', }) # 3.4 准备 cookies(使用所有 auth.openai.com 的 cookies) # 不要手动过滤,让 requests 自动处理域名匹配 # 3.5 发送注册请求 resp = self.http_client.session.post( url, json=payload, headers=headers, # 不手动指定 cookies,让 session 自动处理 timeout=30 ) # 3.6 处理响应 try: data = resp.json() if resp.status_code == 200: if DEBUG: print(f"✅ [3] Registration successful!") # 检查是否需要邮箱验证 continue_url = data.get('continue_url', '') if 'email-otp' in continue_url: if DEBUG: print(f"⏭️ [3] Email OTP verification required") return { 'success': True, 'requires_verification': True, 'continue_url': continue_url, 'method': data.get('method', 'GET'), 'data': data } else: return {'success': True, 'data': data} elif resp.status_code == 409: error = data.get('error', {}) error_code = error.get('code', '') if error_code == 'invalid_state': # Session 无效 if DEBUG: print(f"❌ [3] Invalid session") raise Exception(f"Invalid session: {error}") elif error_code == 'email_taken' or 'already' in str(error).lower(): # 邮箱已被使用 if DEBUG: print(f"⚠️ [3] Email already registered") return None else: if DEBUG: print(f"❌ [3] Registration conflict: {error_code}") raise Exception(f"Registration conflict: {data}") elif resp.status_code == 400: if DEBUG: print(f"❌ [3] Bad Request: {data}") raise Exception(f"Bad request: {data}") else: if DEBUG: print(f"❌ [3] Unexpected status: {resp.status_code}") raise Exception(f"Registration failed with {resp.status_code}: {data}") except json.JSONDecodeError as e: if DEBUG: print(f"❌ [3] Failed to parse JSON response") raise Exception(f"Invalid JSON response: {e}") except Exception as e: raise def _step5_get_access_token(self) -> str: """Step 5: Retrieve access token from authenticated session This method leverages the existing session cookies (login_session, oai-did, __Host-next-auth.csrf-token) obtained after successful registration and email verification. Returns: Access token string Raises: Exception: If access token retrieval fails """ url = "https://chatgpt.com/api/auth/session" headers = self.http_client.fingerprint.get_headers(host='chatgpt.com') headers.update({ 'Accept': 'application/json', 'Referer': 'https://chatgpt.com/', 'Sec-Fetch-Dest': 'empty', 'Sec-Fetch-Mode': 'cors', 'Sec-Fetch-Site': 'same-origin', }) try: resp = self.http_client.session.get( url, headers=headers, timeout=30 ) if resp.status_code != 200: error_msg = f"Failed to retrieve session: HTTP {resp.status_code}" if DEBUG: print(f"❌ [5] {error_msg}") raise Exception(error_msg) data = resp.json() access_token = data.get('accessToken') if not access_token: error_msg = "No accessToken in session response" if DEBUG: print(f"❌ [5] {error_msg}") raise Exception(error_msg) if DEBUG: print(f"✅ [5] Access token retrieved") print(f" Token: {access_token[:50]}...") return access_token except requests.RequestException as e: error_msg = f"Network error retrieving access token: {e}" if DEBUG: print(f"❌ [5] {error_msg}") raise Exception(error_msg) except Exception as e: if DEBUG: print(f"❌ [5] Unexpected error: {e}") raise def _step4_verify_email(self, mailbox: str, continue_url: str) -> Dict: """Step 4: 验证邮箱(通过 OTP 验证码) Args: mailbox: 邮箱地址 continue_url: Step 3 返回的 continue_url(例如: /email-otp/send) Returns: 验证结果 """ # 4.0 触发发送验证邮件(访问 continue_url) if continue_url: # 构造完整 URL if not continue_url.startswith('http'): send_url = f"https://auth.openai.com{continue_url}" else: send_url = continue_url # 准备 headers headers = self.http_client.fingerprint.get_headers(host='auth.openai.com') headers.update({ 'Accept': 'application/json', 'Referer': 'https://auth.openai.com/create-account/password', 'Sec-Fetch-Dest': 'empty', 'Sec-Fetch-Mode': 'cors', 'Sec-Fetch-Site': 'same-origin', }) # 发送请求(根据 Step 3 返回的 method) resp = self.http_client.session.get( send_url, headers=headers, timeout=30 ) if resp.status_code != 200: if DEBUG: print(f"❌ [4.0] Failed to trigger email send ({resp.status_code})") raise Exception(f"Failed to trigger email send: {resp.status_code} {resp.text}") elif DEBUG: print(f"✅ [4.0] Email send triggered") # 4.1 检查是否有 tempmail_client if not self.tempmail_client: raise Exception("No TempMailClient configured. Cannot receive verification emails.") # 4.2 等待验证邮件 if DEBUG: print(f"⏳ [4.1] Waiting for verification email...") email_data = self.tempmail_client.wait_for_email( mailbox=mailbox, from_filter=None, # 不过滤发件人(因为临时邮箱可能不返回 from 字段) subject_filter="chatgpt", # 主题包含 "chatgpt"(匹配 "Your ChatGPT code is...") timeout=120, interval=5 ) if not email_data: if DEBUG: print(f"❌ [4.1] Timeout: No verification email received") raise Exception("Timeout: No verification email received") # 4.3 提取验证码 verification_code = self.tempmail_client.extract_verification_code(email_data) if not verification_code: # 尝试提取验证链接(备选方案) verification_link = self.tempmail_client.extract_verification_link(email_data) if verification_link: if DEBUG: print(f"❌ [4.3] Link-based verification not implemented") raise NotImplementedError("Link-based verification not implemented yet") else: if DEBUG: print(f"❌ [4.3] Failed to extract verification code") raise Exception("Failed to extract verification code or link from email") if DEBUG: print(f"✅ [4.3] Got verification code: {verification_code}") # 4.4 提交验证码 # 使用正确的验证接口 verify_url = "https://auth.openai.com/api/accounts/email-otp/validate" # 准备 payload payload = { "code": verification_code, } # 准备 headers headers = self.http_client.fingerprint.get_headers(host='auth.openai.com') headers.update({ 'Content-Type': 'application/json', 'Accept': 'application/json', 'Origin': 'https://auth.openai.com', 'Referer': 'https://auth.openai.com/email-verification', 'Sec-Fetch-Dest': 'empty', 'Sec-Fetch-Mode': 'cors', 'Sec-Fetch-Site': 'same-origin', 'Priority': 'u=1, i', }) # 添加 Datadog tracing headers import secrets headers.update({ 'X-Datadog-Trace-Id': str(secrets.randbits(63)), 'X-Datadog-Parent-Id': str(secrets.randbits(63)), 'X-Datadog-Sampling-Priority': '1', 'X-Datadog-Origin': 'rum', 'Traceparent': f'00-0000000000000000{secrets.token_hex(8)}-{secrets.token_hex(8)}-01', 'Tracestate': 'dd=s:1;o:rum', }) # 发送验证请求 resp = self.http_client.session.post( verify_url, json=payload, headers=headers, timeout=30 ) # 处理响应 try: data = resp.json() if resp.status_code == 200: if DEBUG: print(f"✅ [4.4] Email verified successfully!") print(f"📋 [4.4] Response data: {json.dumps(data, indent=2)}") return { 'success': True, 'verified': True, 'data': data } else: if DEBUG: print(f"❌ [4.4] Verification failed: {data}") return { 'success': False, 'error': data } except Exception as e: if DEBUG: print(f"❌ [4.4] Exception: {e}") raise def _step4_5_submit_personal_info(self) -> Dict: """Step 4.5: 提交个人信息(姓名、生日)完成账号设置 Returns: 包含 OAuth callback URL 的字典 """ url = "https://auth.openai.com/api/accounts/create_account" # 生成随机姓名和生日 from modules.data_generator import NameGenerator random_name = NameGenerator.generate_full_name() # 生成随机生日(1980-2000年之间) year = random.randint(1980, 2000) month = random.randint(1, 12) day = random.randint(1, 28) # 保险起见,避免2月29日等边界情况 birthdate = f"{year}-{month:02d}-{day:02d}" payload = { "name": random_name, "birthdate": birthdate } # 准备 headers headers = self.http_client.fingerprint.get_headers(host='auth.openai.com') headers.update({ 'Content-Type': 'application/json', 'Accept': 'application/json', 'Origin': 'https://auth.openai.com', 'Referer': 'https://auth.openai.com/about-you', 'Sec-Fetch-Dest': 'empty', 'Sec-Fetch-Mode': 'cors', 'Sec-Fetch-Site': 'same-origin', 'Priority': 'u=1, i', }) # 添加 Datadog tracing headers import secrets headers.update({ 'X-Datadog-Trace-Id': str(secrets.randbits(63)), 'X-Datadog-Parent-Id': str(secrets.randbits(63)), 'X-Datadog-Sampling-Priority': '1', 'X-Datadog-Origin': 'rum', 'Traceparent': f'00-0000000000000000{secrets.token_hex(8)}-{secrets.token_hex(8)}-01', 'Tracestate': 'dd=s:1;o:rum', }) # 发送请求 resp = self.http_client.session.post( url, json=payload, headers=headers, timeout=30 ) # 处理响应 try: data = resp.json() if resp.status_code == 200: oauth_url = data.get('continue_url') if DEBUG: print(f"✅ [4.5] Personal info submitted") print(f" Name: {random_name}") print(f" Birthdate: {birthdate}") if oauth_url: return { 'success': True, 'oauth_url': oauth_url, 'method': data.get('method', 'GET') } else: raise Exception(f"No continue_url in response: {data}") else: if DEBUG: print(f"❌ [4.5] Failed to submit personal info: {data}") raise Exception(f"Failed to submit personal info: {resp.status_code} {data}") except Exception as e: if DEBUG: print(f"❌ [4.5] Exception: {e}") raise def _step4_6_complete_oauth_flow(self, oauth_url: str): """Step 4.6: 完成 OAuth 回调流程,获取 session-token 这一步会自动跟随重定向链: 1. GET oauth_url → 302 to /api/accounts/consent 2. GET /api/accounts/consent → 302 to chatgpt.com/api/auth/callback/openai 3. GET chatgpt.com/api/auth/callback/openai → 最终生成 session-token Args: oauth_url: Step 4.5 返回的 OAuth URL """ # 确保是完整 URL if not oauth_url.startswith('http'): oauth_url = f"https://auth.openai.com{oauth_url}" # 准备 headers headers = self.http_client.fingerprint.get_headers(host='auth.openai.com') headers.update({ 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', 'Referer': 'https://auth.openai.com/about-you', 'Sec-Fetch-Dest': 'document', 'Sec-Fetch-Mode': 'navigate', 'Sec-Fetch-Site': 'same-origin', 'Upgrade-Insecure-Requests': '1', }) # 发送请求,允许自动跟随重定向 # requests.Session 会自动处理跨域重定向(auth.openai.com → chatgpt.com) resp = self.http_client.session.get( oauth_url, headers=headers, allow_redirects=True, # 自动跟随所有重定向 timeout=30 ) # 检查是否获取到 session-token session_token = None # curl_cffi 的 cookies 是字典格式,requests 的是 CookieJar cookies = self.http_client.session.cookies # 尝试字典访问(curl_cffi) if hasattr(cookies, 'get'): session_token = cookies.get('__Secure-next-auth.session-token') if session_token: if DEBUG: print(f"✅ [4.6] OAuth flow completed") print(f" Got session-token: {session_token[:50]}...") return True else: if DEBUG: print(f"❌ [4.6] OAuth flow completed but no session-token found") print(f" Final URL: {resp.url}") print(f" Status: {resp.status_code}") raise Exception("OAuth flow completed but no session-token cookie was set") def _step5_solve_challenge(self, challenge: Dict) -> str: """解决 enforcement 挑战""" sentinel_token = self.solver.solve_enforcement(challenge) return sentinel_token def _step6_register_with_token(self, email: str, password: str, sentinel_token: str) -> Dict: """带 Sentinel token 重新注册""" url = f"{AUTH_BASE_URL}/api/accounts/user/register" payload = { 'username': email, 'password': password, 'client_id': 'sentinel' } resp = self.http_client.post( url, json_data=payload, sentinel_token=sentinel_token ) if resp.status_code == 200: if DEBUG: print("✅ [6] Registration successful!") return {'success': True, 'data': resp.json()} else: if DEBUG: print(f"❌ [6] Registration failed: {resp.status_code}") return { 'success': False, 'status_code': resp.status_code, 'error': resp.text } def register(self, email: str, password: str) -> Dict: """完整注册流程""" try: # Step 1: 通过 ChatGPT web 初始化(获取正确的 session) self._step1_init_through_chatgpt(email) # Step 2: 初始化 Sentinel self._step2_init_sentinel() self._step2_5_submit_sentinel() self._step2_6_solve_pow() self._step2_7_submit_pow() # Step 3: 尝试注册(提交邮箱和密码) result = self._step3_attempt_register(email, password) # 检查是否需要邮箱验证 if result and result.get('success') and result.get('requires_verification'): # Step 4: 验证邮箱 verify_result = self._step4_verify_email( mailbox=email, continue_url=result['continue_url'] ) if verify_result.get('success'): # Step 4.5: 提交个人信息 personal_info_result = self._step4_5_submit_personal_info() if personal_info_result.get('success'): oauth_url = personal_info_result['oauth_url'] # Step 4.6: 完成 OAuth 回调流程,获取 session-token self._step4_6_complete_oauth_flow(oauth_url) if DEBUG: print(f"\n✅ Registration completed successfully!") return { 'success': True, 'verified': True, 'email': email, 'data': verify_result.get('data') } else: raise Exception(f"Failed to submit personal info: {personal_info_result}") else: raise Exception(f"Email verification failed: {verify_result.get('error')}") # 如果直接成功(无需验证) elif result and result.get('success'): if DEBUG: print(f"\n✅ Registration completed (no verification needed)!") return result # 如果注册失败(邮箱已被使用等) elif result is None: if DEBUG: print(f"\n❌ Registration failed: Email already taken or invalid") return { 'success': False, 'error': 'Email already taken or registration rejected' } # 其他情况:触发 enforcement challenge(旧逻辑) else: if DEBUG: print(f"\n⚠️ Enforcement challenge triggered") # Step 5: 解决挑战 sentinel_token = self._step5_solve_challenge(result) # Step 6: 带 token 重新注册 final_result = self._step6_register_with_token(email, password, sentinel_token) return final_result except Exception as e: if DEBUG: import traceback print(f"\n❌ Registration failed with exception:") traceback.print_exc() return { 'success': False, 'error': str(e) } def register_with_auto_email(self, password: str) -> Dict: """自动生成临时邮箱并完成注册流程 Args: password: 要设置的密码 Returns: 注册结果(包含邮箱地址) """ # 检查是否配置了 TempMailClient if not self.tempmail_client: return { 'success': False, 'error': 'TempMailClient not configured. Please initialize with tempmail_client parameter.' } generated_email = None try: # Step 0: 生成临时邮箱(domain_index 从配置读取) domain_index = TEMPMAIL_CONFIG.get('domain_index', 0) # 默认使用第1个域名 generated_email = self.tempmail_client.generate_mailbox(domain_index=domain_index) if DEBUG: print(f"\n✅ Generated temp email: {generated_email}") # 调用正常的注册流程 result = self.register(email=generated_email, password=password) # 检查注册结果 if result.get('success'): if DEBUG: print(f"\n✅ AUTO REGISTRATION SUCCESS") print(f" Email: {generated_email}") print(f" Password: {password}") return { 'success': True, 'email': generated_email, 'password': password, 'verified': result.get('verified', False), 'data': result.get('data') } else: # 注册失败 → 删除邮箱 if DEBUG: print(f"\n❌ AUTO REGISTRATION FAILED") print(f" Email: {generated_email}") print(f" Error: {result.get('error')}") # 删除失败的邮箱 self.tempmail_client.delete_mailbox(generated_email) return { 'success': False, 'email': generated_email, 'error': result.get('error'), 'mailbox_deleted': True } except Exception as e: # 发生异常 → 清理邮箱 if DEBUG: import traceback print(f"\n❌ AUTO REGISTRATION EXCEPTION") if generated_email: print(f" Email: {generated_email}") print(f" Exception: {e}") traceback.print_exc() # 清理邮箱(如果已生成) if generated_email: try: self.tempmail_client.delete_mailbox(generated_email) except: pass return { 'success': False, 'email': generated_email, 'error': str(e), 'mailbox_deleted': True if generated_email else False } def add_payment_method( self, checkout_session_url: str, iban: str, name: str, email: str, address_line1: str, city: str, postal_code: str, state: str, country: str = "US" ) -> Dict: """ 为账户添加Stripe支付方式(SEPA) Args: checkout_session_url: Stripe checkout session URL iban: 德国IBAN账号 name: 持卡人姓名 email: 邮箱 address_line1: 街道地址 city: 城市 postal_code: 邮编 state: 州/省 country: 国家代码 Returns: 支付结果字典 """ try: if DEBUG: print(f"\n🔐 [Payment] Starting payment method setup...") print(f" Session URL: {checkout_session_url[:60]}...") # 初始化Stripe支付处理器 payment_handler = StripePaymentHandler( checkout_session_url=checkout_session_url, http_client=self.http_client ) # 执行完整支付流程 success = payment_handler.complete_payment( iban=iban, name=name, email=email, address_line1=address_line1, city=city, postal_code=postal_code, state=state, country=country ) if success: if DEBUG: print(f"\n✅ [Payment] Payment method added successfully!") return { 'success': True, 'message': 'Payment method added' } else: if DEBUG: print(f"\n❌ [Payment] Failed to add payment method") return { 'success': False, 'error': 'Payment setup failed' } except Exception as e: if DEBUG: import traceback print(f"\n❌ [Payment] Exception occurred:") traceback.print_exc() return { 'success': False, 'error': str(e) }