# NOTE: web-viewer header captured together with the file; not part of the module.
# Files
# autoPlus/core/flow.py
# 2026-01-26 15:04:02 +08:00
#
# 590 lines
# 20 KiB
# Python
# Raw Blame History
#
# This file contains ambiguous Unicode characters
# This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
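"""
OpenAI account registration flow orchestration module.

Implements the complete registration flow, consisting of these steps:
1. Initialize the session (visit the home page + API providers)
2. Obtain the CSRF token
3. OAuth flow (redirect to auth.openai.com)
4. Sentinel handshake (obtain a token)
5. Submit the registration info (email + password)
6. Trigger email verification (may hit a Cloudflare 403)
7. Submit the OTP code
8. Complete the user profile (name + birthday)

Reference document: /home/carry/myprj/gptAutoPlus/docs/开发文档.md
"""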
from typing import Dict, Any, Optional
import secrets
import random
import asyncio
import json
from core.session import (
OAISession,
CloudflareBlockError,
SessionInvalidError,
RateLimitError
)
from core.sentinel import SentinelHandler
from core.challenge import CloudflareSolver
from utils.mail_box import MailHandler
from utils.crypto import generate_random_password
from utils.logger import logger
class RegisterFlow:
    """
    Orchestrator for the OpenAI account registration flow.

    Coordinates the individual components (session, Sentinel handler, mail
    handler, Cloudflare detector) and executes the registration steps in
    their required order.

    NOTE(review): the step coroutines call ``self.s.get`` / ``self.s.post``
    without ``await``. If ``OAISession`` is synchronous these calls block the
    event loop for their duration -- confirm against ``core.session``.
    """

    # OpenAI-related URLs
    CHATGPT_HOME = "https://chatgpt.com/"
    CHATGPT_PROVIDERS = "https://chatgpt.com/api/auth/providers"
    CHATGPT_CSRF = "https://chatgpt.com/api/auth/csrf"
    CHATGPT_SIGNIN = "https://chatgpt.com/api/auth/signin/openai"
    AUTH_CREATE_ACCOUNT = "https://auth.openai.com/create-account/password"
    AUTH_REGISTER = "https://auth.openai.com/api/accounts/user/register"
    AUTH_SEND_OTP = "https://auth.openai.com/api/accounts/email-otp/send"
    AUTH_VALIDATE_OTP = "https://auth.openai.com/api/accounts/email-otp/validate"
    AUTH_COMPLETE_PROFILE = "https://auth.openai.com/api/accounts/create_account"

    # Name pools sampled by _generate_name(). Kept at class level so they are
    # built once instead of on every call.
    _FIRST_NAMES = (
        "James", "John", "Robert", "Michael", "William",
        "David", "Richard", "Joseph", "Thomas", "Charles",
        "Mary", "Patricia", "Jennifer", "Linda", "Elizabeth",
        "Barbara", "Susan", "Jessica", "Sarah", "Karen",
    )
    _LAST_NAMES = (
        "Smith", "Johnson", "Williams", "Brown", "Jones",
        "Garcia", "Miller", "Davis", "Rodriguez", "Martinez",
        "Hernandez", "Lopez", "Gonzalez", "Wilson", "Anderson",
        "Thomas", "Taylor", "Moore", "Jackson", "Martin",
    )

    def __init__(
        self,
        session: "OAISession",
        config,
        email: Optional[str] = None,
        password: Optional[str] = None
    ):
        """
        Initialize the registration flow.

        Args:
            session: OAISession instance (per the original author's note,
                already configured with TLS fingerprint and proxy).
            config: AppConfig configuration object; ``config.mail`` is read
                here and by ``_generate_email``.
            email: Registration email (optional; generated when omitted).
            password: Password (optional; generated when omitted).
        """
        self.s = session
        # config must be stored before _generate_email() runs, which reads it.
        self.config = config
        self.email = email or self._generate_email()
        self.password = password or generate_random_password()

        # Sub-components
        self.sentinel = SentinelHandler(session)
        self.mail = MailHandler.create(
            config.mail.to_dict() if config.mail.enabled else None
        )
        self.cloudflare_solver = CloudflareSolver()

        # Flow state, filled in as the steps run
        self.csrf_token: Optional[str] = None
        self.sentinel_token: Optional[Dict[str, Any]] = None
        self.otp: Optional[str] = None

        logger.info(
            f"RegisterFlow initialized for {self.email} "
            f"(oai-did: {self.s.oai_did})"
        )

    async def run(self) -> Dict[str, Any]:
        """
        Execute the complete registration flow.

        Never raises: every exception is converted into a result dict.

        Returns:
            Result dictionary containing:
            - email: registration email
            - password: password
            - status: "success", "cloudflare_blocked", "session_invalid",
              "rate_limited", "pending_manual" or "failed"
            - error: error text (failure results only)
            - message: additional information
            - oai_did: session device id (success result only)
        """
        try:
            logger.info(f"[{self.email}] Starting registration flow")

            # Step 0: when CloudMail is used, make sure the mailbox exists
            await self._step0_ensure_mailbox()
            # Step 1: initialize the session
            await self._step1_init_session()
            # Step 2: obtain the CSRF token
            await self._step2_get_csrf_token()
            # Step 3: OAuth flow
            await self._step3_oauth_signin()
            # Step 4: Sentinel handshake
            await self._step4_get_sentinel_token()
            # Step 5: submit the registration info
            await self._step5_submit_registration()
            # Step 6: trigger the verification email
            await self._step6_send_email_otp()
            # Step 7: submit the OTP
            await self._step7_submit_otp()
            # Step 8: complete the user profile
            await self._step8_complete_profile()

            # Registration succeeded
            logger.success(f"[{self.email}] Registration completed successfully! ✅")
            return {
                "email": self.email,
                "password": self.password,
                "oai_did": self.s.oai_did,
                "status": "success",
                "message": "Account registered successfully"
            }
        except CloudflareBlockError as e:
            logger.error(f"[{self.email}] Cloudflare blocked: {e}")
            return self._failure_result(
                "cloudflare_blocked",
                e,
                "Encountered Cloudflare challenge. Consider using residential proxy or solver."
            )
        except SessionInvalidError as e:
            logger.error(f"[{self.email}] Session invalid (409): {e}")
            return self._failure_result(
                "session_invalid",
                e,
                "Session conflict. CSRF token expired. Retry recommended."
            )
        except RateLimitError as e:
            logger.error(f"[{self.email}] Rate limited (429): {e}")
            return self._failure_result(
                "rate_limited",
                e,
                "Rate limit exceeded. Wait and retry with different IP."
            )
        except NotImplementedError as e:
            logger.warning(f"[{self.email}] Feature not implemented: {e}")
            return self._failure_result(
                "pending_manual",
                e,
                "Partial success. User needs to complete manual steps (Sentinel or OTP)."
            )
        except Exception as e:
            # Top-level boundary: log with traceback and report as "failed".
            logger.exception(f"[{self.email}] Unexpected error during registration")
            return self._failure_result(
                "failed",
                e,
                f"Registration failed: {type(e).__name__}"
            )

    def _failure_result(
        self, status: str, error: Exception, message: str
    ) -> Dict[str, Any]:
        """Build the result dict that run() returns for a failed attempt."""
        return {
            "email": self.email,
            "password": self.password,
            "status": status,
            "error": str(error),
            "message": message
        }

    async def _step0_ensure_mailbox(self):
        """
        Step 0: when CloudMail is the configured mail backend, ensure the
        mailbox for ``self.email`` exists.

        Best effort: a failure is logged as a warning and the flow continues,
        because the mailbox may already exist.
        """
        if not (self.config.mail.enabled and self.config.mail.type == "cloudmail"):
            return
        try:
            from utils.mail_box import CloudMailHandler
            if isinstance(self.mail, CloudMailHandler):
                logger.info(f"[{self.email}] Step 0: Ensuring email account exists in CloudMail")
                await self.mail.ensure_email_exists(self.email)
                logger.info(f"[{self.email}] ✓ Email account ready")
        except Exception as e:
            # Deliberately not re-raised: keep going, the mailbox may already exist.
            logger.warning(f"[{self.email}] Failed to create CloudMail account: {e}")

    async def _step1_init_session(self):
        """
        Step 1: initialize the session.

        Requests the ChatGPT home page and the auth providers endpoint to
        establish the base session. Response status codes are only logged,
        not checked.
        """
        logger.info(f"[{self.email}] Step 1: Initializing session")

        # Visit the home page
        resp = self.s.get(self.CHATGPT_HOME)
        logger.debug(f" - GET {self.CHATGPT_HOME}: {resp.status_code}")

        # Fetch the auth providers
        resp = self.s.get(self.CHATGPT_PROVIDERS)
        logger.debug(f" - GET {self.CHATGPT_PROVIDERS}: {resp.status_code}")

        logger.info(f"[{self.email}] ✓ Session initialized")

    async def _step2_get_csrf_token(self):
        """
        Step 2: obtain the CSRF token.

        The token is stored in ``self.csrf_token`` and used by the OAuth
        sign-in request in step 3.

        Raises:
            RuntimeError: on a non-200 response or when the response JSON has
                no ``csrfToken`` value.
        """
        logger.info(f"[{self.email}] Step 2: Getting CSRF token")

        resp = self.s.get(self.CHATGPT_CSRF)
        if resp.status_code != 200:
            raise RuntimeError(f"Failed to get CSRF token: {resp.status_code}")

        data = resp.json()
        self.csrf_token = data.get("csrfToken")
        if not self.csrf_token:
            raise RuntimeError(f"CSRF token not found in response: {data}")

        logger.info(f"[{self.email}] ✓ CSRF token obtained: {self.csrf_token[:20]}...")

    async def _step3_oauth_signin(self):
        """
        Step 3: OAuth sign-in flow.

        Starts the OAuth flow, follows the returned URL to auth.openai.com
        and finally requests the create-account page, so that the session
        picks up the cookies set along the way.

        Raises:
            RuntimeError: when the sign-in request does not return 200 or the
                response JSON has no ``url`` value.
        """
        logger.info(f"[{self.email}] Step 3: Starting OAuth flow")

        # Generate the auth_session_logging_id
        import uuid
        auth_session_logging_id = str(uuid.uuid4())

        # Query parameters for the OAuth sign-in request
        signin_params = {
            'prompt': 'login',
            'ext-oai-did': self.s.oai_did,
            'auth_session_logging_id': auth_session_logging_id,
            'screen_hint': 'signup',  # explicitly request the sign-up screen
            'login_hint': self.email,  # pass the email along
        }
        payload = {
            "callbackUrl": "/",
            "csrfToken": self.csrf_token,
            "json": "true"
        }
        resp = self.s.post(
            self.CHATGPT_SIGNIN,
            params=signin_params,  # sent as URL parameters
            data=payload,
            headers={"Content-Type": "application/x-www-form-urlencoded"}
        )
        if resp.status_code != 200:
            raise RuntimeError(f"OAuth signin failed: {resp.status_code} - {resp.text[:200]}")

        data = resp.json()
        auth_url = data.get("url")
        if not auth_url:
            raise RuntimeError(f"OAuth URL not found in response: {data}")
        logger.debug(f" - OAuth URL: {auth_url[:100]}...")

        # Follow the OAuth redirect link (establishes the auth.openai.com session).
        # Per the original author, this sets the key cookies: login_session,
        # oai-client-auth-session, auth_provider, etc.
        resp = self.s.get(auth_url, allow_redirects=True)
        logger.debug(f" - GET OAuth URL: {resp.status_code}")
        logger.debug(f" - Final URL after redirects: {resp.url}")

        # Report (debug only) whether the key cookies are present
        important_cookies = ["login_session", "oai-client-auth-session"]
        cookies_status = {
            cookie: cookie in self.s.client.cookies
            for cookie in important_cookies
        }
        logger.debug(f" - Cookies status: {cookies_status}")

        # Visit the registration page
        resp = self.s.get(self.AUTH_CREATE_ACCOUNT)
        logger.debug(f" - GET create-account page: {resp.status_code}")

        logger.info(f"[{self.email}] ✓ OAuth flow completed, redirected to auth.openai.com")

    async def _step4_get_sentinel_token(self):
        """
        Step 4: Sentinel handshake.

        Obtains the Sentinel token used when submitting the registration
        info and stores it in ``self.sentinel_token``.

        Raises:
            NotImplementedError, ImportError: re-raised (after logging) when
                the Sentinel handler reports that no solver is available.
        """
        logger.info(f"[{self.email}] Step 4: Getting Sentinel token")
        try:
            self.sentinel_token = await self.sentinel.get_token(
                flow="username_password_create"
            )
            logger.info(f"[{self.email}] ✓ Sentinel token obtained: {str(self.sentinel_token)[:50]}...")
        except (NotImplementedError, ImportError) as e:
            logger.error(
                f"[{self.email}] ❌ Sentinel solver not available: {e}"
            )
            # Re-raise so the caller knows this needs fixing
            raise

    async def _step5_submit_registration(self):
        """
        Step 5: submit the registration info.

        POST /api/accounts/user/register with the username (email) and
        password in the body; the Sentinel token is sent in a header.

        Raises:
            ValueError: when the error response text mentions an existing
                email or an invalid token.
            RuntimeError: for any other non-200 response.
        """
        logger.info(f"[{self.email}] Step 5: Submitting registration")

        # Request body: username and password
        payload = {
            "username": self.email,  # the field is "username", not "email"
            "password": self.password
        }

        # The Sentinel token is passed as a header, serialized as a JSON string
        headers = {
            "Content-Type": "application/json",
            "Accept": "application/json",
            "Origin": "https://auth.openai.com",
            "Referer": self.AUTH_CREATE_ACCOUNT,
            "Openai-Sentinel-Token": json.dumps(self.sentinel_token),  # mind the header-name casing
            "Sec-Fetch-Dest": "empty",
            "Sec-Fetch-Mode": "cors",
            "Sec-Fetch-Site": "same-origin",
            "Priority": "u=1, i",
        }

        # Datadog tracing headers (original note: mimics a real browser)
        headers.update({
            "X-Datadog-Trace-Id": str(secrets.randbits(63)),
            "X-Datadog-Parent-Id": str(secrets.randbits(63)),
            "X-Datadog-Sampling-Priority": "1",
            "X-Datadog-Origin": "rum",
            "Traceparent": f"00-0000000000000000{secrets.token_hex(8)}-{secrets.token_hex(8)}-01",
            "Tracestate": "dd=s:1;o:rum",
        })

        resp = self.s.post(
            self.AUTH_REGISTER,
            json=payload,
            headers=headers
        )
        if resp.status_code != 200:
            error_msg = resp.text[:300]
            logger.error(f" - Registration failed: {resp.status_code} - {error_msg}")

            # Check for common errors
            body_lower = resp.text.lower()
            if "email already exists" in body_lower:
                raise ValueError(f"Email already registered: {self.email}")
            if "invalid token" in body_lower:
                raise ValueError("Invalid Sentinel token. Check your Sentinel solver.")
            raise RuntimeError(f"Registration submission failed: {resp.status_code} - {error_msg}")

        logger.info(f"[{self.email}] ✓ Registration info submitted successfully")

    async def _step6_send_email_otp(self):
        """
        Step 6: trigger the verification email.

        POST /api/accounts/email-otp/send, asking the service to send an OTP
        code to the registration email. The original author notes this is the
        step most likely to hit a Cloudflare 403.

        Raises:
            CloudflareBlockError: on a 403 that CloudflareSolver identifies
                as a challenge.
            RuntimeError: for any other non-200 response.
        """
        logger.info(f"[{self.email}] Step 6: Sending email OTP")

        resp = self.s.post(
            self.AUTH_SEND_OTP,
            json={},
            headers={
                "Content-Type": "application/json",
                "Referer": self.AUTH_CREATE_ACCOUNT
            }
        )

        # Check for a Cloudflare block; a 403 that is not a challenge falls
        # through to the generic non-200 handling below.
        if resp.status_code == 403 and CloudflareSolver.detect_challenge(resp):
            logger.error(f"[{self.email}] ⚠️ Cloudflare challenge detected at OTP send")
            raise CloudflareBlockError(
                "Cloudflare Turnstile challenge triggered when sending OTP. "
                "Recommendations: use residential proxy or integrate captcha solver."
            )

        if resp.status_code != 200:
            raise RuntimeError(
                f"Email OTP send failed: {resp.status_code} - {resp.text[:200]}"
            )

        logger.info(f"[{self.email}] ✓ OTP email sent successfully")

    async def _step7_submit_otp(self):
        """
        Step 7: submit the OTP code.

        Waits for the OTP email (up to 300 seconds), then posts the code for
        validation. Requires a configured mail handler.

        Raises:
            NotImplementedError: re-raised when the mail handler cannot
                retrieve OTPs (the caller must supply one manually).
            TimeoutError: when no OTP arrives in time; chained to the
                original timeout exception.
            ValueError: when the validation response text reports an invalid
                code.
            RuntimeError: for any other non-200 validation response.
        """
        logger.info(f"[{self.email}] Step 7: Waiting for OTP")
        try:
            # Wait for the OTP (at most 5 minutes)
            self.otp = await self.mail.wait_for_otp(
                email=self.email,
                timeout=300
            )
        except NotImplementedError:
            logger.warning(
                f"[{self.email}] ⚠️ Mail handler not configured, cannot retrieve OTP"
            )
            # Re-raise so the caller knows the OTP must be entered manually
            raise
        except (TimeoutError, asyncio.TimeoutError) as e:
            # asyncio.TimeoutError is only an alias of the builtin from
            # Python 3.11 on; catch both so older interpreters behave the same.
            logger.error(f"[{self.email}] ⏱ Timeout waiting for OTP")
            raise TimeoutError(
                f"Timeout waiting for OTP email (5 minutes). "
                f"Please check email: {self.email}"
            ) from e
        logger.info(f"[{self.email}] ✓ OTP received: {self.otp}")

        # Submit the OTP
        payload = {"code": self.otp}
        resp = self.s.post(
            self.AUTH_VALIDATE_OTP,
            json=payload,
            headers={
                "Content-Type": "application/json",
                "Referer": self.AUTH_CREATE_ACCOUNT
            }
        )
        if resp.status_code != 200:
            error_msg = resp.text[:200]
            logger.error(f" - OTP validation failed: {resp.status_code} - {error_msg}")
            if "invalid code" in resp.text.lower():
                raise ValueError(f"Invalid OTP code: {self.otp}")
            raise RuntimeError(f"OTP validation failed: {resp.status_code} - {error_msg}")

        logger.info(f"[{self.email}] ✓ OTP validated successfully")

    async def _step8_complete_profile(self):
        """
        Step 8: complete the user profile.

        POST /api/accounts/create_account with a generated name and
        birthdate, finishing the registration.

        Raises:
            RuntimeError: on a non-200 response.
        """
        logger.info(f"[{self.email}] Step 8: Completing profile")

        name = self._generate_name()
        birthdate = self._generate_birthdate()
        payload = {
            "name": name,
            "birthdate": birthdate
        }
        resp = self.s.post(
            self.AUTH_COMPLETE_PROFILE,
            json=payload,
            headers={
                "Content-Type": "application/json",
                "Referer": self.AUTH_CREATE_ACCOUNT
            }
        )
        if resp.status_code != 200:
            raise RuntimeError(
                f"Profile completion failed: {resp.status_code} - {resp.text[:200]}"
            )

        logger.info(
            f"[{self.email}] ✓ Profile completed: name={name}, birthdate={birthdate}"
        )

    # ========== Helper methods ==========

    def _generate_email(self) -> str:
        """
        Generate a random email address.

        Uses the configured CloudMail domain when CloudMail is enabled and a
        domain is set; otherwise falls back to example.com (which the user is
        expected to replace) and logs a warning.

        Returns:
            Address of the form ``user_<16 hex chars>@<domain>``.
        """
        random_part = secrets.token_hex(8)

        # With CloudMail enabled and a domain configured, use the real domain
        if self.config.mail.enabled and self.config.mail.type == "cloudmail":
            domain = self.config.mail.cloudmail_domain
            if domain:
                return f"user_{random_part}@{domain}"

        # Default to example.com; the user should substitute a real domain
        logger.warning(
            f"Using example.com domain. Please configure MAIL_CLOUDMAIL_DOMAIN "
            f"or replace with your actual domain."
        )
        return f"user_{random_part}@example.com"

    def _generate_name(self) -> str:
        """
        Generate a random full name.

        Returns:
            ``"<first> <last>"`` drawn from the class-level name pools.
        """
        first = random.choice(self._FIRST_NAMES)
        last = random.choice(self._LAST_NAMES)
        return f"{first} {last}"

    def _generate_birthdate(self) -> str:
        """
        Generate a random birthdate with a birth year in 1980-2002 (the
        range the original author chose to guarantee an adult, 18+).

        Returns:
            Date string in ``YYYY-MM-DD`` format.
        """
        year = random.randint(1980, 2002)
        month = random.randint(1, 12)

        # Cap the day per month so invalid dates (Feb 29/30/31, Apr 31, ...)
        # are never produced; February is always capped at 28.
        if month == 2:
            last_day = 28
        elif month in (4, 6, 9, 11):
            last_day = 30
        else:
            last_day = 31
        day = random.randint(1, last_day)
        return f"{year}-{month:02d}-{day:02d}"
# Export the module's main public interface
__all__ = ["RegisterFlow"]