From 75a0dccebe2b2ff6e5459a0c673fa4743d652225 Mon Sep 17 00:00:00 2001 From: kyx236 Date: Fri, 30 Jan 2026 08:46:03 +0800 Subject: [PATCH] feat(registration): Add email retry mechanism with team invitation support - Add automatic email retry logic when verification code times out (5-second timeout) - Implement new email creation and team invitation for failed verification attempts - Add max_email_retries parameter to control retry attempts (default: 3) - Add team_name parameter to enable automatic team invitations for new emails - Return special "new_email:xxx@xxx.com:password" format when new email is used - Update register_openai_account_auto() to support team_name parameter - Update register_and_authorize() to support team_name parameter and return new email info - Improve verification code timeout handling with configurable retry intervals - Add nonlocal verification_timeout flag to track timeout state across retries - Update docstrings to document new parameters and return value changes --- browser_automation.py | 207 +++++++++++++++++++++++++++++++----------- run.py | 42 ++++++++- telegram_bot.py | 74 ++++++++++++++- 3 files changed, 261 insertions(+), 62 deletions(-) diff --git a/browser_automation.py b/browser_automation.py index b5794a8..09670ed 100644 --- a/browser_automation.py +++ b/browser_automation.py @@ -1079,64 +1079,136 @@ def is_logged_in(page, timeout: int = 5) -> bool: return False -def register_openai_account_api(email: str, password: str, proxy: str = None) -> bool: +def register_openai_account_api(email: str, password: str, proxy: str = None, + team_name: str = None, max_email_retries: int = 3) -> bool: """使用协议模式 (API) 注册 OpenAI 账号 + 如果验证码获取超时,会自动创建新邮箱重试(不进入浏览器模式) + Args: email: 邮箱地址 password: 密码 proxy: 代理地址 (可选) + team_name: Team 名称 (用于邀请新邮箱) + max_email_retries: 验证码超时后最大重试次数 (创建新邮箱) Returns: bool: 是否成功 + str: 如果返回 "new_email:xxx@xxx.com:password",表示使用了新邮箱 """ if not API_MODE_AVAILABLE: log.warning("协议模式不可用,回退到浏览器模式") return None # 返回 None 表示需要回退 - log.info(f"[API模式] 开始注册 OpenAI 账号: {email}", icon="account") + current_email = email + current_password = password - # 生成随机姓名和生日 - random_name = get_random_name() - birthday = get_random_birthday() - birthdate = f"{birthday['year']}-{birthday['month']}-{birthday['day']}" - - log.step(f"姓名: {random_name}, 生日: {birthdate}") - - # 定义获取验证码的函数 - def get_code(target_email): - progress_update(phase="注册", step="等待验证码...") - log.step("等待验证码邮件...") - code, error, email_time = unified_get_verification_code(target_email) - if code: - log.success(f"获取到验证码: {code}") - return code - - # 执行 API 注册 - try: - result = api_register_account_only( - email=email, - password=password, - real_name=random_name, - birthdate=birthdate, - get_verification_code_func=get_code, - proxy=proxy, - progress_callback=lambda msg: log.step(msg) - ) - - if result: - log.success(f"[API模式] 注册完成: {email}") - return True - else: - log.warning("[API模式] 注册失败,可能需要回退到浏览器模式") - return False + for retry in range(max_email_retries): + if retry > 0: + log.warning(f"验证码超时,尝试创建新邮箱 (重试 {retry}/{max_email_retries - 1})...") - except Exception as e: - log.error(f"[API模式] 注册异常: {e}") - return False + # 创建新邮箱 + from email_service import unified_create_email + new_email, new_password = unified_create_email() + + if not new_email: + log.error("创建新邮箱失败") + continue + + # 如果有 team_name,邀请新邮箱到 Team + if team_name: + from team_service import invite_single_to_team + from config import TEAMS + + # 查找 team 配置 + team = None + for t in TEAMS: + if t.get("name") == team_name: + team = t + break + + if team: + log.step(f"邀请新邮箱到 Team: {new_email}") + if not invite_single_to_team(new_email, team): + log.error("新邮箱邀请失败") + continue + log.success(f"新邮箱邀请成功: {new_email}") + + current_email = new_email + current_password = new_password + + log.info(f"[API模式] 开始注册 OpenAI 账号: {current_email}", icon="account") + + # 生成随机姓名和生日 + random_name = get_random_name() + birthday = get_random_birthday() + birthdate = f"{birthday['year']}-{birthday['month']}-{birthday['day']}" + + log.step(f"姓名: {random_name}, 生日: {birthdate}") + + # 验证码超时标志 + verification_timeout = False + + # 定义获取验证码的函数 (5秒超时) + def get_code(target_email): + nonlocal verification_timeout + progress_update(phase="注册", step="等待验证码...") + log.step("等待验证码邮件 (5秒超时)...") + + # 使用较短的超时时间: 5 次快速重试,每次 1 秒 + code, error, email_time = unified_get_verification_code( + target_email, + max_retries=5, # 5 次重试 + interval=1 # 每次间隔 1 秒 + ) + + if code: + log.success(f"获取到验证码: {code}") + return code + else: + verification_timeout = True + log.warning("验证码获取超时 (5秒)") + return None + + # 执行 API 注册 + try: + result = api_register_account_only( + email=current_email, + password=current_password, + real_name=random_name, + birthdate=birthdate, + get_verification_code_func=get_code, + proxy=proxy, + progress_callback=lambda msg: log.step(msg) + ) + + if result: + log.success(f"[API模式] 注册完成: {current_email}") + # 如果使用了新邮箱,返回特殊标记 + if current_email != email: + return f"new_email:{current_email}:{current_password}" + return True + elif verification_timeout: + # 验证码超时,继续下一次重试(创建新邮箱) + log.warning("[API模式] 验证码超时,将创建新邮箱重试...") + continue + else: + log.warning("[API模式] 注册失败") + return False + + except Exception as e: + log.error(f"[API模式] 注册异常: {e}") + if "验证码" in str(e) or "timeout" in str(e).lower(): + # 验证码相关异常,继续重试 + continue + return False + + log.error(f"[API模式] 已重试 {max_email_retries} 次,全部失败") + return False -def register_openai_account_auto(page, email: str, password: str, use_api: bool = True, proxy: str = None) -> bool: +def register_openai_account_auto(page, email: str, password: str, use_api: bool = True, + proxy: str = None, team_name: str = None) -> bool: """自动选择模式注册 OpenAI 账号 优先使用 API 模式,失败则回退到浏览器模式 @@ -1147,15 +1219,20 @@ def register_openai_account_auto(page, email: str, password: str, use_api: bool password: 密码 use_api: 是否优先使用 API 模式 proxy: 代理地址 (API 模式使用) + team_name: Team 名称 (用于验证码超时时邀请新邮箱) Returns: bool: 是否成功 + str: 如果返回 "new_email:xxx@xxx.com:password",表示使用了新邮箱 """ # 如果启用 API 模式且可用 if use_api and API_MODE_AVAILABLE: - result = register_openai_account_api(email, password, proxy) + result = register_openai_account_api(email, password, proxy, team_name) if result is True: return True + elif isinstance(result, str) and result.startswith("new_email:"): + # 使用了新邮箱,返回新邮箱信息 + return result elif result is False: log.warning("API 模式注册失败,回退到浏览器模式...") # result is None 表示 API 模式不可用,直接使用浏览器模式 @@ -2241,19 +2318,22 @@ def register_only(email: str, password: str, use_api_register: bool = True) -> s return "failed" -def register_and_authorize(email: str, password: str, use_api_register: bool = True) -> tuple: +def register_and_authorize(email: str, password: str, use_api_register: bool = True, + team_name: str = None) -> tuple: """完整流程: 注册 OpenAI + Codex 授权 (带重试机制) Args: email: 邮箱地址 password: 密码 use_api_register: 是否优先使用 API 模式注册 (默认 True) + team_name: Team 名称 (用于验证码超时时邀请新邮箱) Returns: - tuple: (register_success, codex_data) + tuple: (register_success, codex_data, new_email_info) - register_success: True/False/"domain_blacklisted" - CRS 模式: codex_data 包含 tokens - CPA/S2A 模式: codex_data 为 None (后台自动处理) + - new_email_info: 如果使用了新邮箱,返回 {"email": "xxx", "password": "xxx"},否则为 None """ # 获取授权回调锁 (CPA/S2A 模式需要串行授权) auth_lock = None @@ -2264,25 +2344,42 @@ def register_and_authorize(email: str, password: str, use_api_register: bool = T except (ImportError, AttributeError): pass + # 用于跟踪是否使用了新邮箱 + new_email_info = None + current_email = email + current_password = password + with browser_context_with_retry(max_browser_retries=2) as ctx: for attempt in ctx.attempts(): try: # 注册 OpenAI (优先使用 API 模式) register_result = register_openai_account_auto( - ctx.page, email, password, - use_api=use_api_register + ctx.page, current_email, current_password, + use_api=use_api_register, + team_name=team_name ) # 检查是否是域名黑名单错误 if register_result == "domain_blacklisted": ctx.stop() - return "domain_blacklisted", None + return "domain_blacklisted", None, None + + # 检查是否使用了新邮箱 + if isinstance(register_result, str) and register_result.startswith("new_email:"): + # 解析新邮箱信息: "new_email:xxx@xxx.com:password" + parts = register_result.split(":", 2) + if len(parts) >= 3: + current_email = parts[1] + current_password = parts[2] + new_email_info = {"email": current_email, "password": current_password} + log.success(f"使用新邮箱继续: {current_email}") + register_result = True if not register_result: if attempt < ctx.max_retries - 1: log.warning("注册失败,准备重试...") continue - return False, None + return False, None, new_email_info # 短暂等待确保注册完成 time.sleep(0.5) @@ -2303,16 +2400,16 @@ def register_and_authorize(email: str, password: str, use_api_register: bool = T # 根据配置选择授权方式 if AUTH_PROVIDER == "cpa": # CPA 模式: 授权成功即完成,后台自动处理账号 - success = perform_cpa_authorization(ctx.page, email, password) - return True, None if success else (True, None) # 注册成功,授权可能失败 + success = perform_cpa_authorization(ctx.page, current_email, current_password) + return True, None, new_email_info if success else (True, None, new_email_info) elif AUTH_PROVIDER == "s2a": # S2A 模式: 授权成功即完成,后台自动处理账号 - success = perform_s2a_authorization(ctx.page, email, password) - return True, None if success else (True, None) # 注册成功,授权可能失败 + success = perform_s2a_authorization(ctx.page, current_email, current_password) + return True, None, new_email_info if success else (True, None, new_email_info) else: # CRS 模式: 需要 codex_data - codex_data = perform_codex_authorization(ctx.page, email, password) - return True, codex_data + codex_data = perform_codex_authorization(ctx.page, current_email, current_password) + return True, codex_data, new_email_info finally: if auth_lock and lock_acquired: log.step("释放授权回调锁") @@ -2321,9 +2418,9 @@ def register_and_authorize(email: str, password: str, use_api_register: bool = T except Exception as e: ctx.handle_error(e) if ctx.current_attempt >= ctx.max_retries - 1: - return False, None + return False, None, new_email_info - return False, None + return False, None, new_email_info def authorize_only(email: str, password: str) -> tuple[bool, dict]: diff --git a/run.py b/run.py index 67616b9..4b55890 100644 --- a/run.py +++ b/run.py @@ -407,7 +407,26 @@ def process_accounts(accounts: list, team_name: str, team_index: int = 0, else: # 新账号: 注册 + Codex 授权 progress_update(phase="注册", step="注册 OpenAI...") - register_success, codex_data = register_and_authorize(email, password) + register_success, codex_data, new_email_info = register_and_authorize(email, password, team_name=team_name) + + # 如果使用了新邮箱,更新 tracker + if new_email_info: + new_email = new_email_info["email"] + new_password = new_email_info["password"] + log.info(f"验证码超时,已切换到新邮箱: {new_email}") + + # 从 tracker 中移除旧邮箱 + remove_account_from_tracker(_tracker, team_name, email) + + # 添加新邮箱到 tracker + add_account_with_password(_tracker, team_name, new_email, new_password, "registered") + save_team_tracker(_tracker) + + # 更新当前处理的邮箱信息 + email = new_email + password = new_password + result["email"] = email + result["password"] = password # 检查是否是域名黑名单错误 if register_success == "domain_blacklisted": @@ -643,7 +662,26 @@ def _process_single_account_worker( register_success = True else: log.info(f"[Worker-{worker_id}] 新账号,注册 + 授权...", icon="auth") - register_success, codex_data = register_and_authorize(email, password) + register_success, codex_data, new_email_info = register_and_authorize(email, password, team_name=team_name) + + # 如果使用了新邮箱,更新 tracker + if new_email_info: + new_email = new_email_info["email"] + new_password = new_email_info["password"] + log.info(f"[Worker-{worker_id}] 验证码超时,已切换到新邮箱: {new_email}") + + with _tracker_lock: + # 从 tracker 中移除旧邮箱 + remove_account_from_tracker(_tracker, team_name, email) + # 添加新邮箱到 tracker + add_account_with_password(_tracker, team_name, new_email, new_password, "registered") + save_team_tracker(_tracker) + + # 更新当前处理的邮箱信息 + email = new_email + password = new_password + result["email"] = email + result["password"] = password if register_success == "domain_blacklisted": domain = get_domain_from_email(email) diff --git a/telegram_bot.py b/telegram_bot.py index cb5749f..7ddade4 100644 --- a/telegram_bot.py +++ b/telegram_bot.py @@ -136,6 +136,7 @@ class ProvisionerBot: ("logs_stop", self.cmd_logs_stop), ("dashboard", self.cmd_dashboard), ("import", self.cmd_import), + ("verify", self.cmd_verify), ("stock", self.cmd_stock), ("gptmail_keys", self.cmd_gptmail_keys), ("gptmail_add", self.cmd_gptmail_add), @@ -269,6 +270,7 @@ class ProvisionerBot: BotCommand("stock", "查看账号库存"), BotCommand("s2a_config", "配置 S2A 参数"), BotCommand("import", "导入账号到 team.json"), + BotCommand("verify", "验证账号并移除无效账号"), # GPTMail BotCommand("gptmail_keys", "查看 GPTMail API Keys"), BotCommand("gptmail_add", "添加 GPTMail API Key"), @@ -336,6 +338,7 @@ class ProvisionerBot: 📤 导入账号: /import - 导入账号到 team.json +/verify - 验证账号并移除无效账号 或直接发送 JSON 文件 📧 GPTMail 管理: @@ -2093,7 +2096,7 @@ class ProvisionerBot: await self.app.bot.edit_message_text( chat_id=chat_id, message_id=progress_msg.message_id, - text=f"🔍 正在验证账号...\n\n⏳ 验证 {total} 个账号的 account_id...", + text=f"🔍 正在验证账号...\n\n⏳ 验证 {total} 个账号的 account_id (20 并发)...", parse_mode="HTML" ) @@ -2119,9 +2122,10 @@ class ProvisionerBot: return idx, email, account_id - # 使用线程池并行验证 - max_workers = min(10, total) + # 使用线程池并行验证 (20 并发) + max_workers = min(20, total) completed_count = 0 + last_update_time = 0 with ThreadPoolExecutor(max_workers=max_workers) as executor: futures = {executor.submit(verify_account, item): item for item in accounts_to_verify} @@ -2138,8 +2142,11 @@ class ProvisionerBot: # 验证失败 failed_accounts.append({"idx": idx, "email": email}) - # 每处理 5 个更新一次进度 - if completed_count % 5 == 0 or completed_count == total: + # 每处理 10 个或间隔 1 秒更新一次进度 + import time + current_time = time.time() + if completed_count % 10 == 0 or completed_count == total or current_time - last_update_time > 1: + last_update_time = current_time try: await self.app.bot.edit_message_text( chat_id=chat_id, @@ -2208,6 +2215,63 @@ class ProvisionerBot: except Exception: pass + @admin_only + async def cmd_verify(self, update: Update, context: ContextTypes.DEFAULT_TYPE): + """手动验证 team.json 中的账号,获取 account_id 并移除无效账号""" + chat_id = update.effective_chat.id + + # 检查是否有任务正在运行 + if self.current_task and not self.current_task.done(): + await update.message.reply_text( + f"⚠️ 有任务正在运行: {self.current_team}\n" + "请等待任务完成或使用 /stop 停止后再验证" + ) + return + + # 检查 team.json 是否存在 + from pathlib import Path + team_json_path = Path(TEAM_JSON_FILE) + if not team_json_path.exists(): + await update.message.reply_text("❌ team.json 不存在,请先导入账号") + return + + # 统计需要验证的账号 + import json + try: + with open(team_json_path, "r", encoding="utf-8") as f: + accounts = json.load(f) + if not isinstance(accounts, list): + accounts = [accounts] + except Exception as e: + await update.message.reply_text(f"❌ 读取 team.json 失败: {e}") + return + + total_accounts = len(accounts) + need_verify = sum(1 for acc in accounts if acc.get("token") and not acc.get("account_id")) + already_verified = sum(1 for acc in accounts if acc.get("account_id")) + + if need_verify == 0: + await update.message.reply_text( + f"✅ 无需验证\n\n" + f"team.json 共 {total_accounts} 个账号\n" + f"已验证: {already_verified}\n" + f"待验证: 0", + parse_mode="HTML" + ) + return + + await update.message.reply_text( + f"🔍 开始验证账号\n\n" + f"team.json 共 {total_accounts} 个账号\n" + f"已验证: {already_verified}\n" + f"待验证: {need_verify}\n\n" + f"⏳ 正在验证...", + parse_mode="HTML" + ) + + # 执行验证 + await self._validate_and_cleanup_accounts(chat_id) + async def _import_batch_timeout_callback(self, context: ContextTypes.DEFAULT_TYPE): """批量导入超时回调 - 由 job_queue 调用""" chat_id = context.job.data.get("chat_id")