""" OpenAI 账号登录流程模块 功能: - 使用邮箱密码登录 ChatGPT - 获取 access_token 和 session_token - 支持 Sentinel 验证 - 完整的 OAuth 流程 """ import json import uuid import secrets from typing import Dict, Any, Optional from urllib.parse import unquote from core.session import OAISession, CloudflareBlockError, SessionInvalidError, RateLimitError from core.sentinel_handler import SentinelHandler from utils.logger import logger class LoginFlow: """OpenAI 账号登录流程编排器""" # ChatGPT 相关接口 CHATGPT_HOME = "https://chatgpt.com/" CHATGPT_CSRF = "https://chatgpt.com/api/auth/csrf" CHATGPT_SIGNIN = "https://chatgpt.com/api/auth/signin/openai" CHATGPT_SESSION = "https://chatgpt.com/api/auth/session" # Auth 相关接口 AUTH_PASSWORD_VERIFY = "https://auth.openai.com/api/accounts/password/verify" def __init__(self, session: OAISession, email: str, password: str): """ 初始化登录流程 参数: session: OAISession 会话实例 email: 登录邮箱 password: 登录密码 """ self.s = session self.email = email self.password = password # Sentinel 处理器 self.sentinel = SentinelHandler(session) # 流程状态 self.csrf_token: Optional[str] = None self.sentinel_token: Optional[Dict[str, Any]] = None self.auth_session_logging_id = str(uuid.uuid4()) # 登录结果 self.access_token: Optional[str] = None self.session_token: Optional[str] = None logger.info(f"LoginFlow initialized for {self.email}") async def run(self) -> Dict[str, Any]: """ 执行完整登录流程 返回: 登录结果字典,包含: - status: 状态 (success/failed/...) - access_token: 访问令牌(成功时) - session_token: 会话令牌(成功时) - error: 错误信息(失败时) """ try: logger.info(f"[{self.email}] Starting login flow") # Step 1: 初始化会话 await self._step1_init_session() # Step 2: 获取 CSRF Token await self._step2_get_csrf_token() # Step 3: 发起 OAuth 登录请求 await self._step3_oauth_signin() # Step 4: 获取 Sentinel Token await self._step4_get_sentinel_token() # Step 5: 提交密码验证 continue_url = await self._step5_submit_password() # Step 6: 完成 OAuth 回调 if continue_url: await self._step6_complete_oauth(continue_url) # Step 7: 获取 Access Token await self._step7_get_access_token() if self.access_token: logger.success(f"[{self.email}] Login successful! ✅") return { "email": self.email, "status": "success", "access_token": self.access_token, "session_token": self.session_token, "message": "Login successful" } else: return { "email": self.email, "status": "failed", "error": "Failed to get access token" } except CloudflareBlockError as e: logger.error(f"[{self.email}] Cloudflare blocked: {e}") return {"email": self.email, "status": "cloudflare_blocked", "error": str(e)} except SessionInvalidError as e: logger.error(f"[{self.email}] Session invalid: {e}") return {"email": self.email, "status": "session_invalid", "error": str(e)} except RateLimitError as e: logger.error(f"[{self.email}] Rate limited: {e}") return {"email": self.email, "status": "rate_limited", "error": str(e)} except Exception as e: logger.exception(f"[{self.email}] Unexpected error during login") return {"email": self.email, "status": "failed", "error": str(e)} async def _step1_init_session(self): """Step 1: 初始化会话(访问首页获取初始 cookies)""" logger.info(f"[{self.email}] Step 1: Initializing session") headers = { "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8", "Sec-Fetch-Dest": "document", "Sec-Fetch-Mode": "navigate", "Sec-Fetch-Site": "none", "Sec-Fetch-User": "?1", "Upgrade-Insecure-Requests": "1", } resp = self.s.get(self.CHATGPT_HOME, headers=headers) logger.info(f"[{self.email}] ✓ Session initialized (status: {resp.status_code})") async def _step2_get_csrf_token(self): """Step 2: 获取 CSRF Token""" logger.info(f"[{self.email}] Step 2: Getting CSRF token") headers = { "Accept": "*/*", "Referer": self.CHATGPT_HOME, "Sec-Fetch-Dest": "empty", "Sec-Fetch-Mode": "cors", "Sec-Fetch-Site": "same-origin", } resp = self.s.get(self.CHATGPT_CSRF, headers=headers) if resp.status_code != 200: raise RuntimeError(f"Failed to get CSRF token: {resp.status_code}") # 方法1:从 Cookie 提取 csrf_cookie = self.s.get_cookie("__Host-next-auth.csrf-token") if csrf_cookie: csrf_cookie = unquote(csrf_cookie) if "|" in csrf_cookie: self.csrf_token = csrf_cookie.split("|")[0] # 方法2:从响应 JSON 提取 if not self.csrf_token: try: data = resp.json() self.csrf_token = data.get("csrfToken") except: pass if not self.csrf_token: raise RuntimeError("CSRF token not found") logger.info(f"[{self.email}] ✓ CSRF token obtained") async def _step3_oauth_signin(self): """Step 3: 发起 OAuth 登录请求""" logger.info(f"[{self.email}] Step 3: Starting OAuth signin") # Query 参数 params = { "prompt": "login", "ext-oai-did": self.s.oai_did, "auth_session_logging_id": self.auth_session_logging_id, "screen_hint": "login_or_signup", "login_hint": self.email, } # POST body data = { "callbackUrl": self.CHATGPT_HOME, "csrfToken": self.csrf_token, "json": "true", } headers = { "Content-Type": "application/x-www-form-urlencoded", "Accept": "*/*", "Origin": "https://chatgpt.com", "Referer": self.CHATGPT_HOME, "Sec-Fetch-Mode": "cors", "Sec-Fetch-Dest": "empty", "Sec-Fetch-Site": "same-origin", } resp = self.s.post( self.CHATGPT_SIGNIN, params=params, data=data, headers=headers, allow_redirects=False ) # 提取 OAuth URL oauth_url = None if resp.status_code == 200: try: result = resp.json() oauth_url = result.get("url") except: pass elif resp.status_code in [301, 302, 303, 307, 308]: oauth_url = resp.headers.get("Location") if not oauth_url: raise RuntimeError(f"Failed to get OAuth URL: {resp.status_code}") # 访问 OAuth URL(跟随重定向到密码页面) oauth_headers = { "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", "Referer": "https://chatgpt.com/", "Sec-Fetch-Dest": "document", "Sec-Fetch-Mode": "navigate", "Sec-Fetch-Site": "cross-site", "Upgrade-Insecure-Requests": "1", } self.s.get(oauth_url, headers=oauth_headers, allow_redirects=True) logger.info(f"[{self.email}] ✓ OAuth flow initiated") async def _step4_get_sentinel_token(self): """Step 4: 获取 Sentinel Token""" logger.info(f"[{self.email}] Step 4: Getting Sentinel token") try: # 使用登录流程的 flow 参数 self.sentinel_token = await self.sentinel.get_token(flow="username_password_login") logger.info(f"[{self.email}] ✓ Sentinel token obtained") except NotImplementedError as e: logger.warning(f"[{self.email}] Sentinel not available, continuing without it: {e}") self.sentinel_token = None except Exception as e: logger.warning(f"[{self.email}] Sentinel error, continuing without it: {e}") self.sentinel_token = None async def _step5_submit_password(self) -> Optional[str]: """ Step 5: 提交密码验证 返回: continue_url: OAuth 回调 URL(如果需要) """ logger.info(f"[{self.email}] Step 5: Submitting password") payload = { "username": self.email, "password": self.password, } headers = { "Content-Type": "application/json", "Accept": "application/json", "Origin": "https://auth.openai.com", "Referer": "https://auth.openai.com/log-in/password", "Sec-Fetch-Dest": "empty", "Sec-Fetch-Mode": "cors", "Sec-Fetch-Site": "same-origin", # Datadog tracing headers "X-Datadog-Trace-Id": str(secrets.randbits(63)), "X-Datadog-Parent-Id": str(secrets.randbits(63)), "X-Datadog-Sampling-Priority": "1", "X-Datadog-Origin": "rum", "Traceparent": f"00-0000000000000000{secrets.token_hex(8)}-{secrets.token_hex(8)}-01", "Tracestate": "dd=s:1;o:rum", } # 添加 Sentinel token(如果有) if self.sentinel_token: headers["Openai-Sentinel-Token"] = json.dumps(self.sentinel_token) logger.debug(f"[{self.email}] Using Sentinel token") resp = self.s.post( self.AUTH_PASSWORD_VERIFY, json=payload, headers=headers, allow_redirects=False ) if resp.status_code == 200: try: data = resp.json() continue_url = data.get("continue_url") if continue_url: logger.info(f"[{self.email}] ✓ Password verified, need OAuth callback") return continue_url else: logger.info(f"[{self.email}] ✓ Password verified") return None except: logger.info(f"[{self.email}] ✓ Password verified") return None else: error_msg = resp.text[:200] if resp.text else "Unknown error" raise RuntimeError(f"Password verification failed: {resp.status_code} - {error_msg}") async def _step6_complete_oauth(self, continue_url: str): """Step 6: 完成 OAuth 回调""" logger.info(f"[{self.email}] Step 6: Completing OAuth callback") # 确保是完整 URL if not continue_url.startswith("http"): continue_url = f"https://auth.openai.com{continue_url}" headers = { "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", "Referer": "https://auth.openai.com/log-in/password", "Sec-Fetch-Dest": "document", "Sec-Fetch-Mode": "navigate", "Sec-Fetch-Site": "same-origin", "Upgrade-Insecure-Requests": "1", } resp = self.s.get(continue_url, headers=headers, allow_redirects=True) # 检查是否获取到 session-token self.session_token = self.s.get_cookie("__Secure-next-auth.session-token") if self.session_token: logger.info(f"[{self.email}] ✓ OAuth callback completed, got session token") else: logger.warning(f"[{self.email}] OAuth callback completed but no session token found") async def _step7_get_access_token(self): """Step 7: 获取 Access Token""" logger.info(f"[{self.email}] Step 7: Getting access token") headers = { "Accept": "application/json", "Referer": self.CHATGPT_HOME, "Sec-Fetch-Dest": "empty", "Sec-Fetch-Mode": "cors", "Sec-Fetch-Site": "same-origin", } resp = self.s.get(self.CHATGPT_SESSION, headers=headers) if resp.status_code == 200: try: data = resp.json() self.access_token = data.get("accessToken") if self.access_token: logger.info(f"[{self.email}] ✓ Access token obtained") else: logger.warning(f"[{self.email}] No accessToken in response") logger.debug(f"Response: {json.dumps(data, indent=2)}") except Exception as e: logger.error(f"[{self.email}] Failed to parse session response: {e}") else: logger.error(f"[{self.email}] Failed to get session: {resp.status_code}") __all__ = ["LoginFlow"]