"""OpenAI 账号注册流程编排模块""" from typing import Dict, Any, Optional import secrets import random import json import uuid from core.session import OAISession, CloudflareBlockError, SessionInvalidError from core.sentinel_handler import SentinelHandler from core.challenge import CloudflareSolver from utils.mail_box import MailHandler from utils.crypto import generate_random_password from utils.logger import logger class RegisterFlow: """OpenAI 账号注册流程编排器""" CHATGPT_HOME = "https://chatgpt.com/" CHATGPT_PROVIDERS = "https://chatgpt.com/api/auth/providers" CHATGPT_CSRF = "https://chatgpt.com/api/auth/csrf" CHATGPT_SIGNIN = "https://chatgpt.com/api/auth/signin/openai" AUTH_CREATE_ACCOUNT = "https://auth.openai.com/create-account/password" AUTH_REGISTER = "https://auth.openai.com/api/accounts/user/register" AUTH_SEND_OTP = "https://auth.openai.com/api/accounts/email-otp/send" AUTH_VALIDATE_OTP = "https://auth.openai.com/api/accounts/email-otp/validate" AUTH_COMPLETE_PROFILE = "https://auth.openai.com/api/accounts/create_account" AUTH_CONSENT = "https://auth.openai.com/api/accounts/consent" CHATGPT_SESSION = "https://chatgpt.com/api/auth/session" def __init__(self, session: OAISession, config, email: Optional[str] = None, password: Optional[str] = None): self.s = session self.config = config self.email = email or self._generate_email() self.password = password or generate_random_password() self.sentinel = SentinelHandler(session) self.mail = MailHandler.create(config.mail.to_dict() if config.mail.enabled else None) self.cloudflare_solver = CloudflareSolver() self.csrf_token: Optional[str] = None self.sentinel_token: Optional[Dict[str, Any]] = None self.otp: Optional[str] = None self.oauth_callback_url: Optional[str] = None logger.info(f"RegisterFlow initialized for {self.email} (oai-did: {self.s.oai_did})") async def run(self) -> Dict[str, Any]: """执行完整注册流程""" try: logger.info(f"[{self.email}] Starting registration flow") # Step 0: CloudMail 邮箱账户 if self.config.mail.enabled and self.config.mail.type == "cloudmail": try: from utils.mail_box import CloudMailHandler if isinstance(self.mail, CloudMailHandler): logger.info(f"[{self.email}] Step 0: Ensuring email account exists") await self.mail.ensure_email_exists(self.email) logger.info(f"[{self.email}] ✓ Email account ready") except Exception as e: logger.warning(f"[{self.email}] Failed to create CloudMail account: {e}") await self._step1_init_session() await self._step2_get_csrf_token() await self._step3_oauth_signin() await self._step4_get_sentinel_token() await self._step5_submit_registration() await self._step6_send_email_otp() await self._step7_submit_otp() await self._step8_complete_profile() logger.success(f"[{self.email}] Registration completed successfully! ✅") return { "email": self.email, "password": self.password, "oai_did": self.s.oai_did, "status": "success", "message": "Account registered successfully" } except CloudflareBlockError as e: logger.error(f"[{self.email}] Cloudflare blocked: {e}") return {"email": self.email, "password": self.password, "status": "cloudflare_blocked", "error": str(e)} except SessionInvalidError as e: logger.error(f"[{self.email}] Session invalid: {e}") return {"email": self.email, "password": self.password, "status": "session_invalid", "error": str(e)} except NotImplementedError as e: logger.warning(f"[{self.email}] Feature not implemented: {e}") return {"email": self.email, "password": self.password, "status": "pending_manual", "error": str(e)} except Exception as e: logger.exception(f"[{self.email}] Unexpected error") return {"email": self.email, "password": self.password, "status": "failed", "error": str(e)} async def _step1_init_session(self): """Step 1: 初始化会话""" logger.info(f"[{self.email}] Step 1: Initializing session") self.s.get(self.CHATGPT_HOME) self.s.get(self.CHATGPT_PROVIDERS) logger.info(f"[{self.email}] ✓ Session initialized") async def _step2_get_csrf_token(self): """Step 2: 获取 CSRF Token""" logger.info(f"[{self.email}] Step 2: Getting CSRF token") resp = self.s.get(self.CHATGPT_CSRF) if resp.status_code != 200: raise RuntimeError(f"Failed to get CSRF token: {resp.status_code}") self.csrf_token = resp.json().get("csrfToken") if not self.csrf_token: raise RuntimeError("CSRF token not found") logger.info(f"[{self.email}] ✓ CSRF token obtained") async def _step3_oauth_signin(self): """Step 3: OAuth 登录流程""" logger.info(f"[{self.email}] Step 3: Starting OAuth flow") signin_params = { 'prompt': 'login', 'ext-oai-did': self.s.oai_did, 'auth_session_logging_id': str(uuid.uuid4()), 'screen_hint': 'signup', 'login_hint': self.email, } payload = {"callbackUrl": "/", "csrfToken": self.csrf_token, "json": "true"} resp = self.s.post( self.CHATGPT_SIGNIN, params=signin_params, data=payload, headers={"Content-Type": "application/x-www-form-urlencoded"} ) if resp.status_code != 200: raise RuntimeError(f"OAuth signin failed: {resp.status_code}") auth_url = resp.json().get("url") if not auth_url: raise RuntimeError("OAuth URL not found") self.s.get(auth_url, allow_redirects=True) self.s.get(self.AUTH_CREATE_ACCOUNT) logger.info(f"[{self.email}] ✓ OAuth flow completed") async def _step4_get_sentinel_token(self): """Step 4: Sentinel 握手""" logger.info(f"[{self.email}] Step 4: Getting Sentinel token") try: self.sentinel_token = await self.sentinel.get_token(flow="username_password_create") logger.info(f"[{self.email}] ✓ Sentinel token obtained") except (NotImplementedError, ImportError) as e: logger.error(f"[{self.email}] Sentinel solver not available: {e}") raise async def _step5_submit_registration(self): """Step 5: 提交注册信息""" logger.info(f"[{self.email}] Step 5: Submitting registration") payload = {"username": self.email, "password": self.password} headers = { "Content-Type": "application/json", "Accept": "application/json", "Origin": "https://auth.openai.com", "Referer": self.AUTH_CREATE_ACCOUNT, "Openai-Sentinel-Token": json.dumps(self.sentinel_token), "X-Datadog-Trace-Id": str(secrets.randbits(63)), "X-Datadog-Parent-Id": str(secrets.randbits(63)), "X-Datadog-Sampling-Priority": "1", "X-Datadog-Origin": "rum", "Traceparent": f"00-0000000000000000{secrets.token_hex(8)}-{secrets.token_hex(8)}-01", "Tracestate": "dd=s:1;o:rum", } resp = self.s.post(self.AUTH_REGISTER, json=payload, headers=headers) if resp.status_code != 200: if "email already exists" in resp.text.lower(): raise ValueError(f"Email already registered: {self.email}") raise RuntimeError(f"Registration failed: {resp.status_code}") logger.info(f"[{self.email}] ✓ Registration submitted") async def _step6_send_email_otp(self): """Step 6: 触发邮件验证""" logger.info(f"[{self.email}] Step 6: Sending email OTP") resp = self.s.post( self.AUTH_SEND_OTP, json={}, headers={"Content-Type": "application/json", "Referer": self.AUTH_CREATE_ACCOUNT} ) if resp.status_code == 403 and CloudflareSolver.detect_challenge(resp): raise CloudflareBlockError("Cloudflare challenge triggered") if resp.status_code != 200: raise RuntimeError(f"OTP send failed: {resp.status_code}") logger.info(f"[{self.email}] ✓ OTP email sent") async def _step7_submit_otp(self): """Step 7: 提交 OTP 验证码""" logger.info(f"[{self.email}] Step 7: Waiting for OTP") try: self.otp = await self.mail.wait_for_otp(email=self.email, timeout=300) logger.info(f"[{self.email}] ✓ OTP received: {self.otp}") except NotImplementedError: logger.warning(f"[{self.email}] Mail handler not configured") raise except TimeoutError: raise TimeoutError(f"Timeout waiting for OTP: {self.email}") resp = self.s.post( self.AUTH_VALIDATE_OTP, json={"code": self.otp}, headers={"Content-Type": "application/json", "Referer": self.AUTH_CREATE_ACCOUNT} ) if resp.status_code != 200: raise RuntimeError(f"OTP validation failed: {resp.status_code}") logger.info(f"[{self.email}] ✓ OTP validated") async def _step8_complete_profile(self): """Step 8: 完成用户信息""" logger.info(f"[{self.email}] Step 8: Completing profile") name = self._generate_name() birthdate = self._generate_birthdate() resp = self.s.post( self.AUTH_COMPLETE_PROFILE, json={"name": name, "birthdate": birthdate}, headers={"Content-Type": "application/json", "Referer": self.AUTH_CREATE_ACCOUNT} ) if resp.status_code != 200: raise RuntimeError(f"Profile completion failed: {resp.status_code}") # 获取 OAuth 回调 URL data = resp.json() self.oauth_callback_url = data.get("continue_url") logger.debug(f"[{self.email}] OAuth callback URL: {self.oauth_callback_url}") logger.info(f"[{self.email}] ✓ Profile completed") async def _step9_complete_oauth_callback(self): """Step 9: 完成 OAuth 回调,获取 session-token""" logger.info(f"[{self.email}] Step 9: Completing OAuth callback") # 使用 step 8 返回的 continue_url,跟随重定向链到 chatgpt.com if self.oauth_callback_url: # continue_url 可能是相对路径或完整 URL if self.oauth_callback_url.startswith("http"): callback_url = self.oauth_callback_url else: callback_url = f"https://auth.openai.com{self.oauth_callback_url}" else: # 如果没有 continue_url,尝试访问 consent 端点 callback_url = self.AUTH_CONSENT logger.debug(f"[{self.email}] Following OAuth callback: {callback_url}") # 循环跟随重定向,直到到达 chatgpt.com 或获取到 session-token max_redirects = 10 for i in range(max_redirects): resp = self.s.get(callback_url, allow_redirects=True) # 检查是否已到达 chatgpt.com if 'chatgpt.com' in str(resp.url): logger.debug(f"[{self.email}] Reached chatgpt.com") break # 检查响应是否包含重定向 URL(JSON 格式) try: data = resp.json() redirect_url = data.get("redirect_url") or data.get("location") or data.get("url") if redirect_url: logger.debug(f"[{self.email}] Following JSON redirect ({i+1}): {redirect_url[:100]}...") callback_url = redirect_url continue except (json.JSONDecodeError, ValueError): pass # 没有更多重定向,退出循环 break # 检查是否获取到 session-token cookie session_token = self.s.get_cookie('__Secure-next-auth.session-token') if not session_token: # 尝试直接访问 chatgpt.com 首页触发 cookie 设置 logger.debug(f"[{self.email}] Session token not found, trying chatgpt.com homepage") self.s.get(self.CHATGPT_HOME, allow_redirects=True) session_token = self.s.get_cookie('__Secure-next-auth.session-token') if not session_token: # 打印所有 cookies 用于调试,但不抛出错误 # 尝试继续获取 access token,有时候 session token 不是必需的 all_cookies = self.s.get_cookies() logger.warning(f"[{self.email}] Session token not found, available cookies: {list(all_cookies.keys())}") logger.info(f"[{self.email}] Continuing without session-token, will try to get access token directly") else: logger.info(f"[{self.email}] ✓ OAuth callback completed, session-token obtained") async def _step10_get_access_token(self) -> str: """Step 10: 获取 AccessToken""" logger.info(f"[{self.email}] Step 10: Getting access token") resp = self.s.get( self.CHATGPT_SESSION, headers={ 'Accept': 'application/json', 'Referer': 'https://chatgpt.com/', } ) if resp.status_code != 200: raise RuntimeError(f"Failed to get access token: {resp.status_code}") data = resp.json() access_token = data.get('accessToken') if not access_token: raise RuntimeError("AccessToken not found in response") logger.info(f"[{self.email}] ✓ AccessToken obtained successfully") return access_token def _generate_email(self) -> str: """生成随机邮箱""" random_part = secrets.token_hex(8) if self.config.mail.enabled and self.config.mail.type == "cloudmail": domain = self.config.mail.cloudmail_domain if domain: return f"user_{random_part}@{domain}" logger.warning("Using example.com domain. Configure MAIL_CLOUDMAIL_DOMAIN.") return f"user_{random_part}@example.com" def _generate_name(self) -> str: """生成随机姓名""" first_names = ["James", "John", "Robert", "Michael", "William", "David", "Mary", "Patricia", "Jennifer", "Linda"] last_names = ["Smith", "Johnson", "Williams", "Brown", "Jones", "Garcia", "Miller", "Davis", "Wilson", "Anderson"] return f"{random.choice(first_names)} {random.choice(last_names)}" def _generate_birthdate(self) -> str: """生成随机生日 (1980-2002)""" year = random.randint(1980, 2002) month = random.randint(1, 12) day = random.randint(1, 28) if month == 2 else random.randint(1, 30) if month in [4, 6, 9, 11] else random.randint(1, 31) return f"{year}-{month:02d}-{day:02d}" __all__ = ["RegisterFlow"]