1151 lines
40 KiB
Python
1151 lines
40 KiB
Python
# modules/registrar.py (更新版)
|
||
"""OpenAI 注册流程控制器"""
|
||
|
||
import json
|
||
from typing import Dict, Optional
|
||
import secrets
|
||
import random
|
||
import uuid
|
||
import requests
|
||
from urllib.parse import urlparse
|
||
from .fingerprint import BrowserFingerprint
|
||
from .sentinel_solver import SentinelSolver
|
||
from .http_client import HTTPClient
|
||
from .stripe_payment import StripePaymentHandler
|
||
from config import AUTH_BASE_URL, DEBUG, TEMPMAIL_CONFIG
|
||
from modules.pow_solver import ProofOfWorkSolver
|
||
from modules.tempmail import TempMailClient
|
||
|
||
# 导入 Sentinel 配置
|
||
try:
|
||
from config import SENTINEL_CONFIG
|
||
except ImportError:
|
||
SENTINEL_CONFIG = {'use_native': True} # 默认使用纯 Python 实现
|
||
|
||
class OpenAIRegistrar:
|
||
"""完整的 OpenAI 注册流程"""
|
||
|
||
def __init__(self, session_id: Optional[str] = None, tempmail_client: Optional[TempMailClient] = None, use_native: Optional[bool] = None):
|
||
self.fingerprint = BrowserFingerprint(session_id)
|
||
|
||
# 决定使用哪种 solver
|
||
if use_native is None:
|
||
use_native = SENTINEL_CONFIG.get('use_native', True)
|
||
|
||
self.solver = SentinelSolver(self.fingerprint, use_native=use_native)
|
||
self.http_client = HTTPClient(self.fingerprint)
|
||
self.pow_solver = ProofOfWorkSolver() # 新增
|
||
self.tempmail_client = tempmail_client # 临时邮箱客户端(可选)
|
||
|
||
def _step1_init_through_chatgpt(self, email: str):
|
||
"""通过 ChatGPT web 初始化注册流程"""
|
||
|
||
# 1.1 访问 ChatGPT 首页
|
||
chatgpt_url = "https://chatgpt.com/"
|
||
headers = self.http_client.fingerprint.get_headers(host='chatgpt.com')
|
||
headers['Accept'] = 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8'
|
||
headers['Sec-Fetch-Dest'] = 'document'
|
||
headers['Sec-Fetch-Mode'] = 'navigate'
|
||
headers['Sec-Fetch-Site'] = 'none'
|
||
|
||
resp = self.http_client.session.get(
|
||
chatgpt_url,
|
||
headers=headers,
|
||
timeout=30
|
||
)
|
||
|
||
# 提取 prod_version (用于 native solver)
|
||
try:
|
||
self.prod_version = resp.text.split('data-build="')[1].split('"')[0]
|
||
except (IndexError, AttributeError):
|
||
self.prod_version = "unknown"
|
||
|
||
if DEBUG:
|
||
print(f"✅ [1.1] Visited ChatGPT ({resp.status_code}), prod={self.prod_version[:20]}...")
|
||
|
||
# 1.2 获取 CSRF token
|
||
csrf_url = "https://chatgpt.com/api/auth/csrf"
|
||
|
||
csrf_headers = headers.copy()
|
||
csrf_headers['Accept'] = 'application/json, text/plain, */*'
|
||
csrf_headers['Sec-Fetch-Dest'] = 'empty'
|
||
csrf_headers['Referer'] = 'https://chatgpt.com/'
|
||
|
||
resp = self.http_client.session.get(
|
||
csrf_url,
|
||
headers=csrf_headers,
|
||
timeout=30
|
||
)
|
||
|
||
if DEBUG:
|
||
print(f"✅ [1.2] CSRF API ({resp.status_code})")
|
||
|
||
# 优先从 cookie 提取 CSRF token
|
||
csrf_token = None
|
||
csrf_cookie = self.http_client.cookies.get('__Host-next-auth.csrf-token', '')
|
||
|
||
if csrf_cookie:
|
||
from urllib.parse import unquote
|
||
csrf_cookie = unquote(csrf_cookie)
|
||
|
||
if '|' in csrf_cookie:
|
||
csrf_token = csrf_cookie.split('|')[0]
|
||
if DEBUG:
|
||
print(f"✅ [1.2] CSRF token extracted")
|
||
|
||
# 备选:从响应 JSON 提取
|
||
if not csrf_token and resp.status_code == 200:
|
||
try:
|
||
data = resp.json()
|
||
csrf_token = data.get('csrfToken', '')
|
||
if csrf_token and DEBUG:
|
||
print(f"✅ [1.2] CSRF token from JSON")
|
||
except Exception as e:
|
||
if DEBUG:
|
||
print(f"⚠️ [1.2] Failed to parse JSON")
|
||
|
||
if not csrf_token:
|
||
if DEBUG:
|
||
print(f"❌ [1.2] Failed to obtain CSRF token")
|
||
raise Exception(f"Failed to obtain CSRF token")
|
||
|
||
# 1.3 初始化注册(通过 NextAuth)
|
||
import uuid
|
||
auth_session_logging_id = str(uuid.uuid4())
|
||
|
||
signin_url = "https://chatgpt.com/api/auth/signin/openai"
|
||
signin_params = {
|
||
'prompt': 'login',
|
||
'ext-oai-did': self.fingerprint.session_id,
|
||
'auth_session_logging_id': auth_session_logging_id,
|
||
'screen_hint': 'signup', # 明确指定注册
|
||
'login_hint': email, # 🔥 关键:传入邮箱
|
||
}
|
||
|
||
signin_data = {
|
||
'callbackUrl': 'https://chatgpt.com/',
|
||
'csrfToken': csrf_token,
|
||
'json': 'true',
|
||
}
|
||
|
||
signin_headers = headers.copy()
|
||
signin_headers['Content-Type'] = 'application/x-www-form-urlencoded'
|
||
signin_headers['Accept'] = '*/*'
|
||
signin_headers['Referer'] = 'https://chatgpt.com/'
|
||
signin_headers['Sec-Fetch-Mode'] = 'cors'
|
||
signin_headers['Sec-Fetch-Dest'] = 'empty'
|
||
|
||
resp = self.http_client.session.post(
|
||
signin_url,
|
||
params=signin_params,
|
||
data=signin_data,
|
||
headers=signin_headers,
|
||
allow_redirects=False,
|
||
timeout=30
|
||
)
|
||
|
||
|
||
if DEBUG:
|
||
print(f"✅ [1.3] NextAuth response ({resp.status_code})")
|
||
|
||
|
||
# 1.4 提取 OAuth URL(从 JSON 响应)
|
||
if resp.status_code == 200:
|
||
try:
|
||
try:
|
||
data = resp.json()
|
||
except Exception:
|
||
data = json.loads(resp.text or "{}")
|
||
|
||
oauth_url = data.get('url')
|
||
|
||
if not oauth_url:
|
||
raise Exception(f"No 'url' in NextAuth response: {data}")
|
||
|
||
if DEBUG:
|
||
print(f"✅ [1.4] Got OAuth URL")
|
||
except Exception as e:
|
||
raise Exception(f"Failed to parse NextAuth response: {e}")
|
||
|
||
elif resp.status_code in [301, 302, 303, 307, 308]:
|
||
# 旧版流程:直接重定向
|
||
oauth_url = resp.headers.get('Location')
|
||
if not oauth_url:
|
||
if DEBUG:
|
||
print(f"❌ [1.4] Got redirect but no Location header")
|
||
raise Exception("Got redirect but no Location header")
|
||
|
||
if DEBUG:
|
||
print(f"✅ [1.4] Got OAuth redirect")
|
||
|
||
else:
|
||
raise Exception(f"Unexpected NextAuth response: {resp.status_code}")
|
||
|
||
# 1.5 访问 OAuth authorize endpoint
|
||
auth_headers = self.http_client.fingerprint.get_headers(host='auth.openai.com')
|
||
auth_headers['Accept'] = 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8'
|
||
auth_headers['Referer'] = 'https://chatgpt.com/'
|
||
auth_headers['Sec-Fetch-Dest'] = 'document'
|
||
auth_headers['Sec-Fetch-Mode'] = 'navigate'
|
||
auth_headers['Sec-Fetch-Site'] = 'cross-site'
|
||
|
||
resp = self.http_client.session.get(
|
||
oauth_url,
|
||
headers=auth_headers,
|
||
allow_redirects=True, # 自动跟随重定向
|
||
timeout=30
|
||
)
|
||
|
||
if DEBUG:
|
||
print(f"✅ [1.5] OAuth flow completed ({resp.status_code})")
|
||
|
||
# 检查必需的 cookies
|
||
required_cookies = ['login_session', 'oai-did']
|
||
missing = [c for c in required_cookies if c not in self.http_client.cookies]
|
||
|
||
if missing:
|
||
if DEBUG:
|
||
print(f"⚠️ Missing cookies: {', '.join(missing)}")
|
||
else:
|
||
if DEBUG:
|
||
print(f"✅ All required cookies present")
|
||
|
||
# 1.6 确保获取 auth.openai.com 的 CSRF(用于后续 /register 请求)
|
||
try:
|
||
auth_csrf = self.http_client.get_csrf_token(domain="auth.openai.com")
|
||
if DEBUG and auth_csrf:
|
||
print(f"✅ [1.6] Auth CSRF obtained")
|
||
except Exception as e:
|
||
if DEBUG:
|
||
print(f"⚠️ [1.6] Failed to get auth CSRF")
|
||
|
||
|
||
|
||
|
||
def _step2_init_sentinel(self):
|
||
"""初始化 Sentinel(生成 token)"""
|
||
|
||
try:
|
||
token_data = self.solver.generate_requirements_token()
|
||
self.sentinel_token = json.dumps(token_data)
|
||
|
||
if DEBUG:
|
||
print(f"✅ [2] Sentinel token generated")
|
||
|
||
except Exception as e:
|
||
if DEBUG:
|
||
print(f"❌ [2] Sentinel initialization failed: {e}")
|
||
raise
|
||
def _step2_5_submit_sentinel(self):
|
||
"""Step 2.5: 提交 Sentinel token 到 OpenAI"""
|
||
|
||
# 如果 sentinel_token 是字符串,先解析
|
||
if isinstance(self.sentinel_token, str):
|
||
token_data = json.loads(self.sentinel_token)
|
||
else:
|
||
token_data = self.sentinel_token
|
||
|
||
url = "https://sentinel.openai.com/backend-api/sentinel/req"
|
||
|
||
payload = {
|
||
"p": token_data['p'],
|
||
"id": self.fingerprint.session_id,
|
||
"flow": "username_password_create"
|
||
}
|
||
|
||
headers = self.http_client.fingerprint.get_headers(host='sentinel.openai.com')
|
||
headers['Content-Type'] = 'text/plain;charset=UTF-8' # 注意:text/plain
|
||
headers['Origin'] = 'https://sentinel.openai.com'
|
||
headers['Referer'] = 'https://sentinel.openai.com/backend-api/sentinel/frame.html'
|
||
headers['Accept'] = '*/*'
|
||
headers['Sec-Fetch-Site'] = 'same-origin'
|
||
headers['Sec-Fetch-Mode'] = 'cors'
|
||
headers['Sec-Fetch-Dest'] = 'empty'
|
||
|
||
resp = self.http_client.session.post(
|
||
url,
|
||
json=payload,
|
||
headers=headers,
|
||
timeout=30
|
||
)
|
||
|
||
if resp.status_code != 200:
|
||
if DEBUG:
|
||
print(f"❌ [2.5] Failed to submit Sentinel ({resp.status_code})")
|
||
raise Exception(f"Failed to submit Sentinel token: {resp.status_code} {resp.text}")
|
||
|
||
try:
|
||
data = resp.json()
|
||
if DEBUG:
|
||
print(f"✅ [2.5] Sentinel accepted (persona: {data.get('persona')})")
|
||
|
||
# 检查是否需要额外验证
|
||
if data.get('turnstile', {}).get('required'):
|
||
print(f"⚠️ Turnstile required")
|
||
if data.get('proofofwork', {}).get('required'):
|
||
print(f"⚠️ Proof of Work required (difficulty: {data.get('proofofwork', {}).get('difficulty')})")
|
||
|
||
# 保存 token(可能后续需要)
|
||
self.sentinel_response = data
|
||
return data
|
||
|
||
except Exception as e:
|
||
if DEBUG:
|
||
print(f"❌ [2.5] Failed to parse response: {e}")
|
||
raise
|
||
|
||
def _step2_6_solve_pow(self):
|
||
"""Step 2.6: 解 Proof of Work"""
|
||
|
||
pow_data = self.sentinel_response.get('proofofwork', {})
|
||
|
||
if not pow_data.get('required'):
|
||
if DEBUG:
|
||
print("⏭️ [2.6] PoW not required, skipping")
|
||
return None
|
||
|
||
seed = pow_data.get('seed')
|
||
difficulty = pow_data.get('difficulty')
|
||
|
||
if not seed or not difficulty:
|
||
raise Exception(f"Missing PoW parameters")
|
||
|
||
# 解 PoW
|
||
self.pow_answer = self.pow_solver.solve(seed, difficulty)
|
||
|
||
if DEBUG:
|
||
print(f"✅ [2.6] PoW solved (difficulty: {difficulty})")
|
||
|
||
return self.pow_answer
|
||
def _step2_7_submit_pow(self):
|
||
"""Step 2.7: 提交 PoW 答案到 Sentinel"""
|
||
|
||
url = "https://sentinel.openai.com/backend-api/sentinel/req"
|
||
|
||
# 再次生成 requirements token(或重用之前的)
|
||
requirements_token = self.sentinel_token.split('"p": "')[1].split('"')[0]
|
||
|
||
payload = {
|
||
"p": requirements_token,
|
||
"id": self.fingerprint.session_id,
|
||
"answer": self.pow_answer
|
||
}
|
||
|
||
headers = self.http_client.fingerprint.get_headers(host='sentinel.openai.com')
|
||
headers['Content-Type'] = 'text/plain;charset=UTF-8'
|
||
headers['Origin'] = 'https://sentinel.openai.com'
|
||
headers['Referer'] = 'https://sentinel.openai.com/backend-api/sentinel/frame.html'
|
||
|
||
# 补全 Sec-Fetch-* headers
|
||
headers['Sec-Fetch-Dest'] = 'empty'
|
||
headers['Sec-Fetch-Mode'] = 'cors'
|
||
headers['Sec-Fetch-Site'] = 'same-origin'
|
||
|
||
resp = self.http_client.session.post(
|
||
url,
|
||
json=payload,
|
||
headers=headers,
|
||
timeout=30
|
||
)
|
||
|
||
|
||
if resp.status_code != 200:
|
||
if DEBUG:
|
||
print(f"❌ [2.7] Failed to submit PoW ({resp.status_code})")
|
||
raise Exception(f"Failed to submit PoW: {resp.status_code}")
|
||
|
||
# 解析响应
|
||
result = resp.json()
|
||
|
||
if DEBUG:
|
||
print(f"✅ [2.7] PoW accepted")
|
||
if 'turnstile' in result:
|
||
print(f"⚠️ Turnstile still required")
|
||
|
||
# 保存最终响应
|
||
self.sentinel_response = result
|
||
|
||
return result
|
||
|
||
|
||
|
||
def _step3_attempt_register(self, email: str, password: str) -> Optional[Dict]:
|
||
"""尝试注册(不验证邮箱)"""
|
||
|
||
# 3.2 准备注册请求(正确的 payload)
|
||
url = "https://auth.openai.com/api/accounts/user/register"
|
||
|
||
payload = {
|
||
"username": email, # 不是 "email"!
|
||
"password": password,
|
||
}
|
||
|
||
# 3.3 准备 headers(完全匹配真实请求)
|
||
headers = self.http_client.fingerprint.get_headers(host='auth.openai.com')
|
||
|
||
# 添加 Sentinel token
|
||
if hasattr(self, 'sentinel_token') and self.sentinel_token:
|
||
headers['Openai-Sentinel-Token'] = self.sentinel_token # 注意大小写
|
||
|
||
headers.update({
|
||
'Content-Type': 'application/json',
|
||
'Accept': 'application/json',
|
||
'Accept-Language': 'zh-CN,zh;q=0.9',
|
||
'Origin': 'https://auth.openai.com',
|
||
'Referer': 'https://auth.openai.com/create-account/password', # 注意是 /password
|
||
'Sec-Fetch-Dest': 'empty',
|
||
'Sec-Fetch-Mode': 'cors',
|
||
'Sec-Fetch-Site': 'same-origin',
|
||
'Priority': 'u=1, i',
|
||
})
|
||
|
||
# 添加 Datadog tracing headers(匹配真实请求)
|
||
import secrets
|
||
headers.update({
|
||
'X-Datadog-Trace-Id': str(secrets.randbits(63)),
|
||
'X-Datadog-Parent-Id': str(secrets.randbits(63)),
|
||
'X-Datadog-Sampling-Priority': '1',
|
||
'X-Datadog-Origin': 'rum',
|
||
'Traceparent': f'00-0000000000000000{secrets.token_hex(8)}-{secrets.token_hex(8)}-01',
|
||
'Tracestate': 'dd=s:1;o:rum',
|
||
})
|
||
|
||
# 3.4 准备 cookies(使用所有 auth.openai.com 的 cookies)
|
||
# 不要手动过滤,让 requests 自动处理域名匹配
|
||
|
||
|
||
# 3.5 发送注册请求
|
||
resp = self.http_client.session.post(
|
||
url,
|
||
json=payload,
|
||
headers=headers,
|
||
# 不手动指定 cookies,让 session 自动处理
|
||
timeout=30
|
||
)
|
||
|
||
|
||
# 3.6 处理响应
|
||
try:
|
||
data = resp.json()
|
||
|
||
if resp.status_code == 200:
|
||
if DEBUG:
|
||
print(f"✅ [3] Registration successful!")
|
||
|
||
# 检查是否需要邮箱验证
|
||
continue_url = data.get('continue_url', '')
|
||
if 'email-otp' in continue_url:
|
||
if DEBUG:
|
||
print(f"⏭️ [3] Email OTP verification required")
|
||
return {
|
||
'success': True,
|
||
'requires_verification': True,
|
||
'continue_url': continue_url,
|
||
'method': data.get('method', 'GET'),
|
||
'data': data
|
||
}
|
||
else:
|
||
return {'success': True, 'data': data}
|
||
|
||
elif resp.status_code == 409:
|
||
error = data.get('error', {})
|
||
error_code = error.get('code', '')
|
||
|
||
if error_code == 'invalid_state':
|
||
# Session 无效
|
||
if DEBUG:
|
||
print(f"❌ [3] Invalid session")
|
||
raise Exception(f"Invalid session: {error}")
|
||
|
||
elif error_code == 'email_taken' or 'already' in str(error).lower():
|
||
# 邮箱已被使用
|
||
if DEBUG:
|
||
print(f"⚠️ [3] Email already registered")
|
||
return None
|
||
|
||
else:
|
||
if DEBUG:
|
||
print(f"❌ [3] Registration conflict: {error_code}")
|
||
raise Exception(f"Registration conflict: {data}")
|
||
|
||
elif resp.status_code == 400:
|
||
if DEBUG:
|
||
print(f"❌ [3] Bad Request: {data}")
|
||
raise Exception(f"Bad request: {data}")
|
||
|
||
else:
|
||
if DEBUG:
|
||
print(f"❌ [3] Unexpected status: {resp.status_code}")
|
||
raise Exception(f"Registration failed with {resp.status_code}: {data}")
|
||
|
||
except json.JSONDecodeError as e:
|
||
if DEBUG:
|
||
print(f"❌ [3] Failed to parse JSON response")
|
||
raise Exception(f"Invalid JSON response: {e}")
|
||
|
||
except Exception as e:
|
||
raise
|
||
|
||
def _step5_get_access_token(self) -> str:
|
||
"""Step 5: Retrieve access token from authenticated session
|
||
|
||
This method leverages the existing session cookies (login_session, oai-did,
|
||
__Host-next-auth.csrf-token) obtained after successful registration and
|
||
email verification.
|
||
|
||
Returns:
|
||
Access token string
|
||
|
||
Raises:
|
||
Exception: If access token retrieval fails
|
||
"""
|
||
|
||
url = "https://chatgpt.com/api/auth/session"
|
||
|
||
headers = self.http_client.fingerprint.get_headers(host='chatgpt.com')
|
||
headers.update({
|
||
'Accept': 'application/json',
|
||
'Referer': 'https://chatgpt.com/',
|
||
'Sec-Fetch-Dest': 'empty',
|
||
'Sec-Fetch-Mode': 'cors',
|
||
'Sec-Fetch-Site': 'same-origin',
|
||
})
|
||
|
||
try:
|
||
resp = self.http_client.session.get(
|
||
url,
|
||
headers=headers,
|
||
timeout=30
|
||
)
|
||
|
||
if resp.status_code != 200:
|
||
error_msg = f"Failed to retrieve session: HTTP {resp.status_code}"
|
||
if DEBUG:
|
||
print(f"❌ [5] {error_msg}")
|
||
raise Exception(error_msg)
|
||
|
||
data = resp.json()
|
||
access_token = data.get('accessToken')
|
||
|
||
if not access_token:
|
||
error_msg = "No accessToken in session response"
|
||
if DEBUG:
|
||
print(f"❌ [5] {error_msg}")
|
||
raise Exception(error_msg)
|
||
|
||
if DEBUG:
|
||
print(f"✅ [5] Access token retrieved")
|
||
print(f" Token: {access_token[:50]}...")
|
||
|
||
return access_token
|
||
|
||
except requests.RequestException as e:
|
||
error_msg = f"Network error retrieving access token: {e}"
|
||
if DEBUG:
|
||
print(f"❌ [5] {error_msg}")
|
||
raise Exception(error_msg)
|
||
|
||
except Exception as e:
|
||
if DEBUG:
|
||
print(f"❌ [5] Unexpected error: {e}")
|
||
raise
|
||
|
||
def _step4_verify_email(self, mailbox: str, continue_url: str) -> Dict:
|
||
"""Step 4: 验证邮箱(通过 OTP 验证码)
|
||
|
||
Args:
|
||
mailbox: 邮箱地址
|
||
continue_url: Step 3 返回的 continue_url(例如: /email-otp/send)
|
||
|
||
Returns:
|
||
验证结果
|
||
"""
|
||
|
||
# 4.0 触发发送验证邮件(访问 continue_url)
|
||
if continue_url:
|
||
# 构造完整 URL
|
||
if not continue_url.startswith('http'):
|
||
send_url = f"https://auth.openai.com{continue_url}"
|
||
else:
|
||
send_url = continue_url
|
||
|
||
|
||
# 准备 headers
|
||
headers = self.http_client.fingerprint.get_headers(host='auth.openai.com')
|
||
headers.update({
|
||
'Accept': 'application/json',
|
||
'Referer': 'https://auth.openai.com/create-account/password',
|
||
'Sec-Fetch-Dest': 'empty',
|
||
'Sec-Fetch-Mode': 'cors',
|
||
'Sec-Fetch-Site': 'same-origin',
|
||
})
|
||
|
||
# 发送请求(根据 Step 3 返回的 method)
|
||
resp = self.http_client.session.get(
|
||
send_url,
|
||
headers=headers,
|
||
timeout=30
|
||
)
|
||
|
||
if resp.status_code != 200:
|
||
if DEBUG:
|
||
print(f"❌ [4.0] Failed to trigger email send ({resp.status_code})")
|
||
raise Exception(f"Failed to trigger email send: {resp.status_code} {resp.text}")
|
||
elif DEBUG:
|
||
print(f"✅ [4.0] Email send triggered")
|
||
|
||
# 4.1 检查是否有 tempmail_client
|
||
if not self.tempmail_client:
|
||
raise Exception("No TempMailClient configured. Cannot receive verification emails.")
|
||
|
||
# 4.2 等待验证邮件
|
||
if DEBUG:
|
||
print(f"⏳ [4.1] Waiting for verification email...")
|
||
|
||
email_data = self.tempmail_client.wait_for_email(
|
||
mailbox=mailbox,
|
||
from_filter=None, # 不过滤发件人(因为临时邮箱可能不返回 from 字段)
|
||
subject_filter="chatgpt", # 主题包含 "chatgpt"(匹配 "Your ChatGPT code is...")
|
||
timeout=120,
|
||
interval=5
|
||
)
|
||
|
||
if not email_data:
|
||
if DEBUG:
|
||
print(f"❌ [4.1] Timeout: No verification email received")
|
||
raise Exception("Timeout: No verification email received")
|
||
|
||
# 4.3 提取验证码
|
||
verification_code = self.tempmail_client.extract_verification_code(email_data)
|
||
|
||
if not verification_code:
|
||
# 尝试提取验证链接(备选方案)
|
||
verification_link = self.tempmail_client.extract_verification_link(email_data)
|
||
if verification_link:
|
||
if DEBUG:
|
||
print(f"❌ [4.3] Link-based verification not implemented")
|
||
raise NotImplementedError("Link-based verification not implemented yet")
|
||
else:
|
||
if DEBUG:
|
||
print(f"❌ [4.3] Failed to extract verification code")
|
||
raise Exception("Failed to extract verification code or link from email")
|
||
|
||
if DEBUG:
|
||
print(f"✅ [4.3] Got verification code: {verification_code}")
|
||
|
||
# 4.4 提交验证码
|
||
# 使用正确的验证接口
|
||
verify_url = "https://auth.openai.com/api/accounts/email-otp/validate"
|
||
|
||
# 准备 payload
|
||
payload = {
|
||
"code": verification_code,
|
||
}
|
||
|
||
# 准备 headers
|
||
headers = self.http_client.fingerprint.get_headers(host='auth.openai.com')
|
||
headers.update({
|
||
'Content-Type': 'application/json',
|
||
'Accept': 'application/json',
|
||
'Origin': 'https://auth.openai.com',
|
||
'Referer': 'https://auth.openai.com/email-verification',
|
||
'Sec-Fetch-Dest': 'empty',
|
||
'Sec-Fetch-Mode': 'cors',
|
||
'Sec-Fetch-Site': 'same-origin',
|
||
'Priority': 'u=1, i',
|
||
})
|
||
|
||
# 添加 Datadog tracing headers
|
||
import secrets
|
||
headers.update({
|
||
'X-Datadog-Trace-Id': str(secrets.randbits(63)),
|
||
'X-Datadog-Parent-Id': str(secrets.randbits(63)),
|
||
'X-Datadog-Sampling-Priority': '1',
|
||
'X-Datadog-Origin': 'rum',
|
||
'Traceparent': f'00-0000000000000000{secrets.token_hex(8)}-{secrets.token_hex(8)}-01',
|
||
'Tracestate': 'dd=s:1;o:rum',
|
||
})
|
||
|
||
|
||
# 发送验证请求
|
||
resp = self.http_client.session.post(
|
||
verify_url,
|
||
json=payload,
|
||
headers=headers,
|
||
timeout=30
|
||
)
|
||
|
||
# 处理响应
|
||
try:
|
||
data = resp.json()
|
||
|
||
if resp.status_code == 200:
|
||
if DEBUG:
|
||
print(f"✅ [4.4] Email verified successfully!")
|
||
print(f"📋 [4.4] Response data: {json.dumps(data, indent=2)}")
|
||
|
||
return {
|
||
'success': True,
|
||
'verified': True,
|
||
'data': data
|
||
}
|
||
|
||
else:
|
||
if DEBUG:
|
||
print(f"❌ [4.4] Verification failed: {data}")
|
||
|
||
return {
|
||
'success': False,
|
||
'error': data
|
||
}
|
||
|
||
except Exception as e:
|
||
if DEBUG:
|
||
print(f"❌ [4.4] Exception: {e}")
|
||
raise
|
||
|
||
def _step4_5_submit_personal_info(self) -> Dict:
|
||
"""Step 4.5: 提交个人信息(姓名、生日)完成账号设置
|
||
|
||
Returns:
|
||
包含 OAuth callback URL 的字典
|
||
"""
|
||
|
||
url = "https://auth.openai.com/api/accounts/create_account"
|
||
|
||
# 生成随机姓名和生日
|
||
from modules.data_generator import NameGenerator
|
||
random_name = NameGenerator.generate_full_name()
|
||
|
||
# 生成随机生日(1980-2000年之间)
|
||
year = random.randint(1980, 2000)
|
||
month = random.randint(1, 12)
|
||
day = random.randint(1, 28) # 保险起见,避免2月29日等边界情况
|
||
birthdate = f"{year}-{month:02d}-{day:02d}"
|
||
|
||
payload = {
|
||
"name": random_name,
|
||
"birthdate": birthdate
|
||
}
|
||
|
||
# 准备 headers
|
||
headers = self.http_client.fingerprint.get_headers(host='auth.openai.com')
|
||
headers.update({
|
||
'Content-Type': 'application/json',
|
||
'Accept': 'application/json',
|
||
'Origin': 'https://auth.openai.com',
|
||
'Referer': 'https://auth.openai.com/about-you',
|
||
'Sec-Fetch-Dest': 'empty',
|
||
'Sec-Fetch-Mode': 'cors',
|
||
'Sec-Fetch-Site': 'same-origin',
|
||
'Priority': 'u=1, i',
|
||
})
|
||
|
||
# 添加 Datadog tracing headers
|
||
import secrets
|
||
headers.update({
|
||
'X-Datadog-Trace-Id': str(secrets.randbits(63)),
|
||
'X-Datadog-Parent-Id': str(secrets.randbits(63)),
|
||
'X-Datadog-Sampling-Priority': '1',
|
||
'X-Datadog-Origin': 'rum',
|
||
'Traceparent': f'00-0000000000000000{secrets.token_hex(8)}-{secrets.token_hex(8)}-01',
|
||
'Tracestate': 'dd=s:1;o:rum',
|
||
})
|
||
|
||
# 发送请求
|
||
resp = self.http_client.session.post(
|
||
url,
|
||
json=payload,
|
||
headers=headers,
|
||
timeout=30
|
||
)
|
||
|
||
# 处理响应
|
||
try:
|
||
data = resp.json()
|
||
|
||
if resp.status_code == 200:
|
||
oauth_url = data.get('continue_url')
|
||
|
||
if DEBUG:
|
||
print(f"✅ [4.5] Personal info submitted")
|
||
print(f" Name: {random_name}")
|
||
print(f" Birthdate: {birthdate}")
|
||
|
||
if oauth_url:
|
||
return {
|
||
'success': True,
|
||
'oauth_url': oauth_url,
|
||
'method': data.get('method', 'GET')
|
||
}
|
||
else:
|
||
raise Exception(f"No continue_url in response: {data}")
|
||
|
||
else:
|
||
if DEBUG:
|
||
print(f"❌ [4.5] Failed to submit personal info: {data}")
|
||
raise Exception(f"Failed to submit personal info: {resp.status_code} {data}")
|
||
|
||
except Exception as e:
|
||
if DEBUG:
|
||
print(f"❌ [4.5] Exception: {e}")
|
||
raise
|
||
|
||
def _step4_6_complete_oauth_flow(self, oauth_url: str):
|
||
"""Step 4.6: 完成 OAuth 回调流程,获取 session-token
|
||
|
||
这一步会自动跟随重定向链:
|
||
1. GET oauth_url → 302 to /api/accounts/consent
|
||
2. GET /api/accounts/consent → 302 to chatgpt.com/api/auth/callback/openai
|
||
3. GET chatgpt.com/api/auth/callback/openai → 最终生成 session-token
|
||
|
||
Args:
|
||
oauth_url: Step 4.5 返回的 OAuth URL
|
||
"""
|
||
|
||
# 确保是完整 URL
|
||
if not oauth_url.startswith('http'):
|
||
oauth_url = f"https://auth.openai.com{oauth_url}"
|
||
|
||
# 准备 headers
|
||
headers = self.http_client.fingerprint.get_headers(host='auth.openai.com')
|
||
headers.update({
|
||
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
|
||
'Referer': 'https://auth.openai.com/about-you',
|
||
'Sec-Fetch-Dest': 'document',
|
||
'Sec-Fetch-Mode': 'navigate',
|
||
'Sec-Fetch-Site': 'same-origin',
|
||
'Upgrade-Insecure-Requests': '1',
|
||
})
|
||
|
||
# 发送请求,允许自动跟随重定向
|
||
# requests.Session 会自动处理跨域重定向(auth.openai.com → chatgpt.com)
|
||
resp = self.http_client.session.get(
|
||
oauth_url,
|
||
headers=headers,
|
||
allow_redirects=True, # 自动跟随所有重定向
|
||
timeout=30
|
||
)
|
||
|
||
# 检查是否获取到 session-token
|
||
session_token = None
|
||
|
||
# curl_cffi 的 cookies 是字典格式,requests 的是 CookieJar
|
||
cookies = self.http_client.session.cookies
|
||
|
||
# 尝试字典访问(curl_cffi)
|
||
if hasattr(cookies, 'get'):
|
||
session_token = cookies.get('__Secure-next-auth.session-token')
|
||
|
||
if session_token:
|
||
if DEBUG:
|
||
print(f"✅ [4.6] OAuth flow completed")
|
||
print(f" Got session-token: {session_token[:50]}...")
|
||
return True
|
||
else:
|
||
if DEBUG:
|
||
print(f"❌ [4.6] OAuth flow completed but no session-token found")
|
||
print(f" Final URL: {resp.url}")
|
||
print(f" Status: {resp.status_code}")
|
||
raise Exception("OAuth flow completed but no session-token cookie was set")
|
||
|
||
|
||
def _step5_solve_challenge(self, challenge: Dict) -> str:
|
||
"""解决 enforcement 挑战"""
|
||
|
||
sentinel_token = self.solver.solve_enforcement(challenge)
|
||
|
||
return sentinel_token
|
||
|
||
def _step6_register_with_token(self, email: str, password: str,
|
||
sentinel_token: str) -> Dict:
|
||
"""带 Sentinel token 重新注册"""
|
||
|
||
url = f"{AUTH_BASE_URL}/api/accounts/user/register"
|
||
|
||
payload = {
|
||
'username': email,
|
||
'password': password,
|
||
'client_id': 'sentinel'
|
||
}
|
||
|
||
resp = self.http_client.post(
|
||
url,
|
||
json_data=payload,
|
||
sentinel_token=sentinel_token
|
||
)
|
||
|
||
if resp.status_code == 200:
|
||
if DEBUG:
|
||
print("✅ [6] Registration successful!")
|
||
return {'success': True, 'data': resp.json()}
|
||
|
||
else:
|
||
if DEBUG:
|
||
print(f"❌ [6] Registration failed: {resp.status_code}")
|
||
|
||
return {
|
||
'success': False,
|
||
'status_code': resp.status_code,
|
||
'error': resp.text
|
||
}
|
||
|
||
def register(self, email: str, password: str) -> Dict:
|
||
"""完整注册流程"""
|
||
|
||
try:
|
||
# Step 1: 通过 ChatGPT web 初始化(获取正确的 session)
|
||
self._step1_init_through_chatgpt(email)
|
||
|
||
# Step 1.5: 设置 Sentinel solver 上下文(native 模式需要)
|
||
prod_version = getattr(self, 'prod_version', 'unknown')
|
||
self.solver.set_context(prod_version=prod_version, ip_info=None)
|
||
|
||
# Step 2: 初始化 Sentinel
|
||
self._step2_init_sentinel()
|
||
self._step2_5_submit_sentinel()
|
||
self._step2_6_solve_pow()
|
||
self._step2_7_submit_pow()
|
||
|
||
# Step 3: 尝试注册(提交邮箱和密码)
|
||
result = self._step3_attempt_register(email, password)
|
||
|
||
# 检查是否需要邮箱验证
|
||
if result and result.get('success') and result.get('requires_verification'):
|
||
|
||
# Step 4: 验证邮箱
|
||
verify_result = self._step4_verify_email(
|
||
mailbox=email,
|
||
continue_url=result['continue_url']
|
||
)
|
||
|
||
if verify_result.get('success'):
|
||
# Step 4.5: 提交个人信息
|
||
personal_info_result = self._step4_5_submit_personal_info()
|
||
|
||
if personal_info_result.get('success'):
|
||
oauth_url = personal_info_result['oauth_url']
|
||
|
||
# Step 4.6: 完成 OAuth 回调流程,获取 session-token
|
||
self._step4_6_complete_oauth_flow(oauth_url)
|
||
|
||
if DEBUG:
|
||
print(f"\n✅ Registration completed successfully!")
|
||
|
||
return {
|
||
'success': True,
|
||
'verified': True,
|
||
'email': email,
|
||
'data': verify_result.get('data')
|
||
}
|
||
else:
|
||
raise Exception(f"Failed to submit personal info: {personal_info_result}")
|
||
|
||
else:
|
||
raise Exception(f"Email verification failed: {verify_result.get('error')}")
|
||
|
||
# 如果直接成功(无需验证)
|
||
elif result and result.get('success'):
|
||
if DEBUG:
|
||
print(f"\n✅ Registration completed (no verification needed)!")
|
||
return result
|
||
|
||
# 如果注册失败(邮箱已被使用等)
|
||
elif result is None:
|
||
if DEBUG:
|
||
print(f"\n❌ Registration failed: Email already taken or invalid")
|
||
return {
|
||
'success': False,
|
||
'error': 'Email already taken or registration rejected'
|
||
}
|
||
|
||
# 其他情况:触发 enforcement challenge(旧逻辑)
|
||
else:
|
||
if DEBUG:
|
||
print(f"\n⚠️ Enforcement challenge triggered")
|
||
|
||
# Step 5: 解决挑战
|
||
sentinel_token = self._step5_solve_challenge(result)
|
||
|
||
# Step 6: 带 token 重新注册
|
||
final_result = self._step6_register_with_token(email, password, sentinel_token)
|
||
|
||
return final_result
|
||
|
||
except Exception as e:
|
||
if DEBUG:
|
||
import traceback
|
||
print(f"\n❌ Registration failed with exception:")
|
||
traceback.print_exc()
|
||
|
||
return {
|
||
'success': False,
|
||
'error': str(e)
|
||
}
|
||
|
||
def register_with_auto_email(self, password: str) -> Dict:
|
||
"""自动生成临时邮箱并完成注册流程
|
||
|
||
Args:
|
||
password: 要设置的密码
|
||
|
||
Returns:
|
||
注册结果(包含邮箱地址)
|
||
"""
|
||
|
||
# 检查是否配置了 TempMailClient
|
||
if not self.tempmail_client:
|
||
return {
|
||
'success': False,
|
||
'error': 'TempMailClient not configured. Please initialize with tempmail_client parameter.'
|
||
}
|
||
|
||
generated_email = None
|
||
|
||
try:
|
||
# Step 0: 生成临时邮箱(domain_index 从配置读取)
|
||
domain_index = TEMPMAIL_CONFIG.get('domain_index', 0) # 默认使用第1个域名
|
||
generated_email = self.tempmail_client.generate_mailbox(domain_index=domain_index)
|
||
|
||
if DEBUG:
|
||
print(f"\n✅ Generated temp email: {generated_email}")
|
||
|
||
# 调用正常的注册流程
|
||
result = self.register(email=generated_email, password=password)
|
||
|
||
# 检查注册结果
|
||
if result.get('success'):
|
||
if DEBUG:
|
||
print(f"\n✅ AUTO REGISTRATION SUCCESS")
|
||
print(f" Email: {generated_email}")
|
||
print(f" Password: {password}")
|
||
|
||
return {
|
||
'success': True,
|
||
'email': generated_email,
|
||
'password': password,
|
||
'verified': result.get('verified', False),
|
||
'data': result.get('data')
|
||
}
|
||
|
||
else:
|
||
# 注册失败 → 删除邮箱
|
||
if DEBUG:
|
||
print(f"\n❌ AUTO REGISTRATION FAILED")
|
||
print(f" Email: {generated_email}")
|
||
print(f" Error: {result.get('error')}")
|
||
|
||
# 删除失败的邮箱
|
||
self.tempmail_client.delete_mailbox(generated_email)
|
||
|
||
return {
|
||
'success': False,
|
||
'email': generated_email,
|
||
'error': result.get('error'),
|
||
'mailbox_deleted': True
|
||
}
|
||
|
||
except Exception as e:
|
||
# 发生异常 → 清理邮箱
|
||
if DEBUG:
|
||
import traceback
|
||
print(f"\n❌ AUTO REGISTRATION EXCEPTION")
|
||
if generated_email:
|
||
print(f" Email: {generated_email}")
|
||
print(f" Exception: {e}")
|
||
traceback.print_exc()
|
||
|
||
# 清理邮箱(如果已生成)
|
||
if generated_email:
|
||
try:
|
||
self.tempmail_client.delete_mailbox(generated_email)
|
||
except:
|
||
pass
|
||
|
||
return {
|
||
'success': False,
|
||
'email': generated_email,
|
||
'error': str(e),
|
||
'mailbox_deleted': True if generated_email else False
|
||
}
|
||
|
||
def add_payment_method(
|
||
self,
|
||
checkout_session_url: str,
|
||
iban: str,
|
||
name: str,
|
||
email: str,
|
||
address_line1: str,
|
||
city: str,
|
||
postal_code: str,
|
||
state: str,
|
||
country: str = "US"
|
||
) -> Dict:
|
||
"""
|
||
为账户添加Stripe支付方式(SEPA)
|
||
|
||
Args:
|
||
checkout_session_url: Stripe checkout session URL
|
||
iban: 德国IBAN账号
|
||
name: 持卡人姓名
|
||
email: 邮箱
|
||
address_line1: 街道地址
|
||
city: 城市
|
||
postal_code: 邮编
|
||
state: 州/省
|
||
country: 国家代码
|
||
|
||
Returns:
|
||
支付结果字典
|
||
"""
|
||
try:
|
||
if DEBUG:
|
||
print(f"\n🔐 [Payment] Starting payment method setup...")
|
||
print(f" Session URL: {checkout_session_url[:60]}...")
|
||
|
||
# 初始化Stripe支付处理器
|
||
payment_handler = StripePaymentHandler(
|
||
checkout_session_url=checkout_session_url,
|
||
http_client=self.http_client
|
||
)
|
||
|
||
# 执行完整支付流程
|
||
success = payment_handler.complete_payment(
|
||
iban=iban,
|
||
name=name,
|
||
email=email,
|
||
address_line1=address_line1,
|
||
city=city,
|
||
postal_code=postal_code,
|
||
state=state,
|
||
country=country
|
||
)
|
||
|
||
if success:
|
||
if DEBUG:
|
||
print(f"\n✅ [Payment] Payment method added successfully!")
|
||
|
||
return {
|
||
'success': True,
|
||
'message': 'Payment method added'
|
||
}
|
||
else:
|
||
if DEBUG:
|
||
print(f"\n❌ [Payment] Failed to add payment method")
|
||
|
||
return {
|
||
'success': False,
|
||
'error': 'Payment setup failed'
|
||
}
|
||
|
||
except Exception as e:
|
||
if DEBUG:
|
||
import traceback
|
||
print(f"\n❌ [Payment] Exception occurred:")
|
||
traceback.print_exc()
|
||
|
||
return {
|
||
'success': False,
|
||
'error': str(e)
|
||
}
|