Files
autoPlus/core/flow.py
2026-01-29 19:16:01 +08:00

364 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""OpenAI 账号注册流程编排模块"""
from typing import Dict, Any, Optional
import secrets
import random
import json
import uuid
from core.session import OAISession, CloudflareBlockError, SessionInvalidError
from core.sentinel_handler import SentinelHandler
from core.challenge import CloudflareSolver
from utils.mail_box import MailHandler
from utils.crypto import generate_random_password
from utils.logger import logger
class RegisterFlow:
"""OpenAI 账号注册流程编排器"""
CHATGPT_HOME = "https://chatgpt.com/"
CHATGPT_PROVIDERS = "https://chatgpt.com/api/auth/providers"
CHATGPT_CSRF = "https://chatgpt.com/api/auth/csrf"
CHATGPT_SIGNIN = "https://chatgpt.com/api/auth/signin/openai"
AUTH_CREATE_ACCOUNT = "https://auth.openai.com/create-account/password"
AUTH_REGISTER = "https://auth.openai.com/api/accounts/user/register"
AUTH_SEND_OTP = "https://auth.openai.com/api/accounts/email-otp/send"
AUTH_VALIDATE_OTP = "https://auth.openai.com/api/accounts/email-otp/validate"
AUTH_COMPLETE_PROFILE = "https://auth.openai.com/api/accounts/create_account"
AUTH_CONSENT = "https://auth.openai.com/api/accounts/consent"
CHATGPT_SESSION = "https://chatgpt.com/api/auth/session"
def __init__(self, session: OAISession, config, email: Optional[str] = None, password: Optional[str] = None):
self.s = session
self.config = config
self.email = email or self._generate_email()
self.password = password or generate_random_password()
self.sentinel = SentinelHandler(session)
self.mail = MailHandler.create(config.mail.to_dict() if config.mail.enabled else None)
self.cloudflare_solver = CloudflareSolver()
self.csrf_token: Optional[str] = None
self.sentinel_token: Optional[Dict[str, Any]] = None
self.otp: Optional[str] = None
self.oauth_callback_url: Optional[str] = None
logger.info(f"RegisterFlow initialized for {self.email} (oai-did: {self.s.oai_did})")
async def run(self) -> Dict[str, Any]:
"""执行完整注册流程"""
try:
logger.info(f"[{self.email}] Starting registration flow")
# Step 0: CloudMail 邮箱账户
if self.config.mail.enabled and self.config.mail.type == "cloudmail":
try:
from utils.mail_box import CloudMailHandler
if isinstance(self.mail, CloudMailHandler):
logger.info(f"[{self.email}] Step 0: Ensuring email account exists")
await self.mail.ensure_email_exists(self.email)
logger.info(f"[{self.email}] ✓ Email account ready")
except Exception as e:
logger.warning(f"[{self.email}] Failed to create CloudMail account: {e}")
await self._step1_init_session()
await self._step2_get_csrf_token()
await self._step3_oauth_signin()
await self._step4_get_sentinel_token()
await self._step5_submit_registration()
await self._step6_send_email_otp()
await self._step7_submit_otp()
await self._step8_complete_profile()
logger.success(f"[{self.email}] Registration completed successfully! ✅")
return {
"email": self.email,
"password": self.password,
"oai_did": self.s.oai_did,
"status": "success",
"message": "Account registered successfully"
}
except CloudflareBlockError as e:
logger.error(f"[{self.email}] Cloudflare blocked: {e}")
return {"email": self.email, "password": self.password, "status": "cloudflare_blocked", "error": str(e)}
except SessionInvalidError as e:
logger.error(f"[{self.email}] Session invalid: {e}")
return {"email": self.email, "password": self.password, "status": "session_invalid", "error": str(e)}
except NotImplementedError as e:
logger.warning(f"[{self.email}] Feature not implemented: {e}")
return {"email": self.email, "password": self.password, "status": "pending_manual", "error": str(e)}
except Exception as e:
logger.exception(f"[{self.email}] Unexpected error")
return {"email": self.email, "password": self.password, "status": "failed", "error": str(e)}
async def _step1_init_session(self):
"""Step 1: 初始化会话"""
logger.info(f"[{self.email}] Step 1: Initializing session")
self.s.get(self.CHATGPT_HOME)
self.s.get(self.CHATGPT_PROVIDERS)
logger.info(f"[{self.email}] ✓ Session initialized")
async def _step2_get_csrf_token(self):
"""Step 2: 获取 CSRF Token"""
logger.info(f"[{self.email}] Step 2: Getting CSRF token")
resp = self.s.get(self.CHATGPT_CSRF)
if resp.status_code != 200:
raise RuntimeError(f"Failed to get CSRF token: {resp.status_code}")
self.csrf_token = resp.json().get("csrfToken")
if not self.csrf_token:
raise RuntimeError("CSRF token not found")
logger.info(f"[{self.email}] ✓ CSRF token obtained")
async def _step3_oauth_signin(self):
"""Step 3: OAuth 登录流程"""
logger.info(f"[{self.email}] Step 3: Starting OAuth flow")
signin_params = {
'prompt': 'login',
'ext-oai-did': self.s.oai_did,
'auth_session_logging_id': str(uuid.uuid4()),
'screen_hint': 'signup',
'login_hint': self.email,
}
payload = {"callbackUrl": "/", "csrfToken": self.csrf_token, "json": "true"}
resp = self.s.post(
self.CHATGPT_SIGNIN,
params=signin_params,
data=payload,
headers={"Content-Type": "application/x-www-form-urlencoded"}
)
if resp.status_code != 200:
raise RuntimeError(f"OAuth signin failed: {resp.status_code}")
auth_url = resp.json().get("url")
if not auth_url:
raise RuntimeError("OAuth URL not found")
self.s.get(auth_url, allow_redirects=True)
self.s.get(self.AUTH_CREATE_ACCOUNT)
logger.info(f"[{self.email}] ✓ OAuth flow completed")
async def _step4_get_sentinel_token(self):
"""Step 4: Sentinel 握手"""
logger.info(f"[{self.email}] Step 4: Getting Sentinel token")
try:
self.sentinel_token = await self.sentinel.get_token(flow="username_password_create")
logger.info(f"[{self.email}] ✓ Sentinel token obtained")
except (NotImplementedError, ImportError) as e:
logger.error(f"[{self.email}] Sentinel solver not available: {e}")
raise
async def _step5_submit_registration(self):
"""Step 5: 提交注册信息"""
logger.info(f"[{self.email}] Step 5: Submitting registration")
payload = {"username": self.email, "password": self.password}
headers = {
"Content-Type": "application/json",
"Accept": "application/json",
"Origin": "https://auth.openai.com",
"Referer": self.AUTH_CREATE_ACCOUNT,
"Openai-Sentinel-Token": json.dumps(self.sentinel_token),
"X-Datadog-Trace-Id": str(secrets.randbits(63)),
"X-Datadog-Parent-Id": str(secrets.randbits(63)),
"X-Datadog-Sampling-Priority": "1",
"X-Datadog-Origin": "rum",
"Traceparent": f"00-0000000000000000{secrets.token_hex(8)}-{secrets.token_hex(8)}-01",
"Tracestate": "dd=s:1;o:rum",
}
resp = self.s.post(self.AUTH_REGISTER, json=payload, headers=headers)
if resp.status_code != 200:
if "email already exists" in resp.text.lower():
raise ValueError(f"Email already registered: {self.email}")
raise RuntimeError(f"Registration failed: {resp.status_code}")
logger.info(f"[{self.email}] ✓ Registration submitted")
async def _step6_send_email_otp(self):
"""Step 6: 触发邮件验证"""
logger.info(f"[{self.email}] Step 6: Sending email OTP")
resp = self.s.post(
self.AUTH_SEND_OTP,
json={},
headers={"Content-Type": "application/json", "Referer": self.AUTH_CREATE_ACCOUNT}
)
if resp.status_code == 403 and CloudflareSolver.detect_challenge(resp):
raise CloudflareBlockError("Cloudflare challenge triggered")
if resp.status_code != 200:
raise RuntimeError(f"OTP send failed: {resp.status_code}")
logger.info(f"[{self.email}] ✓ OTP email sent")
async def _step7_submit_otp(self):
"""Step 7: 提交 OTP 验证码"""
logger.info(f"[{self.email}] Step 7: Waiting for OTP")
try:
self.otp = await self.mail.wait_for_otp(email=self.email, timeout=300)
logger.info(f"[{self.email}] ✓ OTP received: {self.otp}")
except NotImplementedError:
logger.warning(f"[{self.email}] Mail handler not configured")
raise
except TimeoutError:
raise TimeoutError(f"Timeout waiting for OTP: {self.email}")
resp = self.s.post(
self.AUTH_VALIDATE_OTP,
json={"code": self.otp},
headers={"Content-Type": "application/json", "Referer": self.AUTH_CREATE_ACCOUNT}
)
if resp.status_code != 200:
raise RuntimeError(f"OTP validation failed: {resp.status_code}")
logger.info(f"[{self.email}] ✓ OTP validated")
async def _step8_complete_profile(self):
"""Step 8: 完成用户信息"""
logger.info(f"[{self.email}] Step 8: Completing profile")
name = self._generate_name()
birthdate = self._generate_birthdate()
resp = self.s.post(
self.AUTH_COMPLETE_PROFILE,
json={"name": name, "birthdate": birthdate},
headers={"Content-Type": "application/json", "Referer": self.AUTH_CREATE_ACCOUNT}
)
if resp.status_code != 200:
raise RuntimeError(f"Profile completion failed: {resp.status_code}")
# 获取 OAuth 回调 URL
data = resp.json()
self.oauth_callback_url = data.get("continue_url")
logger.debug(f"[{self.email}] OAuth callback URL: {self.oauth_callback_url}")
logger.info(f"[{self.email}] ✓ Profile completed")
async def _step9_complete_oauth_callback(self):
"""Step 9: 完成 OAuth 回调,获取 session-token"""
logger.info(f"[{self.email}] Step 9: Completing OAuth callback")
# 使用 step 8 返回的 continue_url跟随重定向链到 chatgpt.com
if self.oauth_callback_url:
# continue_url 可能是相对路径或完整 URL
if self.oauth_callback_url.startswith("http"):
callback_url = self.oauth_callback_url
else:
callback_url = f"https://auth.openai.com{self.oauth_callback_url}"
else:
# 如果没有 continue_url尝试访问 consent 端点
callback_url = self.AUTH_CONSENT
logger.debug(f"[{self.email}] Following OAuth callback: {callback_url}")
# 循环跟随重定向,直到到达 chatgpt.com 或获取到 session-token
max_redirects = 10
for i in range(max_redirects):
resp = self.s.get(callback_url, allow_redirects=True)
# 检查是否已到达 chatgpt.com
if 'chatgpt.com' in str(resp.url):
logger.debug(f"[{self.email}] Reached chatgpt.com")
break
# 检查响应是否包含重定向 URLJSON 格式)
try:
data = resp.json()
redirect_url = data.get("redirect_url") or data.get("location") or data.get("url")
if redirect_url:
logger.debug(f"[{self.email}] Following JSON redirect ({i+1}): {redirect_url[:100]}...")
callback_url = redirect_url
continue
except (json.JSONDecodeError, ValueError):
pass
# 没有更多重定向,退出循环
break
# 检查是否获取到 session-token cookie
session_token = self.s.get_cookie('__Secure-next-auth.session-token')
if not session_token:
# 尝试直接访问 chatgpt.com 首页触发 cookie 设置
logger.debug(f"[{self.email}] Session token not found, trying chatgpt.com homepage")
self.s.get(self.CHATGPT_HOME, allow_redirects=True)
session_token = self.s.get_cookie('__Secure-next-auth.session-token')
if not session_token:
# 打印所有 cookies 用于调试,但不抛出错误
# 尝试继续获取 access token有时候 session token 不是必需的
all_cookies = self.s.get_cookies()
logger.warning(f"[{self.email}] Session token not found, available cookies: {list(all_cookies.keys())}")
logger.info(f"[{self.email}] Continuing without session-token, will try to get access token directly")
else:
logger.info(f"[{self.email}] ✓ OAuth callback completed, session-token obtained")
async def _step10_get_access_token(self) -> str:
"""Step 10: 获取 AccessToken"""
logger.info(f"[{self.email}] Step 10: Getting access token")
resp = self.s.get(
self.CHATGPT_SESSION,
headers={
'Accept': 'application/json',
'Referer': 'https://chatgpt.com/',
}
)
if resp.status_code != 200:
raise RuntimeError(f"Failed to get access token: {resp.status_code}")
data = resp.json()
access_token = data.get('accessToken')
if not access_token:
raise RuntimeError("AccessToken not found in response")
logger.info(f"[{self.email}] ✓ AccessToken obtained successfully")
return access_token
def _generate_email(self) -> str:
"""生成随机邮箱"""
random_part = secrets.token_hex(8)
if self.config.mail.enabled and self.config.mail.type == "cloudmail":
domain = self.config.mail.cloudmail_domain
if domain:
return f"user_{random_part}@{domain}"
logger.warning("Using example.com domain. Configure MAIL_CLOUDMAIL_DOMAIN.")
return f"user_{random_part}@example.com"
def _generate_name(self) -> str:
"""生成随机姓名"""
first_names = ["James", "John", "Robert", "Michael", "William", "David", "Mary", "Patricia", "Jennifer", "Linda"]
last_names = ["Smith", "Johnson", "Williams", "Brown", "Jones", "Garcia", "Miller", "Davis", "Wilson", "Anderson"]
return f"{random.choice(first_names)} {random.choice(last_names)}"
def _generate_birthdate(self) -> str:
"""生成随机生日 (1980-2002)"""
year = random.randint(1980, 2002)
month = random.randint(1, 12)
day = random.randint(1, 28) if month == 2 else random.randint(1, 30) if month in [4, 6, 9, 11] else random.randint(1, 31)
return f"{year}-{month:02d}-{day:02d}"
__all__ = ["RegisterFlow"]