拿到token

This commit is contained in:
dela
2026-01-29 18:54:04 +08:00
parent 70627f09fe
commit 433bb4d3c1
9 changed files with 954 additions and 7 deletions

View File

@@ -28,6 +28,9 @@ class RegisterFlow:
AUTH_VALIDATE_OTP = "https://auth.openai.com/api/accounts/email-otp/validate"
AUTH_COMPLETE_PROFILE = "https://auth.openai.com/api/accounts/create_account"
# 获取 Token 相关
CHATGPT_SESSION = "https://chatgpt.com/api/auth/session"
def __init__(self, session: OAISession, config, email: Optional[str] = None, password: Optional[str] = None):
self.s = session
self.config = config
@@ -41,6 +44,7 @@ class RegisterFlow:
self.csrf_token: Optional[str] = None
self.sentinel_token: Optional[Dict[str, Any]] = None
self.otp: Optional[str] = None
self.access_token: Optional[str] = None
logger.info(f"RegisterFlow initialized for {self.email} (oai-did: {self.s.oai_did})")
@@ -68,12 +72,14 @@ class RegisterFlow:
await self._step6_send_email_otp()
await self._step7_submit_otp()
await self._step8_complete_profile()
await self._step9_get_access_token()
logger.success(f"[{self.email}] Registration completed successfully! ✅")
return {
"email": self.email,
"password": self.password,
"oai_did": self.s.oai_did,
"access_token": self.access_token,
"status": "success",
"message": "Account registered successfully"
}
@@ -239,14 +245,54 @@ class RegisterFlow:
resp = self.s.post(
self.AUTH_COMPLETE_PROFILE,
json={"name": name, "birthdate": birthdate},
headers={"Content-Type": "application/json", "Referer": self.AUTH_CREATE_ACCOUNT}
headers={"Content-Type": "application/json", "Referer": self.AUTH_CREATE_ACCOUNT},
allow_redirects=False
)
if resp.status_code != 200:
if resp.status_code not in [200, 302, 303]:
raise RuntimeError(f"Profile completion failed: {resp.status_code}")
# 检查是否有 continue_url 需要跟随
try:
data = resp.json()
continue_url = data.get("continue_url")
if continue_url:
logger.info(f"[{self.email}] Following OAuth callback...")
if not continue_url.startswith("http"):
continue_url = f"https://auth.openai.com{continue_url}"
# 跟随 OAuth 回调,最终会重定向到 chatgpt.com
callback_headers = {
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Referer": "https://auth.openai.com/",
"Sec-Fetch-Dest": "document",
"Sec-Fetch-Mode": "navigate",
"Sec-Fetch-Site": "same-origin",
"Upgrade-Insecure-Requests": "1",
}
self.s.get(continue_url, headers=callback_headers, allow_redirects=True)
logger.info(f"[{self.email}] ✓ OAuth callback completed")
except Exception as e:
logger.debug(f"[{self.email}] No continue_url or parse error: {e}")
logger.info(f"[{self.email}] ✓ Profile completed")
async def _step9_get_access_token(self):
"""Step 9: 通过登录流程获取 Access Token"""
logger.info(f"[{self.email}] Step 9: Getting access token via login")
from core.login_flow import LoginFlow
# 使用当前 session 执行登录流程
login_flow = LoginFlow(self.s, self.email, self.password)
result = await login_flow.run()
if result.get("status") == "success":
self.access_token = result.get("access_token")
logger.info(f"[{self.email}] ✓ Access token obtained: {self.access_token[:50]}...")
else:
logger.warning(f"[{self.email}] Failed to get access token: {result.get('error')}")
def _generate_email(self) -> str:
"""生成随机邮箱"""
random_part = secrets.token_hex(8)

380
core/login_flow.py Normal file
View File

@@ -0,0 +1,380 @@
"""
OpenAI 账号登录流程模块
功能:
- 使用邮箱密码登录 ChatGPT
- 获取 access_token 和 session_token
- 支持 Sentinel 验证
- 完整的 OAuth 流程
"""
import json
import uuid
import secrets
from typing import Dict, Any, Optional
from urllib.parse import unquote
from core.session import OAISession, CloudflareBlockError, SessionInvalidError, RateLimitError
from core.sentinel_handler import SentinelHandler
from utils.logger import logger
class LoginFlow:
"""OpenAI 账号登录流程编排器"""
# ChatGPT 相关接口
CHATGPT_HOME = "https://chatgpt.com/"
CHATGPT_CSRF = "https://chatgpt.com/api/auth/csrf"
CHATGPT_SIGNIN = "https://chatgpt.com/api/auth/signin/openai"
CHATGPT_SESSION = "https://chatgpt.com/api/auth/session"
# Auth 相关接口
AUTH_PASSWORD_VERIFY = "https://auth.openai.com/api/accounts/password/verify"
def __init__(self, session: OAISession, email: str, password: str):
"""
初始化登录流程
参数:
session: OAISession 会话实例
email: 登录邮箱
password: 登录密码
"""
self.s = session
self.email = email
self.password = password
# Sentinel 处理器
self.sentinel = SentinelHandler(session)
# 流程状态
self.csrf_token: Optional[str] = None
self.sentinel_token: Optional[Dict[str, Any]] = None
self.auth_session_logging_id = str(uuid.uuid4())
# 登录结果
self.access_token: Optional[str] = None
self.session_token: Optional[str] = None
logger.info(f"LoginFlow initialized for {self.email}")
async def run(self) -> Dict[str, Any]:
"""
执行完整登录流程
返回:
登录结果字典,包含:
- status: 状态 (success/failed/...)
- access_token: 访问令牌(成功时)
- session_token: 会话令牌(成功时)
- error: 错误信息(失败时)
"""
try:
logger.info(f"[{self.email}] Starting login flow")
# Step 1: 初始化会话
await self._step1_init_session()
# Step 2: 获取 CSRF Token
await self._step2_get_csrf_token()
# Step 3: 发起 OAuth 登录请求
await self._step3_oauth_signin()
# Step 4: 获取 Sentinel Token
await self._step4_get_sentinel_token()
# Step 5: 提交密码验证
continue_url = await self._step5_submit_password()
# Step 6: 完成 OAuth 回调
if continue_url:
await self._step6_complete_oauth(continue_url)
# Step 7: 获取 Access Token
await self._step7_get_access_token()
if self.access_token:
logger.success(f"[{self.email}] Login successful! ✅")
return {
"email": self.email,
"status": "success",
"access_token": self.access_token,
"session_token": self.session_token,
"message": "Login successful"
}
else:
return {
"email": self.email,
"status": "failed",
"error": "Failed to get access token"
}
except CloudflareBlockError as e:
logger.error(f"[{self.email}] Cloudflare blocked: {e}")
return {"email": self.email, "status": "cloudflare_blocked", "error": str(e)}
except SessionInvalidError as e:
logger.error(f"[{self.email}] Session invalid: {e}")
return {"email": self.email, "status": "session_invalid", "error": str(e)}
except RateLimitError as e:
logger.error(f"[{self.email}] Rate limited: {e}")
return {"email": self.email, "status": "rate_limited", "error": str(e)}
except Exception as e:
logger.exception(f"[{self.email}] Unexpected error during login")
return {"email": self.email, "status": "failed", "error": str(e)}
async def _step1_init_session(self):
"""Step 1: 初始化会话(访问首页获取初始 cookies"""
logger.info(f"[{self.email}] Step 1: Initializing session")
headers = {
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8",
"Sec-Fetch-Dest": "document",
"Sec-Fetch-Mode": "navigate",
"Sec-Fetch-Site": "none",
"Sec-Fetch-User": "?1",
"Upgrade-Insecure-Requests": "1",
}
resp = self.s.get(self.CHATGPT_HOME, headers=headers)
logger.info(f"[{self.email}] ✓ Session initialized (status: {resp.status_code})")
async def _step2_get_csrf_token(self):
"""Step 2: 获取 CSRF Token"""
logger.info(f"[{self.email}] Step 2: Getting CSRF token")
headers = {
"Accept": "*/*",
"Referer": self.CHATGPT_HOME,
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "same-origin",
}
resp = self.s.get(self.CHATGPT_CSRF, headers=headers)
if resp.status_code != 200:
raise RuntimeError(f"Failed to get CSRF token: {resp.status_code}")
# 方法1从 Cookie 提取
csrf_cookie = self.s.get_cookie("__Host-next-auth.csrf-token")
if csrf_cookie:
csrf_cookie = unquote(csrf_cookie)
if "|" in csrf_cookie:
self.csrf_token = csrf_cookie.split("|")[0]
# 方法2从响应 JSON 提取
if not self.csrf_token:
try:
data = resp.json()
self.csrf_token = data.get("csrfToken")
except:
pass
if not self.csrf_token:
raise RuntimeError("CSRF token not found")
logger.info(f"[{self.email}] ✓ CSRF token obtained")
async def _step3_oauth_signin(self):
"""Step 3: 发起 OAuth 登录请求"""
logger.info(f"[{self.email}] Step 3: Starting OAuth signin")
# Query 参数
params = {
"prompt": "login",
"ext-oai-did": self.s.oai_did,
"auth_session_logging_id": self.auth_session_logging_id,
"screen_hint": "login_or_signup",
"login_hint": self.email,
}
# POST body
data = {
"callbackUrl": self.CHATGPT_HOME,
"csrfToken": self.csrf_token,
"json": "true",
}
headers = {
"Content-Type": "application/x-www-form-urlencoded",
"Accept": "*/*",
"Origin": "https://chatgpt.com",
"Referer": self.CHATGPT_HOME,
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Site": "same-origin",
}
resp = self.s.post(
self.CHATGPT_SIGNIN,
params=params,
data=data,
headers=headers,
allow_redirects=False
)
# 提取 OAuth URL
oauth_url = None
if resp.status_code == 200:
try:
result = resp.json()
oauth_url = result.get("url")
except:
pass
elif resp.status_code in [301, 302, 303, 307, 308]:
oauth_url = resp.headers.get("Location")
if not oauth_url:
raise RuntimeError(f"Failed to get OAuth URL: {resp.status_code}")
# 访问 OAuth URL跟随重定向到密码页面
oauth_headers = {
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Referer": "https://chatgpt.com/",
"Sec-Fetch-Dest": "document",
"Sec-Fetch-Mode": "navigate",
"Sec-Fetch-Site": "cross-site",
"Upgrade-Insecure-Requests": "1",
}
self.s.get(oauth_url, headers=oauth_headers, allow_redirects=True)
logger.info(f"[{self.email}] ✓ OAuth flow initiated")
async def _step4_get_sentinel_token(self):
"""Step 4: 获取 Sentinel Token"""
logger.info(f"[{self.email}] Step 4: Getting Sentinel token")
try:
# 使用登录流程的 flow 参数
self.sentinel_token = await self.sentinel.get_token(flow="username_password_login")
logger.info(f"[{self.email}] ✓ Sentinel token obtained")
except NotImplementedError as e:
logger.warning(f"[{self.email}] Sentinel not available, continuing without it: {e}")
self.sentinel_token = None
except Exception as e:
logger.warning(f"[{self.email}] Sentinel error, continuing without it: {e}")
self.sentinel_token = None
async def _step5_submit_password(self) -> Optional[str]:
"""
Step 5: 提交密码验证
返回:
continue_url: OAuth 回调 URL如果需要
"""
logger.info(f"[{self.email}] Step 5: Submitting password")
payload = {
"username": self.email,
"password": self.password,
}
headers = {
"Content-Type": "application/json",
"Accept": "application/json",
"Origin": "https://auth.openai.com",
"Referer": "https://auth.openai.com/log-in/password",
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "same-origin",
# Datadog tracing headers
"X-Datadog-Trace-Id": str(secrets.randbits(63)),
"X-Datadog-Parent-Id": str(secrets.randbits(63)),
"X-Datadog-Sampling-Priority": "1",
"X-Datadog-Origin": "rum",
"Traceparent": f"00-0000000000000000{secrets.token_hex(8)}-{secrets.token_hex(8)}-01",
"Tracestate": "dd=s:1;o:rum",
}
# 添加 Sentinel token如果有
if self.sentinel_token:
headers["Openai-Sentinel-Token"] = json.dumps(self.sentinel_token)
logger.debug(f"[{self.email}] Using Sentinel token")
resp = self.s.post(
self.AUTH_PASSWORD_VERIFY,
json=payload,
headers=headers,
allow_redirects=False
)
if resp.status_code == 200:
try:
data = resp.json()
continue_url = data.get("continue_url")
if continue_url:
logger.info(f"[{self.email}] ✓ Password verified, need OAuth callback")
return continue_url
else:
logger.info(f"[{self.email}] ✓ Password verified")
return None
except:
logger.info(f"[{self.email}] ✓ Password verified")
return None
else:
error_msg = resp.text[:200] if resp.text else "Unknown error"
raise RuntimeError(f"Password verification failed: {resp.status_code} - {error_msg}")
async def _step6_complete_oauth(self, continue_url: str):
"""Step 6: 完成 OAuth 回调"""
logger.info(f"[{self.email}] Step 6: Completing OAuth callback")
# 确保是完整 URL
if not continue_url.startswith("http"):
continue_url = f"https://auth.openai.com{continue_url}"
headers = {
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Referer": "https://auth.openai.com/log-in/password",
"Sec-Fetch-Dest": "document",
"Sec-Fetch-Mode": "navigate",
"Sec-Fetch-Site": "same-origin",
"Upgrade-Insecure-Requests": "1",
}
resp = self.s.get(continue_url, headers=headers, allow_redirects=True)
# 检查是否获取到 session-token
self.session_token = self.s.get_cookie("__Secure-next-auth.session-token")
if self.session_token:
logger.info(f"[{self.email}] ✓ OAuth callback completed, got session token")
else:
logger.warning(f"[{self.email}] OAuth callback completed but no session token found")
async def _step7_get_access_token(self):
"""Step 7: 获取 Access Token"""
logger.info(f"[{self.email}] Step 7: Getting access token")
headers = {
"Accept": "application/json",
"Referer": self.CHATGPT_HOME,
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "same-origin",
}
resp = self.s.get(self.CHATGPT_SESSION, headers=headers)
if resp.status_code == 200:
try:
data = resp.json()
self.access_token = data.get("accessToken")
if self.access_token:
logger.info(f"[{self.email}] ✓ Access token obtained")
else:
logger.warning(f"[{self.email}] No accessToken in response")
logger.debug(f"Response: {json.dumps(data, indent=2)}")
except Exception as e:
logger.error(f"[{self.email}] Failed to parse session response: {e}")
else:
logger.error(f"[{self.email}] Failed to get session: {resp.status_code}")
__all__ = ["LoginFlow"]

View File

@@ -40,6 +40,7 @@ class OAISession:
- oai-did Cookie 管理(设备指纹)
- 自动错误检测和异常抛出
- 代理支持HTTP/HTTPS/SOCKS5
- 登录获取 access_token
"""
# OpenAI 相关域名
@@ -89,6 +90,11 @@ class OAISession:
logger.info(f"Session initialized with oai-did: {self.oai_did}")
# 登录状态
self.access_token: Optional[str] = None
self.session_token: Optional[str] = None
self.logged_in_email: Optional[str] = None
def _setup_headers(self):
"""
设置 HTTP 请求头,模拟真实 Chrome 浏览器
@@ -288,6 +294,49 @@ class OAISession:
except Exception as e:
logger.warning(f"Error closing session: {e}")
async def login(self, email: str, password: str) -> Dict[str, Any]:
"""
使用邮箱密码登录,获取 access_token
参数:
email: 登录邮箱
password: 登录密码
返回:
登录结果字典,包含:
- status: 状态 (success/failed/...)
- access_token: 访问令牌(成功时)
- session_token: 会话令牌(成功时)
- error: 错误信息(失败时)
示例:
session = OAISession()
result = await session.login("user@example.com", "password123")
if result["status"] == "success":
print(f"Access Token: {result['access_token']}")
"""
from core.login_flow import LoginFlow
flow = LoginFlow(self, email, password)
result = await flow.run()
# 保存登录状态
if result.get("status") == "success":
self.access_token = result.get("access_token")
self.session_token = result.get("session_token")
self.logged_in_email = email
logger.info(f"Session logged in as: {email}")
return result
def is_logged_in(self) -> bool:
"""检查是否已登录"""
return self.access_token is not None
def get_access_token(self) -> Optional[str]:
"""获取当前的 access_token"""
return self.access_token
def __enter__(self):
"""支持 with 语句上下文管理"""
return self