Files
autoPlus/core/flow.py
2026-01-29 18:54:04 +08:00

323 lines
13 KiB
Python

"""Orchestration module for the OpenAI account registration flow."""
from typing import Dict, Any, Optional
import secrets
import random
import json
import uuid
from core.session import OAISession, CloudflareBlockError, SessionInvalidError, RateLimitError
from core.sentinel_handler import SentinelHandler
from core.challenge import CloudflareSolver
from utils.mail_box import MailHandler
from utils.crypto import generate_random_password
from utils.logger import logger
class RegisterFlow:
    """Orchestrator for the OpenAI account registration flow.

    The flow is a fixed sequence of HTTP steps executed on a shared session
    object; :meth:`run` drives them in order and converts any failure into a
    result dictionary instead of raising.

    NOTE(review): every step is declared ``async`` but calls ``self.s.get`` /
    ``self.s.post`` without ``await`` -- presumably the session is a blocking
    client; confirm before running many flows on one event loop.
    """

    CHATGPT_HOME = "https://chatgpt.com/"
    CHATGPT_PROVIDERS = "https://chatgpt.com/api/auth/providers"
    CHATGPT_CSRF = "https://chatgpt.com/api/auth/csrf"
    CHATGPT_SIGNIN = "https://chatgpt.com/api/auth/signin/openai"
    AUTH_CREATE_ACCOUNT = "https://auth.openai.com/create-account/password"
    AUTH_REGISTER = "https://auth.openai.com/api/accounts/user/register"
    AUTH_SEND_OTP = "https://auth.openai.com/api/accounts/email-otp/send"
    AUTH_VALIDATE_OTP = "https://auth.openai.com/api/accounts/email-otp/validate"
    AUTH_COMPLETE_PROFILE = "https://auth.openai.com/api/accounts/create_account"
    # Token retrieval endpoint (not referenced by the methods of this class).
    CHATGPT_SESSION = "https://chatgpt.com/api/auth/session"

    def __init__(
        self,
        session: "OAISession",
        config,
        email: Optional[str] = None,
        password: Optional[str] = None,
    ):
        """Bind the flow to a session and prepare its collaborators.

        Args:
            session: Session object used for every HTTP request; its
                ``oai_did`` attribute is read for logging and OAuth params.
            config: Configuration object; only ``config.mail`` is read here.
            email: Address to register. A random one is generated if omitted.
            password: Password to register. A random one is generated if
                omitted.
        """
        self.s = session
        # Must be set before _generate_email(), which reads self.config.
        self.config = config
        self.email = email or self._generate_email()
        self.password = password or generate_random_password()
        self.sentinel = SentinelHandler(session)
        self.mail = MailHandler.create(config.mail.to_dict() if config.mail.enabled else None)
        self.cloudflare_solver = CloudflareSolver()

        # State filled in as the steps progress.
        self.csrf_token: Optional[str] = None
        self.sentinel_token: Optional[Dict[str, Any]] = None
        self.otp: Optional[str] = None
        self.access_token: Optional[str] = None
        logger.info(f"RegisterFlow initialized for {self.email} (oai-did: {self.s.oai_did})")

    async def run(self) -> Dict[str, Any]:
        """Execute the complete registration flow.

        Returns:
            A dictionary that always contains ``email``, ``password`` and
            ``status``. On success it also carries ``oai_did``,
            ``access_token`` and ``message``; on failure it carries ``error``
            and ``status`` is one of ``cloudflare_blocked``,
            ``session_invalid``, ``rate_limited``, ``pending_manual`` or
            ``failed``. Exceptions raised by the steps are never propagated.
        """
        try:
            logger.info(f"[{self.email}] Starting registration flow")
            await self._ensure_cloudmail_account()
            await self._step1_init_session()
            await self._step2_get_csrf_token()
            await self._step3_oauth_signin()
            await self._step4_get_sentinel_token()
            await self._step5_submit_registration()
            await self._step6_send_email_otp()
            await self._step7_submit_otp()
            await self._step8_complete_profile()
            await self._step9_get_access_token()
            logger.success(f"[{self.email}] Registration completed successfully! ✅")
            return {
                "email": self.email,
                "password": self.password,
                "oai_did": self.s.oai_did,
                "access_token": self.access_token,
                "status": "success",
                "message": "Account registered successfully",
            }
        except CloudflareBlockError as e:
            logger.error(f"[{self.email}] Cloudflare blocked: {e}")
            return self._failure_result("cloudflare_blocked", e)
        except SessionInvalidError as e:
            logger.error(f"[{self.email}] Session invalid: {e}")
            return self._failure_result("session_invalid", e)
        except RateLimitError as e:
            logger.error(f"[{self.email}] Rate limited: {e}")
            return self._failure_result("rate_limited", e)
        except NotImplementedError as e:
            logger.warning(f"[{self.email}] Feature not implemented: {e}")
            return self._failure_result("pending_manual", e)
        except Exception as e:
            # Top-level boundary: log the traceback and report a generic failure.
            logger.exception(f"[{self.email}] Unexpected error")
            return self._failure_result("failed", e)

    def _failure_result(self, status: str, exc: BaseException) -> Dict[str, Any]:
        """Build the result dictionary returned by run() for a failed flow."""
        return {
            "email": self.email,
            "password": self.password,
            "status": status,
            "error": str(exc),
        }

    async def _ensure_cloudmail_account(self) -> None:
        """Step 0: make sure the CloudMail mailbox exists (best effort).

        Does nothing unless mail is enabled with type ``"cloudmail"``. Any
        failure is logged as a warning and the flow continues.
        """
        if not (self.config.mail.enabled and self.config.mail.type == "cloudmail"):
            return
        try:
            from utils.mail_box import CloudMailHandler

            if isinstance(self.mail, CloudMailHandler):
                logger.info(f"[{self.email}] Step 0: Ensuring email account exists")
                await self.mail.ensure_email_exists(self.email)
                logger.info(f"[{self.email}] ✓ Email account ready")
        except Exception as e:
            # Deliberately non-fatal: later steps may still succeed.
            logger.warning(f"[{self.email}] Failed to create CloudMail account: {e}")

    async def _step1_init_session(self):
        """Step 1: initialise the session by requesting the home and providers URLs."""
        logger.info(f"[{self.email}] Step 1: Initializing session")
        # Responses are not inspected; the requests are made for their effect
        # on the session only.
        self.s.get(self.CHATGPT_HOME)
        self.s.get(self.CHATGPT_PROVIDERS)
        logger.info(f"[{self.email}] ✓ Session initialized")

    async def _step2_get_csrf_token(self):
        """Step 2: fetch the CSRF token and store it in ``self.csrf_token``.

        Raises:
            RuntimeError: If the response status is not 200 or the payload
                has no ``csrfToken`` value.
        """
        logger.info(f"[{self.email}] Step 2: Getting CSRF token")
        resp = self.s.get(self.CHATGPT_CSRF)
        if resp.status_code != 200:
            raise RuntimeError(f"Failed to get CSRF token: {resp.status_code}")
        self.csrf_token = resp.json().get("csrfToken")
        if not self.csrf_token:
            raise RuntimeError("CSRF token not found")
        logger.info(f"[{self.email}] ✓ CSRF token obtained")

    async def _step3_oauth_signin(self):
        """Step 3: start the OAuth sign-in and follow it to the create-account page.

        Raises:
            RuntimeError: If the sign-in request does not return 200 or the
                response carries no ``url``.
        """
        logger.info(f"[{self.email}] Step 3: Starting OAuth flow")
        signin_params = {
            'prompt': 'login',
            'ext-oai-did': self.s.oai_did,
            'auth_session_logging_id': str(uuid.uuid4()),
            'screen_hint': 'signup',
            'login_hint': self.email,
        }
        payload = {"callbackUrl": "/", "csrfToken": self.csrf_token, "json": "true"}
        resp = self.s.post(
            self.CHATGPT_SIGNIN,
            params=signin_params,
            data=payload,
            headers={"Content-Type": "application/x-www-form-urlencoded"},
        )
        if resp.status_code != 200:
            raise RuntimeError(f"OAuth signin failed: {resp.status_code}")
        auth_url = resp.json().get("url")
        if not auth_url:
            raise RuntimeError("OAuth URL not found")
        self.s.get(auth_url, allow_redirects=True)
        self.s.get(self.AUTH_CREATE_ACCOUNT)
        logger.info(f"[{self.email}] ✓ OAuth flow completed")

    async def _step4_get_sentinel_token(self):
        """Step 4: obtain the Sentinel token and store it in ``self.sentinel_token``.

        Raises:
            NotImplementedError, ImportError: Re-raised after logging when the
                Sentinel handler cannot produce a token.
        """
        logger.info(f"[{self.email}] Step 4: Getting Sentinel token")
        try:
            self.sentinel_token = await self.sentinel.get_token(flow="username_password_create")
            logger.info(f"[{self.email}] ✓ Sentinel token obtained")
        except (NotImplementedError, ImportError) as e:
            logger.error(f"[{self.email}] Sentinel solver not available: {e}")
            raise

    async def _step5_submit_registration(self):
        """Step 5: submit the e-mail/password registration request.

        Raises:
            ValueError: If the response is not 200 and its text mentions that
                the e-mail already exists.
            RuntimeError: For any other non-200 response.
        """
        logger.info(f"[{self.email}] Step 5: Submitting registration")
        payload = {"username": self.email, "password": self.password}
        headers = {
            "Content-Type": "application/json",
            "Accept": "application/json",
            "Origin": "https://auth.openai.com",
            "Referer": self.AUTH_CREATE_ACCOUNT,
            # The Sentinel token from step 4 is sent JSON-encoded.
            "Openai-Sentinel-Token": json.dumps(self.sentinel_token),
            # Trace headers populated with freshly generated random identifiers.
            "X-Datadog-Trace-Id": str(secrets.randbits(63)),
            "X-Datadog-Parent-Id": str(secrets.randbits(63)),
            "X-Datadog-Sampling-Priority": "1",
            "X-Datadog-Origin": "rum",
            "Traceparent": f"00-0000000000000000{secrets.token_hex(8)}-{secrets.token_hex(8)}-01",
            "Tracestate": "dd=s:1;o:rum",
        }
        resp = self.s.post(self.AUTH_REGISTER, json=payload, headers=headers)
        if resp.status_code != 200:
            if "email already exists" in resp.text.lower():
                raise ValueError(f"Email already registered: {self.email}")
            raise RuntimeError(f"Registration failed: {resp.status_code}")
        logger.info(f"[{self.email}] ✓ Registration submitted")

    async def _step6_send_email_otp(self):
        """Step 6: trigger the verification e-mail.

        Raises:
            CloudflareBlockError: On a 403 response that the Cloudflare
                solver identifies as a challenge.
            RuntimeError: For any other non-200 response.
        """
        logger.info(f"[{self.email}] Step 6: Sending email OTP")
        resp = self.s.post(
            self.AUTH_SEND_OTP,
            json={},
            headers={"Content-Type": "application/json", "Referer": self.AUTH_CREATE_ACCOUNT},
        )
        if resp.status_code == 403 and CloudflareSolver.detect_challenge(resp):
            raise CloudflareBlockError("Cloudflare challenge triggered")
        if resp.status_code != 200:
            raise RuntimeError(f"OTP send failed: {resp.status_code}")
        logger.info(f"[{self.email}] ✓ OTP email sent")

    async def _step7_submit_otp(self):
        """Step 7: wait for the OTP (up to 300 seconds) and submit it.

        Raises:
            NotImplementedError: Re-raised when the mail handler cannot
                retrieve codes.
            TimeoutError: If no code arrives in time; chained to the original
                timeout.
            RuntimeError: If the validation request does not return 200.
        """
        logger.info(f"[{self.email}] Step 7: Waiting for OTP")
        try:
            self.otp = await self.mail.wait_for_otp(email=self.email, timeout=300)
        except NotImplementedError:
            logger.warning(f"[{self.email}] Mail handler not configured")
            raise
        except TimeoutError as exc:
            # Re-raise with the address in the message, keeping the cause.
            raise TimeoutError(f"Timeout waiting for OTP: {self.email}") from exc
        logger.info(f"[{self.email}] ✓ OTP received: {self.otp}")

        resp = self.s.post(
            self.AUTH_VALIDATE_OTP,
            json={"code": self.otp},
            headers={"Content-Type": "application/json", "Referer": self.AUTH_CREATE_ACCOUNT},
        )
        if resp.status_code != 200:
            raise RuntimeError(f"OTP validation failed: {resp.status_code}")
        logger.info(f"[{self.email}] ✓ OTP validated")

    async def _step8_complete_profile(self):
        """Step 8: submit name and birthdate, then follow ``continue_url`` if given.

        Following the callback is best effort: a failure there is logged as a
        warning and does not abort the flow.

        Raises:
            RuntimeError: If the profile request returns a status other than
                200, 302 or 303.
        """
        logger.info(f"[{self.email}] Step 8: Completing profile")
        name = self._generate_name()
        birthdate = self._generate_birthdate()
        resp = self.s.post(
            self.AUTH_COMPLETE_PROFILE,
            json={"name": name, "birthdate": birthdate},
            headers={"Content-Type": "application/json", "Referer": self.AUTH_CREATE_ACCOUNT},
            allow_redirects=False,
        )
        if resp.status_code not in (200, 302, 303):
            raise RuntimeError(f"Profile completion failed: {resp.status_code}")

        # Check whether the response names a continue_url that must be followed.
        continue_url = self._extract_continue_url(resp)
        if continue_url:
            logger.info(f"[{self.email}] Following OAuth callback...")
            if not continue_url.startswith("http"):
                continue_url = f"https://auth.openai.com{continue_url}"
            # Follow the OAuth callback with redirects enabled.
            callback_headers = {
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
                "Referer": "https://auth.openai.com/",
                "Sec-Fetch-Dest": "document",
                "Sec-Fetch-Mode": "navigate",
                "Sec-Fetch-Site": "same-origin",
                "Upgrade-Insecure-Requests": "1",
            }
            try:
                self.s.get(continue_url, headers=callback_headers, allow_redirects=True)
            except Exception as exc:
                # Still non-fatal, but reported visibly and accurately rather
                # than hidden behind a debug-level "parse error" message.
                logger.warning(f"[{self.email}] OAuth callback failed: {exc}")
            else:
                logger.info(f"[{self.email}] ✓ OAuth callback completed")
        logger.info(f"[{self.email}] ✓ Profile completed")

    def _extract_continue_url(self, resp) -> Optional[str]:
        """Return the ``continue_url`` string from a JSON response, or None."""
        try:
            data = resp.json()
        except ValueError as exc:
            # Body is not JSON (JSON decode errors derive from ValueError).
            logger.debug(f"[{self.email}] No continue_url or parse error: {exc}")
            return None
        if not isinstance(data, dict):
            return None
        continue_url = data.get("continue_url")
        if isinstance(continue_url, str) and continue_url:
            return continue_url
        return None

    async def _step9_get_access_token(self):
        """Step 9: obtain an access token by running the login flow.

        Reuses the current session. A failed login, or a successful one
        without a token, is logged as a warning and leaves
        ``self.access_token`` unset; it does not raise.
        """
        logger.info(f"[{self.email}] Step 9: Getting access token via login")
        from core.login_flow import LoginFlow

        # Run the login flow on the current session.
        login_flow = LoginFlow(self.s, self.email, self.password)
        result = await login_flow.run()
        if result.get("status") != "success":
            logger.warning(f"[{self.email}] Failed to get access token: {result.get('error')}")
            return

        self.access_token = result.get("access_token")
        if self.access_token:
            # str() guards the preview slice against a non-string token.
            logger.info(f"[{self.email}] ✓ Access token obtained: {str(self.access_token)[:50]}...")
        else:
            # Previously slicing None here raised TypeError and made run()
            # report an otherwise completed registration as failed.
            logger.warning(f"[{self.email}] Login succeeded but no access token was returned")

    def _generate_email(self) -> str:
        """Generate a random e-mail address.

        Uses the configured CloudMail domain when mail is enabled with type
        ``"cloudmail"`` and a domain is set; otherwise logs a warning and
        falls back to ``example.com``.
        """
        random_part = secrets.token_hex(8)
        if self.config.mail.enabled and self.config.mail.type == "cloudmail":
            domain = self.config.mail.cloudmail_domain
            if domain:
                return f"user_{random_part}@{domain}"
        logger.warning("Using example.com domain. Configure MAIL_CLOUDMAIL_DOMAIN.")
        return f"user_{random_part}@example.com"

    def _generate_name(self) -> str:
        """Generate a random "First Last" name from fixed lists."""
        first_names = ["James", "John", "Robert", "Michael", "William", "David", "Mary", "Patricia", "Jennifer", "Linda"]
        last_names = ["Smith", "Johnson", "Williams", "Brown", "Jones", "Garcia", "Miller", "Davis", "Wilson", "Anderson"]
        return f"{random.choice(first_names)} {random.choice(last_names)}"

    def _generate_birthdate(self) -> str:
        """Generate a random birthdate between 1980 and 2002 as ``YYYY-MM-DD``."""
        year = random.randint(1980, 2002)
        month = random.randint(1, 12)
        # February is capped at 28 so the date is valid in every year.
        if month == 2:
            last_day = 28
        elif month in (4, 6, 9, 11):
            last_day = 30
        else:
            last_day = 31
        day = random.randint(1, last_day)
        return f"{year}-{month:02d}-{day:02d}"
# Public API of this module: only RegisterFlow is exported by a star-import.
__all__ = ["RegisterFlow"]