feat(registration): Add email retry mechanism with team invitation support

- Add automatic email retry logic when verification code times out (5-second timeout)
- Implement new email creation and team invitation for failed verification attempts
- Add max_email_retries parameter to control retry attempts (default: 3)
- Add team_name parameter to enable automatic team invitations for new emails
- Return special "new_email:xxx@xxx.com:password" format when new email is used
- Update register_openai_account_auto() to support team_name parameter
- Update register_and_authorize() to support team_name parameter and return new email info
- Improve verification code timeout handling with configurable retry intervals
- Add nonlocal verification_timeout flag to track timeout state across retries
- Update docstrings to document new parameters and return value changes
This commit is contained in:
2026-01-30 08:46:03 +08:00
parent b7e658c567
commit 75a0dccebe
3 changed files with 261 additions and 62 deletions

View File

@@ -1079,64 +1079,136 @@ def is_logged_in(page, timeout: int = 5) -> bool:
return False
def register_openai_account_api(email: str, password: str, proxy: str = None) -> bool:
def register_openai_account_api(email: str, password: str, proxy: str = None,
team_name: str = None, max_email_retries: int = 3) -> bool:
"""使用协议模式 (API) 注册 OpenAI 账号
如果验证码获取超时,会自动创建新邮箱重试(不进入浏览器模式)
Args:
email: 邮箱地址
password: 密码
proxy: 代理地址 (可选)
team_name: Team 名称 (用于邀请新邮箱)
max_email_retries: 验证码超时后最大重试次数 (创建新邮箱)
Returns:
bool: 是否成功
str: 如果返回 "new_email:xxx@xxx.com:password",表示使用了新邮箱
"""
if not API_MODE_AVAILABLE:
log.warning("协议模式不可用,回退到浏览器模式")
return None # 返回 None 表示需要回退
log.info(f"[API模式] 开始注册 OpenAI 账号: {email}", icon="account")
current_email = email
current_password = password
# 生成随机姓名和生日
random_name = get_random_name()
birthday = get_random_birthday()
birthdate = f"{birthday['year']}-{birthday['month']}-{birthday['day']}"
log.step(f"姓名: {random_name}, 生日: {birthdate}")
# 定义获取验证码的函数
def get_code(target_email):
progress_update(phase="注册", step="等待验证码...")
log.step("等待验证码邮件...")
code, error, email_time = unified_get_verification_code(target_email)
if code:
log.success(f"获取到验证码: {code}")
return code
# 执行 API 注册
try:
result = api_register_account_only(
email=email,
password=password,
real_name=random_name,
birthdate=birthdate,
get_verification_code_func=get_code,
proxy=proxy,
progress_callback=lambda msg: log.step(msg)
)
if result:
log.success(f"[API模式] 注册完成: {email}")
return True
else:
log.warning("[API模式] 注册失败,可能需要回退到浏览器模式")
return False
for retry in range(max_email_retries):
if retry > 0:
log.warning(f"验证码超时,尝试创建新邮箱 (重试 {retry}/{max_email_retries - 1})...")
except Exception as e:
log.error(f"[API模式] 注册异常: {e}")
return False
# 创建新邮箱
from email_service import unified_create_email
new_email, new_password = unified_create_email()
if not new_email:
log.error("创建新邮箱失败")
continue
# 如果有 team_name邀请新邮箱到 Team
if team_name:
from team_service import invite_single_to_team
from config import TEAMS
# 查找 team 配置
team = None
for t in TEAMS:
if t.get("name") == team_name:
team = t
break
if team:
log.step(f"邀请新邮箱到 Team: {new_email}")
if not invite_single_to_team(new_email, team):
log.error("新邮箱邀请失败")
continue
log.success(f"新邮箱邀请成功: {new_email}")
current_email = new_email
current_password = new_password
log.info(f"[API模式] 开始注册 OpenAI 账号: {current_email}", icon="account")
# 生成随机姓名和生日
random_name = get_random_name()
birthday = get_random_birthday()
birthdate = f"{birthday['year']}-{birthday['month']}-{birthday['day']}"
log.step(f"姓名: {random_name}, 生日: {birthdate}")
# 验证码超时标志
verification_timeout = False
# 定义获取验证码的函数 (5秒超时)
def get_code(target_email):
nonlocal verification_timeout
progress_update(phase="注册", step="等待验证码...")
log.step("等待验证码邮件 (5秒超时)...")
# 使用较短的超时时间: 5 次快速重试,每次 1 秒
code, error, email_time = unified_get_verification_code(
target_email,
max_retries=5, # 5 次重试
interval=1 # 每次间隔 1 秒
)
if code:
log.success(f"获取到验证码: {code}")
return code
else:
verification_timeout = True
log.warning("验证码获取超时 (5秒)")
return None
# 执行 API 注册
try:
result = api_register_account_only(
email=current_email,
password=current_password,
real_name=random_name,
birthdate=birthdate,
get_verification_code_func=get_code,
proxy=proxy,
progress_callback=lambda msg: log.step(msg)
)
if result:
log.success(f"[API模式] 注册完成: {current_email}")
# 如果使用了新邮箱,返回特殊标记
if current_email != email:
return f"new_email:{current_email}:{current_password}"
return True
elif verification_timeout:
# 验证码超时,继续下一次重试(创建新邮箱)
log.warning("[API模式] 验证码超时,将创建新邮箱重试...")
continue
else:
log.warning("[API模式] 注册失败")
return False
except Exception as e:
log.error(f"[API模式] 注册异常: {e}")
if "验证码" in str(e) or "timeout" in str(e).lower():
# 验证码相关异常,继续重试
continue
return False
log.error(f"[API模式] 已重试 {max_email_retries} 次,全部失败")
return False
def register_openai_account_auto(page, email: str, password: str, use_api: bool = True, proxy: str = None) -> bool:
def register_openai_account_auto(page, email: str, password: str, use_api: bool = True,
proxy: str = None, team_name: str = None) -> bool:
"""自动选择模式注册 OpenAI 账号
优先使用 API 模式,失败则回退到浏览器模式
@@ -1147,15 +1219,20 @@ def register_openai_account_auto(page, email: str, password: str, use_api: bool
password: 密码
use_api: 是否优先使用 API 模式
proxy: 代理地址 (API 模式使用)
team_name: Team 名称 (用于验证码超时时邀请新邮箱)
Returns:
bool: 是否成功
str: 如果返回 "new_email:xxx@xxx.com:password",表示使用了新邮箱
"""
# 如果启用 API 模式且可用
if use_api and API_MODE_AVAILABLE:
result = register_openai_account_api(email, password, proxy)
result = register_openai_account_api(email, password, proxy, team_name)
if result is True:
return True
elif isinstance(result, str) and result.startswith("new_email:"):
# 使用了新邮箱,返回新邮箱信息
return result
elif result is False:
log.warning("API 模式注册失败,回退到浏览器模式...")
# result is None 表示 API 模式不可用,直接使用浏览器模式
@@ -2241,19 +2318,22 @@ def register_only(email: str, password: str, use_api_register: bool = True) -> s
return "failed"
def register_and_authorize(email: str, password: str, use_api_register: bool = True) -> tuple:
def register_and_authorize(email: str, password: str, use_api_register: bool = True,
team_name: str = None) -> tuple:
"""完整流程: 注册 OpenAI + Codex 授权 (带重试机制)
Args:
email: 邮箱地址
password: 密码
use_api_register: 是否优先使用 API 模式注册 (默认 True)
team_name: Team 名称 (用于验证码超时时邀请新邮箱)
Returns:
tuple: (register_success, codex_data)
tuple: (register_success, codex_data, new_email_info)
- register_success: True/False/"domain_blacklisted"
- CRS 模式: codex_data 包含 tokens
- CPA/S2A 模式: codex_data 为 None (后台自动处理)
- new_email_info: 如果使用了新邮箱,返回 {"email": "xxx", "password": "xxx"},否则为 None
"""
# 获取授权回调锁 (CPA/S2A 模式需要串行授权)
auth_lock = None
@@ -2264,25 +2344,42 @@ def register_and_authorize(email: str, password: str, use_api_register: bool = T
except (ImportError, AttributeError):
pass
# 用于跟踪是否使用了新邮箱
new_email_info = None
current_email = email
current_password = password
with browser_context_with_retry(max_browser_retries=2) as ctx:
for attempt in ctx.attempts():
try:
# 注册 OpenAI (优先使用 API 模式)
register_result = register_openai_account_auto(
ctx.page, email, password,
use_api=use_api_register
ctx.page, current_email, current_password,
use_api=use_api_register,
team_name=team_name
)
# 检查是否是域名黑名单错误
if register_result == "domain_blacklisted":
ctx.stop()
return "domain_blacklisted", None
return "domain_blacklisted", None, None
# 检查是否使用了新邮箱
if isinstance(register_result, str) and register_result.startswith("new_email:"):
# 解析新邮箱信息: "new_email:xxx@xxx.com:password"
parts = register_result.split(":", 2)
if len(parts) >= 3:
current_email = parts[1]
current_password = parts[2]
new_email_info = {"email": current_email, "password": current_password}
log.success(f"使用新邮箱继续: {current_email}")
register_result = True
if not register_result:
if attempt < ctx.max_retries - 1:
log.warning("注册失败,准备重试...")
continue
return False, None
return False, None, new_email_info
# 短暂等待确保注册完成
time.sleep(0.5)
@@ -2303,16 +2400,16 @@ def register_and_authorize(email: str, password: str, use_api_register: bool = T
# 根据配置选择授权方式
if AUTH_PROVIDER == "cpa":
# CPA 模式: 授权成功即完成,后台自动处理账号
success = perform_cpa_authorization(ctx.page, email, password)
return True, None if success else (True, None) # 注册成功,授权可能失败
success = perform_cpa_authorization(ctx.page, current_email, current_password)
return True, None, new_email_info if success else (True, None, new_email_info)
elif AUTH_PROVIDER == "s2a":
# S2A 模式: 授权成功即完成,后台自动处理账号
success = perform_s2a_authorization(ctx.page, email, password)
return True, None if success else (True, None) # 注册成功,授权可能失败
success = perform_s2a_authorization(ctx.page, current_email, current_password)
return True, None, new_email_info if success else (True, None, new_email_info)
else:
# CRS 模式: 需要 codex_data
codex_data = perform_codex_authorization(ctx.page, email, password)
return True, codex_data
codex_data = perform_codex_authorization(ctx.page, current_email, current_password)
return True, codex_data, new_email_info
finally:
if auth_lock and lock_acquired:
log.step("释放授权回调锁")
@@ -2321,9 +2418,9 @@ def register_and_authorize(email: str, password: str, use_api_register: bool = T
except Exception as e:
ctx.handle_error(e)
if ctx.current_attempt >= ctx.max_retries - 1:
return False, None
return False, None, new_email_info
return False, None
return False, None, new_email_info
def authorize_only(email: str, password: str) -> tuple[bool, dict]: