Files
autoPlus/core/login_flow.py
2026-01-29 18:54:04 +08:00

381 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
OpenAI 账号登录流程模块
功能:
- 使用邮箱密码登录 ChatGPT
- 获取 access_token 和 session_token
- 支持 Sentinel 验证
- 完整的 OAuth 流程
"""
import json
import uuid
import secrets
from typing import Dict, Any, Optional
from urllib.parse import unquote
from core.session import OAISession, CloudflareBlockError, SessionInvalidError, RateLimitError
from core.sentinel_handler import SentinelHandler
from utils.logger import logger
class LoginFlow:
"""OpenAI 账号登录流程编排器"""
# ChatGPT 相关接口
CHATGPT_HOME = "https://chatgpt.com/"
CHATGPT_CSRF = "https://chatgpt.com/api/auth/csrf"
CHATGPT_SIGNIN = "https://chatgpt.com/api/auth/signin/openai"
CHATGPT_SESSION = "https://chatgpt.com/api/auth/session"
# Auth 相关接口
AUTH_PASSWORD_VERIFY = "https://auth.openai.com/api/accounts/password/verify"
def __init__(self, session: OAISession, email: str, password: str):
"""
初始化登录流程
参数:
session: OAISession 会话实例
email: 登录邮箱
password: 登录密码
"""
self.s = session
self.email = email
self.password = password
# Sentinel 处理器
self.sentinel = SentinelHandler(session)
# 流程状态
self.csrf_token: Optional[str] = None
self.sentinel_token: Optional[Dict[str, Any]] = None
self.auth_session_logging_id = str(uuid.uuid4())
# 登录结果
self.access_token: Optional[str] = None
self.session_token: Optional[str] = None
logger.info(f"LoginFlow initialized for {self.email}")
async def run(self) -> Dict[str, Any]:
"""
执行完整登录流程
返回:
登录结果字典,包含:
- status: 状态 (success/failed/...)
- access_token: 访问令牌(成功时)
- session_token: 会话令牌(成功时)
- error: 错误信息(失败时)
"""
try:
logger.info(f"[{self.email}] Starting login flow")
# Step 1: 初始化会话
await self._step1_init_session()
# Step 2: 获取 CSRF Token
await self._step2_get_csrf_token()
# Step 3: 发起 OAuth 登录请求
await self._step3_oauth_signin()
# Step 4: 获取 Sentinel Token
await self._step4_get_sentinel_token()
# Step 5: 提交密码验证
continue_url = await self._step5_submit_password()
# Step 6: 完成 OAuth 回调
if continue_url:
await self._step6_complete_oauth(continue_url)
# Step 7: 获取 Access Token
await self._step7_get_access_token()
if self.access_token:
logger.success(f"[{self.email}] Login successful! ✅")
return {
"email": self.email,
"status": "success",
"access_token": self.access_token,
"session_token": self.session_token,
"message": "Login successful"
}
else:
return {
"email": self.email,
"status": "failed",
"error": "Failed to get access token"
}
except CloudflareBlockError as e:
logger.error(f"[{self.email}] Cloudflare blocked: {e}")
return {"email": self.email, "status": "cloudflare_blocked", "error": str(e)}
except SessionInvalidError as e:
logger.error(f"[{self.email}] Session invalid: {e}")
return {"email": self.email, "status": "session_invalid", "error": str(e)}
except RateLimitError as e:
logger.error(f"[{self.email}] Rate limited: {e}")
return {"email": self.email, "status": "rate_limited", "error": str(e)}
except Exception as e:
logger.exception(f"[{self.email}] Unexpected error during login")
return {"email": self.email, "status": "failed", "error": str(e)}
async def _step1_init_session(self):
"""Step 1: 初始化会话(访问首页获取初始 cookies"""
logger.info(f"[{self.email}] Step 1: Initializing session")
headers = {
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8",
"Sec-Fetch-Dest": "document",
"Sec-Fetch-Mode": "navigate",
"Sec-Fetch-Site": "none",
"Sec-Fetch-User": "?1",
"Upgrade-Insecure-Requests": "1",
}
resp = self.s.get(self.CHATGPT_HOME, headers=headers)
logger.info(f"[{self.email}] ✓ Session initialized (status: {resp.status_code})")
async def _step2_get_csrf_token(self):
"""Step 2: 获取 CSRF Token"""
logger.info(f"[{self.email}] Step 2: Getting CSRF token")
headers = {
"Accept": "*/*",
"Referer": self.CHATGPT_HOME,
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "same-origin",
}
resp = self.s.get(self.CHATGPT_CSRF, headers=headers)
if resp.status_code != 200:
raise RuntimeError(f"Failed to get CSRF token: {resp.status_code}")
# 方法1从 Cookie 提取
csrf_cookie = self.s.get_cookie("__Host-next-auth.csrf-token")
if csrf_cookie:
csrf_cookie = unquote(csrf_cookie)
if "|" in csrf_cookie:
self.csrf_token = csrf_cookie.split("|")[0]
# 方法2从响应 JSON 提取
if not self.csrf_token:
try:
data = resp.json()
self.csrf_token = data.get("csrfToken")
except:
pass
if not self.csrf_token:
raise RuntimeError("CSRF token not found")
logger.info(f"[{self.email}] ✓ CSRF token obtained")
async def _step3_oauth_signin(self):
"""Step 3: 发起 OAuth 登录请求"""
logger.info(f"[{self.email}] Step 3: Starting OAuth signin")
# Query 参数
params = {
"prompt": "login",
"ext-oai-did": self.s.oai_did,
"auth_session_logging_id": self.auth_session_logging_id,
"screen_hint": "login_or_signup",
"login_hint": self.email,
}
# POST body
data = {
"callbackUrl": self.CHATGPT_HOME,
"csrfToken": self.csrf_token,
"json": "true",
}
headers = {
"Content-Type": "application/x-www-form-urlencoded",
"Accept": "*/*",
"Origin": "https://chatgpt.com",
"Referer": self.CHATGPT_HOME,
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Site": "same-origin",
}
resp = self.s.post(
self.CHATGPT_SIGNIN,
params=params,
data=data,
headers=headers,
allow_redirects=False
)
# 提取 OAuth URL
oauth_url = None
if resp.status_code == 200:
try:
result = resp.json()
oauth_url = result.get("url")
except:
pass
elif resp.status_code in [301, 302, 303, 307, 308]:
oauth_url = resp.headers.get("Location")
if not oauth_url:
raise RuntimeError(f"Failed to get OAuth URL: {resp.status_code}")
# 访问 OAuth URL跟随重定向到密码页面
oauth_headers = {
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Referer": "https://chatgpt.com/",
"Sec-Fetch-Dest": "document",
"Sec-Fetch-Mode": "navigate",
"Sec-Fetch-Site": "cross-site",
"Upgrade-Insecure-Requests": "1",
}
self.s.get(oauth_url, headers=oauth_headers, allow_redirects=True)
logger.info(f"[{self.email}] ✓ OAuth flow initiated")
async def _step4_get_sentinel_token(self):
"""Step 4: 获取 Sentinel Token"""
logger.info(f"[{self.email}] Step 4: Getting Sentinel token")
try:
# 使用登录流程的 flow 参数
self.sentinel_token = await self.sentinel.get_token(flow="username_password_login")
logger.info(f"[{self.email}] ✓ Sentinel token obtained")
except NotImplementedError as e:
logger.warning(f"[{self.email}] Sentinel not available, continuing without it: {e}")
self.sentinel_token = None
except Exception as e:
logger.warning(f"[{self.email}] Sentinel error, continuing without it: {e}")
self.sentinel_token = None
async def _step5_submit_password(self) -> Optional[str]:
"""
Step 5: 提交密码验证
返回:
continue_url: OAuth 回调 URL如果需要
"""
logger.info(f"[{self.email}] Step 5: Submitting password")
payload = {
"username": self.email,
"password": self.password,
}
headers = {
"Content-Type": "application/json",
"Accept": "application/json",
"Origin": "https://auth.openai.com",
"Referer": "https://auth.openai.com/log-in/password",
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "same-origin",
# Datadog tracing headers
"X-Datadog-Trace-Id": str(secrets.randbits(63)),
"X-Datadog-Parent-Id": str(secrets.randbits(63)),
"X-Datadog-Sampling-Priority": "1",
"X-Datadog-Origin": "rum",
"Traceparent": f"00-0000000000000000{secrets.token_hex(8)}-{secrets.token_hex(8)}-01",
"Tracestate": "dd=s:1;o:rum",
}
# 添加 Sentinel token如果有
if self.sentinel_token:
headers["Openai-Sentinel-Token"] = json.dumps(self.sentinel_token)
logger.debug(f"[{self.email}] Using Sentinel token")
resp = self.s.post(
self.AUTH_PASSWORD_VERIFY,
json=payload,
headers=headers,
allow_redirects=False
)
if resp.status_code == 200:
try:
data = resp.json()
continue_url = data.get("continue_url")
if continue_url:
logger.info(f"[{self.email}] ✓ Password verified, need OAuth callback")
return continue_url
else:
logger.info(f"[{self.email}] ✓ Password verified")
return None
except:
logger.info(f"[{self.email}] ✓ Password verified")
return None
else:
error_msg = resp.text[:200] if resp.text else "Unknown error"
raise RuntimeError(f"Password verification failed: {resp.status_code} - {error_msg}")
async def _step6_complete_oauth(self, continue_url: str):
"""Step 6: 完成 OAuth 回调"""
logger.info(f"[{self.email}] Step 6: Completing OAuth callback")
# 确保是完整 URL
if not continue_url.startswith("http"):
continue_url = f"https://auth.openai.com{continue_url}"
headers = {
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Referer": "https://auth.openai.com/log-in/password",
"Sec-Fetch-Dest": "document",
"Sec-Fetch-Mode": "navigate",
"Sec-Fetch-Site": "same-origin",
"Upgrade-Insecure-Requests": "1",
}
resp = self.s.get(continue_url, headers=headers, allow_redirects=True)
# 检查是否获取到 session-token
self.session_token = self.s.get_cookie("__Secure-next-auth.session-token")
if self.session_token:
logger.info(f"[{self.email}] ✓ OAuth callback completed, got session token")
else:
logger.warning(f"[{self.email}] OAuth callback completed but no session token found")
async def _step7_get_access_token(self):
"""Step 7: 获取 Access Token"""
logger.info(f"[{self.email}] Step 7: Getting access token")
headers = {
"Accept": "application/json",
"Referer": self.CHATGPT_HOME,
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "same-origin",
}
resp = self.s.get(self.CHATGPT_SESSION, headers=headers)
if resp.status_code == 200:
try:
data = resp.json()
self.access_token = data.get("accessToken")
if self.access_token:
logger.info(f"[{self.email}] ✓ Access token obtained")
else:
logger.warning(f"[{self.email}] No accessToken in response")
logger.debug(f"Response: {json.dumps(data, indent=2)}")
except Exception as e:
logger.error(f"[{self.email}] Failed to parse session response: {e}")
else:
logger.error(f"[{self.email}] Failed to get session: {resp.status_code}")
__all__ = ["LoginFlow"]