Files
autoPlus/reference/register.py
2026-01-26 15:04:02 +08:00

1130 lines
39 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# modules/registrar.py (更新版)
"""OpenAI 注册流程控制器"""
import json
from typing import Dict, Optional
import secrets
import random
import uuid
import requests
from urllib.parse import urlparse
from .fingerprint import BrowserFingerprint
from .sentinel_solver import SentinelSolver
from .http_client import HTTPClient
from .stripe_payment import StripePaymentHandler
from config import AUTH_BASE_URL, DEBUG, TEMPMAIL_CONFIG
from modules.pow_solver import ProofOfWorkSolver
from modules.tempmail import TempMailClient
class OpenAIRegistrar:
"""完整的 OpenAI 注册流程"""
def __init__(self, session_id: Optional[str] = None, tempmail_client: Optional[TempMailClient] = None):
self.fingerprint = BrowserFingerprint(session_id)
self.solver = SentinelSolver(self.fingerprint)
self.http_client = HTTPClient(self.fingerprint)
self.pow_solver = ProofOfWorkSolver() # 新增
self.tempmail_client = tempmail_client # 临时邮箱客户端(可选)
def _step1_init_through_chatgpt(self, email: str):
"""通过 ChatGPT web 初始化注册流程"""
# 1.1 访问 ChatGPT 首页
chatgpt_url = "https://chatgpt.com/"
headers = self.http_client.fingerprint.get_headers(host='chatgpt.com')
headers['Accept'] = 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8'
headers['Sec-Fetch-Dest'] = 'document'
headers['Sec-Fetch-Mode'] = 'navigate'
headers['Sec-Fetch-Site'] = 'none'
resp = self.http_client.session.get(
chatgpt_url,
headers=headers,
timeout=30
)
if DEBUG:
print(f"✅ [1.1] Visited ChatGPT ({resp.status_code})")
# 1.2 获取 CSRF token
csrf_url = "https://chatgpt.com/api/auth/csrf"
csrf_headers = headers.copy()
csrf_headers['Accept'] = 'application/json, text/plain, */*'
csrf_headers['Sec-Fetch-Dest'] = 'empty'
csrf_headers['Referer'] = 'https://chatgpt.com/'
resp = self.http_client.session.get(
csrf_url,
headers=csrf_headers,
timeout=30
)
if DEBUG:
print(f"✅ [1.2] CSRF API ({resp.status_code})")
# 优先从 cookie 提取 CSRF token
csrf_token = None
csrf_cookie = self.http_client.cookies.get('__Host-next-auth.csrf-token', '')
if csrf_cookie:
from urllib.parse import unquote
csrf_cookie = unquote(csrf_cookie)
if '|' in csrf_cookie:
csrf_token = csrf_cookie.split('|')[0]
if DEBUG:
print(f"✅ [1.2] CSRF token extracted")
# 备选:从响应 JSON 提取
if not csrf_token and resp.status_code == 200:
try:
data = resp.json()
csrf_token = data.get('csrfToken', '')
if csrf_token and DEBUG:
print(f"✅ [1.2] CSRF token from JSON")
except Exception as e:
if DEBUG:
print(f"⚠️ [1.2] Failed to parse JSON")
if not csrf_token:
if DEBUG:
print(f"❌ [1.2] Failed to obtain CSRF token")
raise Exception(f"Failed to obtain CSRF token")
# 1.3 初始化注册(通过 NextAuth
import uuid
auth_session_logging_id = str(uuid.uuid4())
signin_url = "https://chatgpt.com/api/auth/signin/openai"
signin_params = {
'prompt': 'login',
'ext-oai-did': self.fingerprint.session_id,
'auth_session_logging_id': auth_session_logging_id,
'screen_hint': 'signup', # 明确指定注册
'login_hint': email, # 🔥 关键:传入邮箱
}
signin_data = {
'callbackUrl': 'https://chatgpt.com/',
'csrfToken': csrf_token,
'json': 'true',
}
signin_headers = headers.copy()
signin_headers['Content-Type'] = 'application/x-www-form-urlencoded'
signin_headers['Accept'] = '*/*'
signin_headers['Referer'] = 'https://chatgpt.com/'
signin_headers['Sec-Fetch-Mode'] = 'cors'
signin_headers['Sec-Fetch-Dest'] = 'empty'
resp = self.http_client.session.post(
signin_url,
params=signin_params,
data=signin_data,
headers=signin_headers,
allow_redirects=False,
timeout=30
)
if DEBUG:
print(f"✅ [1.3] NextAuth response ({resp.status_code})")
# 1.4 提取 OAuth URL从 JSON 响应)
if resp.status_code == 200:
try:
try:
data = resp.json()
except Exception:
data = json.loads(resp.text or "{}")
oauth_url = data.get('url')
if not oauth_url:
raise Exception(f"No 'url' in NextAuth response: {data}")
if DEBUG:
print(f"✅ [1.4] Got OAuth URL")
except Exception as e:
raise Exception(f"Failed to parse NextAuth response: {e}")
elif resp.status_code in [301, 302, 303, 307, 308]:
# 旧版流程:直接重定向
oauth_url = resp.headers.get('Location')
if not oauth_url:
if DEBUG:
print(f"❌ [1.4] Got redirect but no Location header")
raise Exception("Got redirect but no Location header")
if DEBUG:
print(f"✅ [1.4] Got OAuth redirect")
else:
raise Exception(f"Unexpected NextAuth response: {resp.status_code}")
# 1.5 访问 OAuth authorize endpoint
auth_headers = self.http_client.fingerprint.get_headers(host='auth.openai.com')
auth_headers['Accept'] = 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8'
auth_headers['Referer'] = 'https://chatgpt.com/'
auth_headers['Sec-Fetch-Dest'] = 'document'
auth_headers['Sec-Fetch-Mode'] = 'navigate'
auth_headers['Sec-Fetch-Site'] = 'cross-site'
resp = self.http_client.session.get(
oauth_url,
headers=auth_headers,
allow_redirects=True, # 自动跟随重定向
timeout=30
)
if DEBUG:
print(f"✅ [1.5] OAuth flow completed ({resp.status_code})")
# 检查必需的 cookies
required_cookies = ['login_session', 'oai-did']
missing = [c for c in required_cookies if c not in self.http_client.cookies]
if missing:
if DEBUG:
print(f"⚠️ Missing cookies: {', '.join(missing)}")
else:
if DEBUG:
print(f"✅ All required cookies present")
# 1.6 确保获取 auth.openai.com 的 CSRF用于后续 /register 请求)
try:
auth_csrf = self.http_client.get_csrf_token(domain="auth.openai.com")
if DEBUG and auth_csrf:
print(f"✅ [1.6] Auth CSRF obtained")
except Exception as e:
if DEBUG:
print(f"⚠️ [1.6] Failed to get auth CSRF")
def _step2_init_sentinel(self):
"""初始化 Sentinel生成 token"""
try:
token_data = self.solver.generate_requirements_token()
self.sentinel_token = json.dumps(token_data)
if DEBUG:
print(f"✅ [2] Sentinel token generated")
except Exception as e:
if DEBUG:
print(f"❌ [2] Sentinel initialization failed: {e}")
raise
def _step2_5_submit_sentinel(self):
"""Step 2.5: 提交 Sentinel token 到 OpenAI"""
# 如果 sentinel_token 是字符串,先解析
if isinstance(self.sentinel_token, str):
token_data = json.loads(self.sentinel_token)
else:
token_data = self.sentinel_token
url = "https://sentinel.openai.com/backend-api/sentinel/req"
payload = {
"p": token_data['p'],
"id": self.fingerprint.session_id,
"flow": "username_password_create"
}
headers = self.http_client.fingerprint.get_headers(host='sentinel.openai.com')
headers['Content-Type'] = 'text/plain;charset=UTF-8' # 注意text/plain
headers['Origin'] = 'https://sentinel.openai.com'
headers['Referer'] = 'https://sentinel.openai.com/backend-api/sentinel/frame.html'
headers['Accept'] = '*/*'
headers['Sec-Fetch-Site'] = 'same-origin'
headers['Sec-Fetch-Mode'] = 'cors'
headers['Sec-Fetch-Dest'] = 'empty'
resp = self.http_client.session.post(
url,
json=payload,
headers=headers,
timeout=30
)
if resp.status_code != 200:
if DEBUG:
print(f"❌ [2.5] Failed to submit Sentinel ({resp.status_code})")
raise Exception(f"Failed to submit Sentinel token: {resp.status_code} {resp.text}")
try:
data = resp.json()
if DEBUG:
print(f"✅ [2.5] Sentinel accepted (persona: {data.get('persona')})")
# 检查是否需要额外验证
if data.get('turnstile', {}).get('required'):
print(f"⚠️ Turnstile required")
if data.get('proofofwork', {}).get('required'):
print(f"⚠️ Proof of Work required (difficulty: {data.get('proofofwork', {}).get('difficulty')})")
# 保存 token可能后续需要
self.sentinel_response = data
return data
except Exception as e:
if DEBUG:
print(f"❌ [2.5] Failed to parse response: {e}")
raise
def _step2_6_solve_pow(self):
"""Step 2.6: 解 Proof of Work"""
pow_data = self.sentinel_response.get('proofofwork', {})
if not pow_data.get('required'):
if DEBUG:
print("⏭️ [2.6] PoW not required, skipping")
return None
seed = pow_data.get('seed')
difficulty = pow_data.get('difficulty')
if not seed or not difficulty:
raise Exception(f"Missing PoW parameters")
# 解 PoW
self.pow_answer = self.pow_solver.solve(seed, difficulty)
if DEBUG:
print(f"✅ [2.6] PoW solved (difficulty: {difficulty})")
return self.pow_answer
def _step2_7_submit_pow(self):
"""Step 2.7: 提交 PoW 答案到 Sentinel"""
url = "https://sentinel.openai.com/backend-api/sentinel/req"
# 再次生成 requirements token或重用之前的
requirements_token = self.sentinel_token.split('"p": "')[1].split('"')[0]
payload = {
"p": requirements_token,
"id": self.fingerprint.session_id,
"answer": self.pow_answer
}
headers = self.http_client.fingerprint.get_headers(host='sentinel.openai.com')
headers['Content-Type'] = 'text/plain;charset=UTF-8'
headers['Origin'] = 'https://sentinel.openai.com'
headers['Referer'] = 'https://sentinel.openai.com/backend-api/sentinel/frame.html'
# 补全 Sec-Fetch-* headers
headers['Sec-Fetch-Dest'] = 'empty'
headers['Sec-Fetch-Mode'] = 'cors'
headers['Sec-Fetch-Site'] = 'same-origin'
resp = self.http_client.session.post(
url,
json=payload,
headers=headers,
timeout=30
)
if resp.status_code != 200:
if DEBUG:
print(f"❌ [2.7] Failed to submit PoW ({resp.status_code})")
raise Exception(f"Failed to submit PoW: {resp.status_code}")
# 解析响应
result = resp.json()
if DEBUG:
print(f"✅ [2.7] PoW accepted")
if 'turnstile' in result:
print(f"⚠️ Turnstile still required")
# 保存最终响应
self.sentinel_response = result
return result
def _step3_attempt_register(self, email: str, password: str) -> Optional[Dict]:
"""尝试注册(不验证邮箱)"""
# 3.2 准备注册请求(正确的 payload
url = "https://auth.openai.com/api/accounts/user/register"
payload = {
"username": email, # 不是 "email"
"password": password,
}
# 3.3 准备 headers完全匹配真实请求
headers = self.http_client.fingerprint.get_headers(host='auth.openai.com')
# 添加 Sentinel token
if hasattr(self, 'sentinel_token') and self.sentinel_token:
headers['Openai-Sentinel-Token'] = self.sentinel_token # 注意大小写
headers.update({
'Content-Type': 'application/json',
'Accept': 'application/json',
'Accept-Language': 'zh-CN,zh;q=0.9',
'Origin': 'https://auth.openai.com',
'Referer': 'https://auth.openai.com/create-account/password', # 注意是 /password
'Sec-Fetch-Dest': 'empty',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Site': 'same-origin',
'Priority': 'u=1, i',
})
# 添加 Datadog tracing headers匹配真实请求
import secrets
headers.update({
'X-Datadog-Trace-Id': str(secrets.randbits(63)),
'X-Datadog-Parent-Id': str(secrets.randbits(63)),
'X-Datadog-Sampling-Priority': '1',
'X-Datadog-Origin': 'rum',
'Traceparent': f'00-0000000000000000{secrets.token_hex(8)}-{secrets.token_hex(8)}-01',
'Tracestate': 'dd=s:1;o:rum',
})
# 3.4 准备 cookies使用所有 auth.openai.com 的 cookies
# 不要手动过滤,让 requests 自动处理域名匹配
# 3.5 发送注册请求
resp = self.http_client.session.post(
url,
json=payload,
headers=headers,
# 不手动指定 cookies让 session 自动处理
timeout=30
)
# 3.6 处理响应
try:
data = resp.json()
if resp.status_code == 200:
if DEBUG:
print(f"✅ [3] Registration successful!")
# 检查是否需要邮箱验证
continue_url = data.get('continue_url', '')
if 'email-otp' in continue_url:
if DEBUG:
print(f"⏭️ [3] Email OTP verification required")
return {
'success': True,
'requires_verification': True,
'continue_url': continue_url,
'method': data.get('method', 'GET'),
'data': data
}
else:
return {'success': True, 'data': data}
elif resp.status_code == 409:
error = data.get('error', {})
error_code = error.get('code', '')
if error_code == 'invalid_state':
# Session 无效
if DEBUG:
print(f"❌ [3] Invalid session")
raise Exception(f"Invalid session: {error}")
elif error_code == 'email_taken' or 'already' in str(error).lower():
# 邮箱已被使用
if DEBUG:
print(f"⚠️ [3] Email already registered")
return None
else:
if DEBUG:
print(f"❌ [3] Registration conflict: {error_code}")
raise Exception(f"Registration conflict: {data}")
elif resp.status_code == 400:
if DEBUG:
print(f"❌ [3] Bad Request: {data}")
raise Exception(f"Bad request: {data}")
else:
if DEBUG:
print(f"❌ [3] Unexpected status: {resp.status_code}")
raise Exception(f"Registration failed with {resp.status_code}: {data}")
except json.JSONDecodeError as e:
if DEBUG:
print(f"❌ [3] Failed to parse JSON response")
raise Exception(f"Invalid JSON response: {e}")
except Exception as e:
raise
def _step5_get_access_token(self) -> str:
"""Step 5: Retrieve access token from authenticated session
This method leverages the existing session cookies (login_session, oai-did,
__Host-next-auth.csrf-token) obtained after successful registration and
email verification.
Returns:
Access token string
Raises:
Exception: If access token retrieval fails
"""
url = "https://chatgpt.com/api/auth/session"
headers = self.http_client.fingerprint.get_headers(host='chatgpt.com')
headers.update({
'Accept': 'application/json',
'Referer': 'https://chatgpt.com/',
'Sec-Fetch-Dest': 'empty',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Site': 'same-origin',
})
try:
resp = self.http_client.session.get(
url,
headers=headers,
timeout=30
)
if resp.status_code != 200:
error_msg = f"Failed to retrieve session: HTTP {resp.status_code}"
if DEBUG:
print(f"❌ [5] {error_msg}")
raise Exception(error_msg)
data = resp.json()
access_token = data.get('accessToken')
if not access_token:
error_msg = "No accessToken in session response"
if DEBUG:
print(f"❌ [5] {error_msg}")
raise Exception(error_msg)
if DEBUG:
print(f"✅ [5] Access token retrieved")
print(f" Token: {access_token[:50]}...")
return access_token
except requests.RequestException as e:
error_msg = f"Network error retrieving access token: {e}"
if DEBUG:
print(f"❌ [5] {error_msg}")
raise Exception(error_msg)
except Exception as e:
if DEBUG:
print(f"❌ [5] Unexpected error: {e}")
raise
def _step4_verify_email(self, mailbox: str, continue_url: str) -> Dict:
"""Step 4: 验证邮箱(通过 OTP 验证码)
Args:
mailbox: 邮箱地址
continue_url: Step 3 返回的 continue_url例如: /email-otp/send
Returns:
验证结果
"""
# 4.0 触发发送验证邮件(访问 continue_url
if continue_url:
# 构造完整 URL
if not continue_url.startswith('http'):
send_url = f"https://auth.openai.com{continue_url}"
else:
send_url = continue_url
# 准备 headers
headers = self.http_client.fingerprint.get_headers(host='auth.openai.com')
headers.update({
'Accept': 'application/json',
'Referer': 'https://auth.openai.com/create-account/password',
'Sec-Fetch-Dest': 'empty',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Site': 'same-origin',
})
# 发送请求(根据 Step 3 返回的 method
resp = self.http_client.session.get(
send_url,
headers=headers,
timeout=30
)
if resp.status_code != 200:
if DEBUG:
print(f"❌ [4.0] Failed to trigger email send ({resp.status_code})")
raise Exception(f"Failed to trigger email send: {resp.status_code} {resp.text}")
elif DEBUG:
print(f"✅ [4.0] Email send triggered")
# 4.1 检查是否有 tempmail_client
if not self.tempmail_client:
raise Exception("No TempMailClient configured. Cannot receive verification emails.")
# 4.2 等待验证邮件
if DEBUG:
print(f"⏳ [4.1] Waiting for verification email...")
email_data = self.tempmail_client.wait_for_email(
mailbox=mailbox,
from_filter=None, # 不过滤发件人(因为临时邮箱可能不返回 from 字段)
subject_filter="chatgpt", # 主题包含 "chatgpt"(匹配 "Your ChatGPT code is..."
timeout=120,
interval=5
)
if not email_data:
if DEBUG:
print(f"❌ [4.1] Timeout: No verification email received")
raise Exception("Timeout: No verification email received")
# 4.3 提取验证码
verification_code = self.tempmail_client.extract_verification_code(email_data)
if not verification_code:
# 尝试提取验证链接(备选方案)
verification_link = self.tempmail_client.extract_verification_link(email_data)
if verification_link:
if DEBUG:
print(f"❌ [4.3] Link-based verification not implemented")
raise NotImplementedError("Link-based verification not implemented yet")
else:
if DEBUG:
print(f"❌ [4.3] Failed to extract verification code")
raise Exception("Failed to extract verification code or link from email")
if DEBUG:
print(f"✅ [4.3] Got verification code: {verification_code}")
# 4.4 提交验证码
# 使用正确的验证接口
verify_url = "https://auth.openai.com/api/accounts/email-otp/validate"
# 准备 payload
payload = {
"code": verification_code,
}
# 准备 headers
headers = self.http_client.fingerprint.get_headers(host='auth.openai.com')
headers.update({
'Content-Type': 'application/json',
'Accept': 'application/json',
'Origin': 'https://auth.openai.com',
'Referer': 'https://auth.openai.com/email-verification',
'Sec-Fetch-Dest': 'empty',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Site': 'same-origin',
'Priority': 'u=1, i',
})
# 添加 Datadog tracing headers
import secrets
headers.update({
'X-Datadog-Trace-Id': str(secrets.randbits(63)),
'X-Datadog-Parent-Id': str(secrets.randbits(63)),
'X-Datadog-Sampling-Priority': '1',
'X-Datadog-Origin': 'rum',
'Traceparent': f'00-0000000000000000{secrets.token_hex(8)}-{secrets.token_hex(8)}-01',
'Tracestate': 'dd=s:1;o:rum',
})
# 发送验证请求
resp = self.http_client.session.post(
verify_url,
json=payload,
headers=headers,
timeout=30
)
# 处理响应
try:
data = resp.json()
if resp.status_code == 200:
if DEBUG:
print(f"✅ [4.4] Email verified successfully!")
print(f"📋 [4.4] Response data: {json.dumps(data, indent=2)}")
return {
'success': True,
'verified': True,
'data': data
}
else:
if DEBUG:
print(f"❌ [4.4] Verification failed: {data}")
return {
'success': False,
'error': data
}
except Exception as e:
if DEBUG:
print(f"❌ [4.4] Exception: {e}")
raise
def _step4_5_submit_personal_info(self) -> Dict:
"""Step 4.5: 提交个人信息(姓名、生日)完成账号设置
Returns:
包含 OAuth callback URL 的字典
"""
url = "https://auth.openai.com/api/accounts/create_account"
# 生成随机姓名和生日
from modules.data_generator import NameGenerator
random_name = NameGenerator.generate_full_name()
# 生成随机生日(1980-2000年之间)
year = random.randint(1980, 2000)
month = random.randint(1, 12)
day = random.randint(1, 28) # 保险起见,避免2月29日等边界情况
birthdate = f"{year}-{month:02d}-{day:02d}"
payload = {
"name": random_name,
"birthdate": birthdate
}
# 准备 headers
headers = self.http_client.fingerprint.get_headers(host='auth.openai.com')
headers.update({
'Content-Type': 'application/json',
'Accept': 'application/json',
'Origin': 'https://auth.openai.com',
'Referer': 'https://auth.openai.com/about-you',
'Sec-Fetch-Dest': 'empty',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Site': 'same-origin',
'Priority': 'u=1, i',
})
# 添加 Datadog tracing headers
import secrets
headers.update({
'X-Datadog-Trace-Id': str(secrets.randbits(63)),
'X-Datadog-Parent-Id': str(secrets.randbits(63)),
'X-Datadog-Sampling-Priority': '1',
'X-Datadog-Origin': 'rum',
'Traceparent': f'00-0000000000000000{secrets.token_hex(8)}-{secrets.token_hex(8)}-01',
'Tracestate': 'dd=s:1;o:rum',
})
# 发送请求
resp = self.http_client.session.post(
url,
json=payload,
headers=headers,
timeout=30
)
# 处理响应
try:
data = resp.json()
if resp.status_code == 200:
oauth_url = data.get('continue_url')
if DEBUG:
print(f"✅ [4.5] Personal info submitted")
print(f" Name: {random_name}")
print(f" Birthdate: {birthdate}")
if oauth_url:
return {
'success': True,
'oauth_url': oauth_url,
'method': data.get('method', 'GET')
}
else:
raise Exception(f"No continue_url in response: {data}")
else:
if DEBUG:
print(f"❌ [4.5] Failed to submit personal info: {data}")
raise Exception(f"Failed to submit personal info: {resp.status_code} {data}")
except Exception as e:
if DEBUG:
print(f"❌ [4.5] Exception: {e}")
raise
def _step4_6_complete_oauth_flow(self, oauth_url: str):
"""Step 4.6: 完成 OAuth 回调流程,获取 session-token
这一步会自动跟随重定向链:
1. GET oauth_url → 302 to /api/accounts/consent
2. GET /api/accounts/consent → 302 to chatgpt.com/api/auth/callback/openai
3. GET chatgpt.com/api/auth/callback/openai → 最终生成 session-token
Args:
oauth_url: Step 4.5 返回的 OAuth URL
"""
# 确保是完整 URL
if not oauth_url.startswith('http'):
oauth_url = f"https://auth.openai.com{oauth_url}"
# 准备 headers
headers = self.http_client.fingerprint.get_headers(host='auth.openai.com')
headers.update({
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Referer': 'https://auth.openai.com/about-you',
'Sec-Fetch-Dest': 'document',
'Sec-Fetch-Mode': 'navigate',
'Sec-Fetch-Site': 'same-origin',
'Upgrade-Insecure-Requests': '1',
})
# 发送请求,允许自动跟随重定向
# requests.Session 会自动处理跨域重定向(auth.openai.com → chatgpt.com)
resp = self.http_client.session.get(
oauth_url,
headers=headers,
allow_redirects=True, # 自动跟随所有重定向
timeout=30
)
# 检查是否获取到 session-token
session_token = None
# curl_cffi 的 cookies 是字典格式requests 的是 CookieJar
cookies = self.http_client.session.cookies
# 尝试字典访问curl_cffi
if hasattr(cookies, 'get'):
session_token = cookies.get('__Secure-next-auth.session-token')
if session_token:
if DEBUG:
print(f"✅ [4.6] OAuth flow completed")
print(f" Got session-token: {session_token[:50]}...")
return True
else:
if DEBUG:
print(f"❌ [4.6] OAuth flow completed but no session-token found")
print(f" Final URL: {resp.url}")
print(f" Status: {resp.status_code}")
raise Exception("OAuth flow completed but no session-token cookie was set")
def _step5_solve_challenge(self, challenge: Dict) -> str:
"""解决 enforcement 挑战"""
sentinel_token = self.solver.solve_enforcement(challenge)
return sentinel_token
def _step6_register_with_token(self, email: str, password: str,
sentinel_token: str) -> Dict:
"""带 Sentinel token 重新注册"""
url = f"{AUTH_BASE_URL}/api/accounts/user/register"
payload = {
'username': email,
'password': password,
'client_id': 'sentinel'
}
resp = self.http_client.post(
url,
json_data=payload,
sentinel_token=sentinel_token
)
if resp.status_code == 200:
if DEBUG:
print("✅ [6] Registration successful!")
return {'success': True, 'data': resp.json()}
else:
if DEBUG:
print(f"❌ [6] Registration failed: {resp.status_code}")
return {
'success': False,
'status_code': resp.status_code,
'error': resp.text
}
def register(self, email: str, password: str) -> Dict:
"""完整注册流程"""
try:
# Step 1: 通过 ChatGPT web 初始化(获取正确的 session
self._step1_init_through_chatgpt(email)
# Step 2: 初始化 Sentinel
self._step2_init_sentinel()
self._step2_5_submit_sentinel()
self._step2_6_solve_pow()
self._step2_7_submit_pow()
# Step 3: 尝试注册(提交邮箱和密码)
result = self._step3_attempt_register(email, password)
# 检查是否需要邮箱验证
if result and result.get('success') and result.get('requires_verification'):
# Step 4: 验证邮箱
verify_result = self._step4_verify_email(
mailbox=email,
continue_url=result['continue_url']
)
if verify_result.get('success'):
# Step 4.5: 提交个人信息
personal_info_result = self._step4_5_submit_personal_info()
if personal_info_result.get('success'):
oauth_url = personal_info_result['oauth_url']
# Step 4.6: 完成 OAuth 回调流程,获取 session-token
self._step4_6_complete_oauth_flow(oauth_url)
if DEBUG:
print(f"\n✅ Registration completed successfully!")
return {
'success': True,
'verified': True,
'email': email,
'data': verify_result.get('data')
}
else:
raise Exception(f"Failed to submit personal info: {personal_info_result}")
else:
raise Exception(f"Email verification failed: {verify_result.get('error')}")
# 如果直接成功(无需验证)
elif result and result.get('success'):
if DEBUG:
print(f"\n✅ Registration completed (no verification needed)!")
return result
# 如果注册失败(邮箱已被使用等)
elif result is None:
if DEBUG:
print(f"\n❌ Registration failed: Email already taken or invalid")
return {
'success': False,
'error': 'Email already taken or registration rejected'
}
# 其他情况:触发 enforcement challenge旧逻辑
else:
if DEBUG:
print(f"\n⚠️ Enforcement challenge triggered")
# Step 5: 解决挑战
sentinel_token = self._step5_solve_challenge(result)
# Step 6: 带 token 重新注册
final_result = self._step6_register_with_token(email, password, sentinel_token)
return final_result
except Exception as e:
if DEBUG:
import traceback
print(f"\n❌ Registration failed with exception:")
traceback.print_exc()
return {
'success': False,
'error': str(e)
}
def register_with_auto_email(self, password: str) -> Dict:
"""自动生成临时邮箱并完成注册流程
Args:
password: 要设置的密码
Returns:
注册结果(包含邮箱地址)
"""
# 检查是否配置了 TempMailClient
if not self.tempmail_client:
return {
'success': False,
'error': 'TempMailClient not configured. Please initialize with tempmail_client parameter.'
}
generated_email = None
try:
# Step 0: 生成临时邮箱(domain_index 从配置读取)
domain_index = TEMPMAIL_CONFIG.get('domain_index', 0) # 默认使用第1个域名
generated_email = self.tempmail_client.generate_mailbox(domain_index=domain_index)
if DEBUG:
print(f"\n✅ Generated temp email: {generated_email}")
# 调用正常的注册流程
result = self.register(email=generated_email, password=password)
# 检查注册结果
if result.get('success'):
if DEBUG:
print(f"\n✅ AUTO REGISTRATION SUCCESS")
print(f" Email: {generated_email}")
print(f" Password: {password}")
return {
'success': True,
'email': generated_email,
'password': password,
'verified': result.get('verified', False),
'data': result.get('data')
}
else:
# 注册失败 → 删除邮箱
if DEBUG:
print(f"\n❌ AUTO REGISTRATION FAILED")
print(f" Email: {generated_email}")
print(f" Error: {result.get('error')}")
# 删除失败的邮箱
self.tempmail_client.delete_mailbox(generated_email)
return {
'success': False,
'email': generated_email,
'error': result.get('error'),
'mailbox_deleted': True
}
except Exception as e:
# 发生异常 → 清理邮箱
if DEBUG:
import traceback
print(f"\n❌ AUTO REGISTRATION EXCEPTION")
if generated_email:
print(f" Email: {generated_email}")
print(f" Exception: {e}")
traceback.print_exc()
# 清理邮箱(如果已生成)
if generated_email:
try:
self.tempmail_client.delete_mailbox(generated_email)
except:
pass
return {
'success': False,
'email': generated_email,
'error': str(e),
'mailbox_deleted': True if generated_email else False
}
def add_payment_method(
self,
checkout_session_url: str,
iban: str,
name: str,
email: str,
address_line1: str,
city: str,
postal_code: str,
state: str,
country: str = "US"
) -> Dict:
"""
为账户添加Stripe支付方式SEPA
Args:
checkout_session_url: Stripe checkout session URL
iban: 德国IBAN账号
name: 持卡人姓名
email: 邮箱
address_line1: 街道地址
city: 城市
postal_code: 邮编
state: 州/省
country: 国家代码
Returns:
支付结果字典
"""
try:
if DEBUG:
print(f"\n🔐 [Payment] Starting payment method setup...")
print(f" Session URL: {checkout_session_url[:60]}...")
# 初始化Stripe支付处理器
payment_handler = StripePaymentHandler(
checkout_session_url=checkout_session_url,
http_client=self.http_client
)
# 执行完整支付流程
success = payment_handler.complete_payment(
iban=iban,
name=name,
email=email,
address_line1=address_line1,
city=city,
postal_code=postal_code,
state=state,
country=country
)
if success:
if DEBUG:
print(f"\n✅ [Payment] Payment method added successfully!")
return {
'success': True,
'message': 'Payment method added'
}
else:
if DEBUG:
print(f"\n❌ [Payment] Failed to add payment method")
return {
'success': False,
'error': 'Payment setup failed'
}
except Exception as e:
if DEBUG:
import traceback
print(f"\n❌ [Payment] Exception occurred:")
traceback.print_exc()
return {
'success': False,
'error': str(e)
}