546 lines
18 KiB
Python
546 lines
18 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
OpenAI login test script.

Implements the complete login flow based on captured traffic.
|
||
"""
|
||
|
||
import json
|
||
import uuid
|
||
import secrets
|
||
import sys
|
||
import os
|
||
from urllib.parse import urlparse, parse_qs
|
||
|
||
# Put this script's directory on the import path so the sibling ``modules``
# package can be resolved when the file is executed directly.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

# HTTP backend selection: curl_cffi when installed, plain requests otherwise.
try:
    from curl_cffi.requests import Session
except ImportError:
    import requests

    Session = requests.Session
    USE_CURL = False
    print("⚠️ Using requests (may fail)")
else:
    USE_CURL = True
    print("✅ Using curl_cffi")

# Optional project helpers; the login client falls back to a bare session
# when any of them cannot be imported.
try:
    from modules.fingerprint import BrowserFingerprint
    from modules.http_client import HTTPClient
    from modules.sentinel_solver import SentinelSolver
    from modules.pow_solver import ProofOfWorkSolver
except ImportError as e:
    MODULES_AVAILABLE = False
    print(f"⚠️ Project modules not available: {e}")
else:
    MODULES_AVAILABLE = True
    print("✅ Project modules loaded")
|
||
|
||
|
||
class OpenAILogin:
    """Client that drives the ChatGPT / OpenAI web login flow step by step.

    Each ``stepN_*`` method performs one HTTP exchange and prints its
    progress; :meth:`login` chains them together and returns the access
    token on success.
    """

    def __init__(self):
        """Create per-run identifiers and choose the HTTP session to use."""
        self.device_id = str(uuid.uuid4())
        self.auth_session_logging_id = str(uuid.uuid4())

        if MODULES_AVAILABLE:
            # Use the project helper modules when they could be imported.
            self.fingerprint = BrowserFingerprint(session_id=self.device_id)
            self.http_client = HTTPClient(self.fingerprint)
            # Expose the client's session directly so the step methods can
            # use ``self.session`` in both modes.
            self.session = self.http_client.session
            self.sentinel_solver = SentinelSolver(self.fingerprint)
            self.pow_solver = ProofOfWorkSolver()
            print("✅ Using HTTPClient with project modules")
        else:
            # Fall back to a bare session without any project helpers.
            self.session = Session(impersonate='chrome110') if USE_CURL else Session()
            self.fingerprint = None
            self.http_client = None
            self.sentinel_solver = None
            self.pow_solver = None
            print("⚠️ Using fallback session")

    def get_headers(self, host='chatgpt.com', **extras):
        """Build the base request headers.

        Args:
            host: Value for the ``Host`` header; a falsy value omits it in
                the fallback path.
            **extras: Additional headers merged last (they override the
                generated ones).

        Returns:
            A dict of header names to values.
        """
        # Delegate to the project fingerprint when one is available.
        if self.fingerprint:
            headers = self.fingerprint.get_headers(host=host)
            headers.update(extras)
            return headers

        # Otherwise use a static fallback header set.
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36',
            'Accept-Language': 'en-US,en;q=0.9',
            'Sec-Ch-Ua': '"Chromium";v="131", "Not_A Brand";v="24"',
            'Sec-Ch-Ua-Mobile': '?0',
            'Sec-Ch-Ua-Platform': '"Windows"',
        }

        if host:
            headers['Host'] = host

        headers.update(extras)
        return headers

    def step1_get_csrf(self):
        """Step 1: visit the home page and fetch the CSRF token.

        The token is taken from the ``__Host-next-auth.csrf-token`` cookie
        (the part before ``|``) and, failing that, from the ``csrfToken``
        field of the JSON response.

        Returns:
            The CSRF token string.

        Raises:
            Exception: If no token could be extracted from either source.
        """
        print("\n[1] Getting CSRF token...")
        import time
        from urllib.parse import unquote

        # 1.1 Visit the home page to pick up the initial cookies.
        url = "https://chatgpt.com/"
        headers = self.get_headers(host='chatgpt.com')
        headers.update({
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8',
            'Sec-Fetch-Dest': 'document',
            'Sec-Fetch-Mode': 'navigate',
            'Sec-Fetch-Site': 'none',
            'Sec-Fetch-User': '?1',
            'Upgrade-Insecure-Requests': '1',
        })

        resp = self.session.get(url, headers=headers, timeout=30)
        print(f" [1.1] Visited chatgpt.com: {resp.status_code}")

        # A 403 is treated as a challenge page: wait once and retry.
        if resp.status_code == 403:
            print(" ⚠️ Cloudflare challenge detected, waiting 5s...")
            time.sleep(5)
            resp = self.session.get(url, headers=headers, timeout=30)
            print(f" [1.1 Retry] Status: {resp.status_code}")

        # 1.2 Request the CSRF endpoint.
        csrf_url = "https://chatgpt.com/api/auth/csrf"
        headers = self.get_headers(host='chatgpt.com')
        headers.update({
            'Accept': '*/*',
            'Referer': 'https://chatgpt.com/',
            'Sec-Fetch-Dest': 'empty',
            'Sec-Fetch-Mode': 'cors',
            'Sec-Fetch-Site': 'same-origin',
        })

        resp = self.session.get(csrf_url, headers=headers, timeout=30)
        print(f" [1.2] Got CSRF response: {resp.status_code}")

        csrf_token = None

        # Method 1: extract the token from the cookie ("<token>|<hash>").
        csrf_cookie = self.session.cookies.get('__Host-next-auth.csrf-token', '')
        if csrf_cookie:
            csrf_cookie = unquote(csrf_cookie)
            if '|' in csrf_cookie:
                csrf_token = csrf_cookie.split('|')[0]

        # Method 2: extract the token from the JSON body.
        if not csrf_token:
            try:
                data = resp.json()
                csrf_token = data.get('csrfToken', '')
            except (ValueError, AttributeError):
                # Body was not JSON or not a JSON object; fall through to
                # the failure below. Interrupts are no longer swallowed here.
                pass

        if not csrf_token:
            raise Exception("Failed to get CSRF token")

        print(f" ✅ CSRF token: {csrf_token[:30]}...")
        return csrf_token

    def step2_signin_request(self, email: str, csrf_token: str):
        """Step 2: start the sign-in and obtain the OAuth URL.

        Args:
            email: Account e-mail, sent as ``login_hint``.
            csrf_token: Token returned by :meth:`step1_get_csrf`.

        Returns:
            The OAuth URL, read from the JSON ``url`` field on a 200
            response or from the ``Location`` header on a redirect.

        Raises:
            Exception: If no OAuth URL could be obtained.
        """
        print("\n[2] Initiating signin...")

        url = "https://chatgpt.com/api/auth/signin/openai"

        # Query-string parameters.
        params = {
            'prompt': 'login',
            'ext-oai-did': self.device_id,
            'auth_session_logging_id': self.auth_session_logging_id,
            'screen_hint': 'login_or_signup',  # or 'login'
            'login_hint': email,
        }

        # Form-encoded POST body.
        data = {
            'callbackUrl': 'https://chatgpt.com/',
            'csrfToken': csrf_token,
            'json': 'true',
        }

        headers = self.get_headers(host='chatgpt.com')
        headers.update({
            'Content-Type': 'application/x-www-form-urlencoded',
            'Accept': '*/*',
            'Origin': 'https://chatgpt.com',
            'Referer': 'https://chatgpt.com/',
            'Sec-Fetch-Mode': 'cors',
            'Sec-Fetch-Dest': 'empty',
            'Sec-Fetch-Site': 'same-origin',
        })

        resp = self.session.post(
            url,
            params=params,
            data=data,
            headers=headers,
            allow_redirects=False,
            timeout=30,
        )

        print(f" Status: {resp.status_code}")

        # Extract the OAuth URL from the body or the redirect target.
        oauth_url = None

        if resp.status_code == 200:
            try:
                result = resp.json()
                oauth_url = result.get('url')
            except (ValueError, AttributeError):
                # Non-JSON or non-object body: reported by the raise below.
                pass
        elif resp.status_code in [301, 302, 303, 307, 308]:
            oauth_url = resp.headers.get('Location')

        if not oauth_url:
            raise Exception(f"Failed to get OAuth URL: {resp.status_code} {resp.text}")

        print(" ✅ Got OAuth URL")
        return oauth_url

    def step3_visit_oauth(self, oauth_url: str):
        """Step 3: open the OAuth authorize URL and follow its redirects.

        Args:
            oauth_url: URL returned by :meth:`step2_signin_request`.

        Returns:
            The final response object after redirects.
        """
        print("\n[3] Following OAuth flow...")

        # NOTE(review): the Host header is pinned to auth.openai.com while
        # redirects are followed automatically; confirm the HTTP client
        # rewrites it if a hop lands on a different host.
        headers = self.get_headers(host='auth.openai.com')
        headers.update({
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Referer': 'https://chatgpt.com/',
            'Sec-Fetch-Dest': 'document',
            'Sec-Fetch-Mode': 'navigate',
            'Sec-Fetch-Site': 'cross-site',
            'Upgrade-Insecure-Requests': '1',
        })

        resp = self.session.get(
            oauth_url,
            headers=headers,
            allow_redirects=True,  # follow redirects through to /log-in/password
            timeout=30,
        )

        print(f" Final URL: {resp.url}")
        print(f" Status: {resp.status_code}")

        # Report which of the expected cookies were set (informational only).
        required_cookies = ['login_session', 'oai-did', 'oai-login-csrf_dev_3772291445']
        for cookie_name in required_cookies:
            if self.session.cookies.get(cookie_name):
                print(f" ✅ Cookie: {cookie_name}")
            else:
                print(f" ⚠️ Missing: {cookie_name}")

        return resp

    def step3_5_generate_sentinel(self):
        """Step 3.5: generate a Sentinel token when the solver is available.

        Returns:
            The JSON-encoded requirements token on success, or ``None`` when
            the solver is unavailable, the server rejects the request, or
            any error occurs (errors are printed, not raised).
        """
        if not MODULES_AVAILABLE or not self.sentinel_solver:
            print("\n[3.5] ⚠️ Skipping Sentinel (not available)")
            return None

        print("\n[3.5] Generating Sentinel token...")

        try:
            # Generate the requirements token.
            token_data = self.sentinel_solver.generate_requirements_token()
            sentinel_token = json.dumps(token_data)
            print(" ✅ Sentinel token generated")

            # Submit it to the Sentinel server.
            url = "https://sentinel.openai.com/backend-api/sentinel/req"

            payload = {
                "p": token_data['p'],
                "id": self.device_id,
                # Login uses the "..._login" flow; registration uses "create".
                "flow": "username_password_login",
            }

            headers = self.get_headers(host='sentinel.openai.com')
            headers.update({
                'Content-Type': 'text/plain;charset=UTF-8',
                'Origin': 'https://sentinel.openai.com',
                'Referer': 'https://sentinel.openai.com/backend-api/sentinel/frame.html',
                'Accept': '*/*',
                'Sec-Fetch-Site': 'same-origin',
                'Sec-Fetch-Mode': 'cors',
                'Sec-Fetch-Dest': 'empty',
            })

            resp = self.session.post(url, json=payload, headers=headers, timeout=30)

            if resp.status_code != 200:
                print(f" ⚠️ Sentinel failed: {resp.status_code}")
                return None

            data = resp.json()
            print(f" ✅ Sentinel accepted (persona: {data.get('persona')})")

            # Solve and submit a proof of work when the server asks for one.
            if data.get('proofofwork', {}).get('required'):
                print(" ⚠️ PoW required, solving...")
                pow_data = data['proofofwork']
                pow_answer = self.pow_solver.solve(pow_data['seed'], pow_data['difficulty'])

                pow_payload = {
                    "p": token_data['p'],
                    "id": self.device_id,
                    "answer": pow_answer,
                }
                pow_resp = self.session.post(url, json=pow_payload, headers=headers, timeout=30)
                if pow_resp.status_code == 200:
                    print(" ✅ PoW accepted")
                else:
                    # A rejected PoW is only reported; the token is still returned.
                    print(f" ⚠️ PoW failed: {pow_resp.status_code}")

            return sentinel_token

        except Exception as e:
            # Best-effort step: report the problem and continue without a token.
            print(f" ⚠️ Sentinel error: {e}")
            import traceback
            traceback.print_exc()
            return None

    def step4_submit_password(self, email: str, password: str, sentinel_token: str = None):
        """Step 4: submit the credentials for verification.

        Args:
            email: Account e-mail, sent as ``username``.
            password: Account password.
            sentinel_token: Optional token from
                :meth:`step3_5_generate_sentinel`; sent as the
                ``Openai-Sentinel-Token`` header when given.

        Returns:
            ``{'success': True}`` (plus ``'continue_url'`` when the response
            carries one) on HTTP 200, otherwise
            ``{'success': False, 'error': ...}``.
        """
        print("\n[4] Submitting password...")

        # Verification endpoint: /api/accounts/password/verify
        url = "https://auth.openai.com/api/accounts/password/verify"

        # Both username and password are sent together.
        payload = {
            "username": email,
            "password": password,
        }

        headers = self.get_headers(host='auth.openai.com')
        headers.update({
            'Content-Type': 'application/json',
            'Accept': 'application/json',
            'Origin': 'https://auth.openai.com',
            'Referer': 'https://auth.openai.com/log-in/password',
            'Sec-Fetch-Dest': 'empty',
            'Sec-Fetch-Mode': 'cors',
            'Sec-Fetch-Site': 'same-origin',
        })

        # Attach the Sentinel token when one was generated.
        if sentinel_token:
            headers['Openai-Sentinel-Token'] = sentinel_token
            print(" ✅ Using Sentinel token")

        # Datadog tracing headers with random identifiers.
        headers.update({
            'X-Datadog-Trace-Id': str(secrets.randbits(63)),
            'X-Datadog-Parent-Id': str(secrets.randbits(63)),
            'X-Datadog-Sampling-Priority': '1',
            'X-Datadog-Origin': 'rum',
            'Traceparent': f'00-0000000000000000{secrets.token_hex(8)}-{secrets.token_hex(8)}-01',
            'Tracestate': 'dd=s:1;o:rum',
        })

        resp = self.session.post(
            url,
            json=payload,
            headers=headers,
            allow_redirects=False,
            timeout=30,
        )

        print(f" Status: {resp.status_code}")

        try:
            data = resp.json()
            print(f" Response: {json.dumps(data, indent=2)}")

            if resp.status_code != 200:
                return {'success': False, 'error': data}

            # A continue_url means the OAuth callback still has to be followed.
            continue_url = data.get('continue_url')
            if continue_url:
                print(" ✅ Login successful, need to complete OAuth")
                return {'success': True, 'continue_url': continue_url}
            return {'success': True}

        except Exception as e:
            # Unparseable or unexpected body: report it as a failed login.
            print(f" ❌ Exception: {e}")
            return {'success': False, 'error': str(e)}

    def step5_complete_oauth(self, continue_url: str):
        """Step 5: follow the OAuth callback to obtain the session cookie.

        Args:
            continue_url: Absolute URL, or a path relative to
                ``https://auth.openai.com``.

        Returns:
            ``True`` when the ``__Secure-next-auth.session-token`` cookie is
            present afterwards, ``False`` otherwise.
        """
        print("\n[5] Completing OAuth callback...")

        # Make sure we have an absolute URL.
        if not continue_url.startswith('http'):
            continue_url = f"https://auth.openai.com{continue_url}"

        # NOTE(review): Host stays pinned to auth.openai.com although the
        # redirect chain is followed automatically; confirm the HTTP client
        # rewrites it per hop.
        headers = self.get_headers(host='auth.openai.com')
        headers.update({
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Referer': 'https://auth.openai.com/log-in/password',
            'Sec-Fetch-Dest': 'document',
            'Sec-Fetch-Mode': 'navigate',
            'Sec-Fetch-Site': 'same-origin',
            'Upgrade-Insecure-Requests': '1',
        })

        resp = self.session.get(
            continue_url,
            headers=headers,
            allow_redirects=True,  # follow redirects back to chatgpt.com
            timeout=30,
        )

        print(f" Final URL: {resp.url}")
        print(f" Status: {resp.status_code}")

        # Success is judged by the presence of the session-token cookie.
        session_token = self.session.cookies.get('__Secure-next-auth.session-token')
        if not session_token:
            print(" ❌ No session-token found")
            return False

        print(f" ✅ Got session-token: {session_token[:50]}...")
        return True

    def step6_get_access_token(self):
        """Step 6: read the access token from the session endpoint.

        Returns:
            The ``accessToken`` string, or ``None`` when the request fails,
            the body cannot be parsed, or the field is missing.
        """
        print("\n[6] Getting access token...")

        url = "https://chatgpt.com/api/auth/session"

        headers = self.get_headers(host='chatgpt.com')
        headers.update({
            'Accept': 'application/json',
            'Referer': 'https://chatgpt.com/',
            'Sec-Fetch-Dest': 'empty',
            'Sec-Fetch-Mode': 'cors',
            'Sec-Fetch-Site': 'same-origin',
        })

        resp = self.session.get(url, headers=headers, timeout=30)

        print(f" Status: {resp.status_code}")

        if resp.status_code != 200:
            print(f" ❌ Failed: {resp.status_code}")
            return None

        try:
            data = resp.json()
            access_token = data.get('accessToken')

            if access_token:
                print(f" ✅ Access token: {access_token[:50]}...")
                return access_token

            print(" ❌ No accessToken in response")
            print(f" Response: {json.dumps(data, indent=2)}")
            return None
        except Exception as e:
            print(f" ❌ Failed to parse response: {e}")
            return None

    def login(self, email: str, password: str):
        """Run the complete login flow.

        Args:
            email: Account e-mail.
            password: Account password.

        Returns:
            The access token on success, ``None`` on any failure (failures
            and exceptions are printed rather than raised).
        """
        print(f"\n{'='*60}")
        print(f"Starting login for: {email}")
        print(f"{'='*60}")

        try:
            # Step 1: get the CSRF token.
            csrf_token = self.step1_get_csrf()

            # Step 2: start the sign-in.
            oauth_url = self.step2_signin_request(email, csrf_token)

            # Step 3: visit the OAuth URL (redirects to the password page).
            self.step3_visit_oauth(oauth_url)

            # Step 3.5: generate the Sentinel token (may be None).
            sentinel_token = self.step3_5_generate_sentinel()

            # Step 4: submit the password.
            login_result = self.step4_submit_password(email, password, sentinel_token)

            if not login_result.get('success'):
                print(f"\n❌ Login failed: {login_result.get('error')}")
                return None

            # Step 5: complete the OAuth callback when one was requested.
            continue_url = login_result.get('continue_url')
            if continue_url:
                oauth_success = self.step5_complete_oauth(continue_url)
                if not oauth_success:
                    print("\n❌ OAuth callback failed")
                    return None

            # Step 6: fetch the access token.
            access_token = self.step6_get_access_token()

            if not access_token:
                print("\n❌ Failed to get access token")
                return None

            print(f"\n{'='*60}")
            print("✅ LOGIN SUCCESS!")
            print(f"{'='*60}")
            return access_token

        except Exception as e:
            print(f"\n❌ Exception during login: {e}")
            import traceback
            traceback.print_exc()
            return None
|
||
|
||
|
||
def main():
    """Test entry point: prompt for credentials and run the login flow.

    Reads the e-mail and password interactively, runs
    ``OpenAILogin.login`` and prints the resulting access token, or a
    failure message when no token was obtained.
    """
    # Imported locally so this block does not depend on a file-level import.
    import getpass

    # Read the credentials from the user; the password prompt does not echo.
    email = input("Email: ").strip()
    password = getpass.getpass("Password: ").strip()

    if not email or not password:
        print("❌ Email and password are required!")
        return

    # Create the login client and run the flow.
    client = OpenAILogin()
    access_token = client.login(email, password)

    if access_token:
        print("\n📋 Access Token:")
        print(access_token)
        print("\n💾 You can use this token to make API calls")
    else:
        print("\n❌ Login failed")


if __name__ == "__main__":
    main()
|