"""OpenAI 账号注册流程编排模块""" from typing import Dict, Any, Optional import secrets import random import json import uuid from core.session import OAISession, CloudflareBlockError, SessionInvalidError, RateLimitError from core.sentinel_handler import SentinelHandler from core.challenge import CloudflareSolver from utils.mail_box import MailHandler from utils.crypto import generate_random_password from utils.logger import logger class RegisterFlow: """OpenAI 账号注册流程编排器""" CHATGPT_HOME = "https://chatgpt.com/" CHATGPT_PROVIDERS = "https://chatgpt.com/api/auth/providers" CHATGPT_CSRF = "https://chatgpt.com/api/auth/csrf" CHATGPT_SIGNIN = "https://chatgpt.com/api/auth/signin/openai" AUTH_CREATE_ACCOUNT = "https://auth.openai.com/create-account/password" AUTH_REGISTER = "https://auth.openai.com/api/accounts/user/register" AUTH_SEND_OTP = "https://auth.openai.com/api/accounts/email-otp/send" AUTH_VALIDATE_OTP = "https://auth.openai.com/api/accounts/email-otp/validate" AUTH_COMPLETE_PROFILE = "https://auth.openai.com/api/accounts/create_account" # 获取 Token 相关 CHATGPT_SESSION = "https://chatgpt.com/api/auth/session" def __init__(self, session: OAISession, config, email: Optional[str] = None, password: Optional[str] = None): self.s = session self.config = config self.email = email or self._generate_email() self.password = password or generate_random_password() self.sentinel = SentinelHandler(session) self.mail = MailHandler.create(config.mail.to_dict() if config.mail.enabled else None) self.cloudflare_solver = CloudflareSolver() self.csrf_token: Optional[str] = None self.sentinel_token: Optional[Dict[str, Any]] = None self.otp: Optional[str] = None self.access_token: Optional[str] = None logger.info(f"RegisterFlow initialized for {self.email} (oai-did: {self.s.oai_did})") async def run(self) -> Dict[str, Any]: """执行完整注册流程""" try: logger.info(f"[{self.email}] Starting registration flow") # Step 0: CloudMail 邮箱账户 if self.config.mail.enabled and self.config.mail.type == "cloudmail": try: from utils.mail_box import CloudMailHandler if isinstance(self.mail, CloudMailHandler): logger.info(f"[{self.email}] Step 0: Ensuring email account exists") await self.mail.ensure_email_exists(self.email) logger.info(f"[{self.email}] ✓ Email account ready") except Exception as e: logger.warning(f"[{self.email}] Failed to create CloudMail account: {e}") await self._step1_init_session() await self._step2_get_csrf_token() await self._step3_oauth_signin() await self._step4_get_sentinel_token() await self._step5_submit_registration() await self._step6_send_email_otp() await self._step7_submit_otp() await self._step8_complete_profile() await self._step9_get_access_token() logger.success(f"[{self.email}] Registration completed successfully! ✅") return { "email": self.email, "password": self.password, "oai_did": self.s.oai_did, "access_token": self.access_token, "status": "success", "message": "Account registered successfully" } except CloudflareBlockError as e: logger.error(f"[{self.email}] Cloudflare blocked: {e}") return {"email": self.email, "password": self.password, "status": "cloudflare_blocked", "error": str(e)} except SessionInvalidError as e: logger.error(f"[{self.email}] Session invalid: {e}") return {"email": self.email, "password": self.password, "status": "session_invalid", "error": str(e)} except RateLimitError as e: logger.error(f"[{self.email}] Rate limited: {e}") return {"email": self.email, "password": self.password, "status": "rate_limited", "error": str(e)} except NotImplementedError as e: logger.warning(f"[{self.email}] Feature not implemented: {e}") return {"email": self.email, "password": self.password, "status": "pending_manual", "error": str(e)} except Exception as e: logger.exception(f"[{self.email}] Unexpected error") return {"email": self.email, "password": self.password, "status": "failed", "error": str(e)} async def _step1_init_session(self): """Step 1: 初始化会话""" logger.info(f"[{self.email}] Step 1: Initializing session") self.s.get(self.CHATGPT_HOME) self.s.get(self.CHATGPT_PROVIDERS) logger.info(f"[{self.email}] ✓ Session initialized") async def _step2_get_csrf_token(self): """Step 2: 获取 CSRF Token""" logger.info(f"[{self.email}] Step 2: Getting CSRF token") resp = self.s.get(self.CHATGPT_CSRF) if resp.status_code != 200: raise RuntimeError(f"Failed to get CSRF token: {resp.status_code}") self.csrf_token = resp.json().get("csrfToken") if not self.csrf_token: raise RuntimeError("CSRF token not found") logger.info(f"[{self.email}] ✓ CSRF token obtained") async def _step3_oauth_signin(self): """Step 3: OAuth 登录流程""" logger.info(f"[{self.email}] Step 3: Starting OAuth flow") signin_params = { 'prompt': 'login', 'ext-oai-did': self.s.oai_did, 'auth_session_logging_id': str(uuid.uuid4()), 'screen_hint': 'signup', 'login_hint': self.email, } payload = {"callbackUrl": "/", "csrfToken": self.csrf_token, "json": "true"} resp = self.s.post( self.CHATGPT_SIGNIN, params=signin_params, data=payload, headers={"Content-Type": "application/x-www-form-urlencoded"} ) if resp.status_code != 200: raise RuntimeError(f"OAuth signin failed: {resp.status_code}") auth_url = resp.json().get("url") if not auth_url: raise RuntimeError("OAuth URL not found") self.s.get(auth_url, allow_redirects=True) self.s.get(self.AUTH_CREATE_ACCOUNT) logger.info(f"[{self.email}] ✓ OAuth flow completed") async def _step4_get_sentinel_token(self): """Step 4: Sentinel 握手""" logger.info(f"[{self.email}] Step 4: Getting Sentinel token") try: self.sentinel_token = await self.sentinel.get_token(flow="username_password_create") logger.info(f"[{self.email}] ✓ Sentinel token obtained") except (NotImplementedError, ImportError) as e: logger.error(f"[{self.email}] Sentinel solver not available: {e}") raise async def _step5_submit_registration(self): """Step 5: 提交注册信息""" logger.info(f"[{self.email}] Step 5: Submitting registration") payload = {"username": self.email, "password": self.password} headers = { "Content-Type": "application/json", "Accept": "application/json", "Origin": "https://auth.openai.com", "Referer": self.AUTH_CREATE_ACCOUNT, "Openai-Sentinel-Token": json.dumps(self.sentinel_token), "X-Datadog-Trace-Id": str(secrets.randbits(63)), "X-Datadog-Parent-Id": str(secrets.randbits(63)), "X-Datadog-Sampling-Priority": "1", "X-Datadog-Origin": "rum", "Traceparent": f"00-0000000000000000{secrets.token_hex(8)}-{secrets.token_hex(8)}-01", "Tracestate": "dd=s:1;o:rum", } resp = self.s.post(self.AUTH_REGISTER, json=payload, headers=headers) if resp.status_code != 200: if "email already exists" in resp.text.lower(): raise ValueError(f"Email already registered: {self.email}") raise RuntimeError(f"Registration failed: {resp.status_code}") logger.info(f"[{self.email}] ✓ Registration submitted") async def _step6_send_email_otp(self): """Step 6: 触发邮件验证""" logger.info(f"[{self.email}] Step 6: Sending email OTP") resp = self.s.post( self.AUTH_SEND_OTP, json={}, headers={"Content-Type": "application/json", "Referer": self.AUTH_CREATE_ACCOUNT} ) if resp.status_code == 403 and CloudflareSolver.detect_challenge(resp): raise CloudflareBlockError("Cloudflare challenge triggered") if resp.status_code != 200: raise RuntimeError(f"OTP send failed: {resp.status_code}") logger.info(f"[{self.email}] ✓ OTP email sent") async def _step7_submit_otp(self): """Step 7: 提交 OTP 验证码""" logger.info(f"[{self.email}] Step 7: Waiting for OTP") try: self.otp = await self.mail.wait_for_otp(email=self.email, timeout=300) logger.info(f"[{self.email}] ✓ OTP received: {self.otp}") except NotImplementedError: logger.warning(f"[{self.email}] Mail handler not configured") raise except TimeoutError: raise TimeoutError(f"Timeout waiting for OTP: {self.email}") resp = self.s.post( self.AUTH_VALIDATE_OTP, json={"code": self.otp}, headers={"Content-Type": "application/json", "Referer": self.AUTH_CREATE_ACCOUNT} ) if resp.status_code != 200: raise RuntimeError(f"OTP validation failed: {resp.status_code}") logger.info(f"[{self.email}] ✓ OTP validated") async def _step8_complete_profile(self): """Step 8: 完成用户信息""" logger.info(f"[{self.email}] Step 8: Completing profile") name = self._generate_name() birthdate = self._generate_birthdate() resp = self.s.post( self.AUTH_COMPLETE_PROFILE, json={"name": name, "birthdate": birthdate}, headers={"Content-Type": "application/json", "Referer": self.AUTH_CREATE_ACCOUNT}, allow_redirects=False ) if resp.status_code not in [200, 302, 303]: raise RuntimeError(f"Profile completion failed: {resp.status_code}") # 检查是否有 continue_url 需要跟随 try: data = resp.json() continue_url = data.get("continue_url") if continue_url: logger.info(f"[{self.email}] Following OAuth callback...") if not continue_url.startswith("http"): continue_url = f"https://auth.openai.com{continue_url}" # 跟随 OAuth 回调,最终会重定向到 chatgpt.com callback_headers = { "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", "Referer": "https://auth.openai.com/", "Sec-Fetch-Dest": "document", "Sec-Fetch-Mode": "navigate", "Sec-Fetch-Site": "same-origin", "Upgrade-Insecure-Requests": "1", } self.s.get(continue_url, headers=callback_headers, allow_redirects=True) logger.info(f"[{self.email}] ✓ OAuth callback completed") except Exception as e: logger.debug(f"[{self.email}] No continue_url or parse error: {e}") logger.info(f"[{self.email}] ✓ Profile completed") async def _step9_get_access_token(self): """Step 9: 通过登录流程获取 Access Token""" logger.info(f"[{self.email}] Step 9: Getting access token via login") from core.login_flow import LoginFlow # 使用当前 session 执行登录流程 login_flow = LoginFlow(self.s, self.email, self.password) result = await login_flow.run() if result.get("status") == "success": self.access_token = result.get("access_token") logger.info(f"[{self.email}] ✓ Access token obtained: {self.access_token[:50]}...") else: logger.warning(f"[{self.email}] Failed to get access token: {result.get('error')}") def _generate_email(self) -> str: """生成随机邮箱""" random_part = secrets.token_hex(8) if self.config.mail.enabled and self.config.mail.type == "cloudmail": domain = self.config.mail.cloudmail_domain if domain: return f"user_{random_part}@{domain}" logger.warning("Using example.com domain. Configure MAIL_CLOUDMAIL_DOMAIN.") return f"user_{random_part}@example.com" def _generate_name(self) -> str: """生成随机姓名""" first_names = ["James", "John", "Robert", "Michael", "William", "David", "Mary", "Patricia", "Jennifer", "Linda"] last_names = ["Smith", "Johnson", "Williams", "Brown", "Jones", "Garcia", "Miller", "Davis", "Wilson", "Anderson"] return f"{random.choice(first_names)} {random.choice(last_names)}" def _generate_birthdate(self) -> str: """生成随机生日 (1980-2002)""" year = random.randint(1980, 2002) month = random.randint(1, 12) day = random.randint(1, 28) if month == 2 else random.randint(1, 30) if month in [4, 6, 9, 11] else random.randint(1, 31) return f"{year}-{month:02d}-{day:02d}" __all__ = ["RegisterFlow"]