From e4f330500d4024935dc0d69a3ab6b77771f8958a Mon Sep 17 00:00:00 2001 From: kyx236 Date: Fri, 30 Jan 2026 10:50:25 +0800 Subject: [PATCH] feat(telegram_bot): Add concurrent team registration with multi-worker support - Implement concurrent registration processing with configurable worker count - Add CONCURRENT_ENABLED and CONCURRENT_WORKERS config options for parallel execution - Replace single-threaded loop with task queue and worker thread pool architecture - Add per-worker step tracking and status display in progress messages - Implement thread-safe result collection with results_lock for concurrent access - Update progress UI to show individual worker status and concurrent worker count - Refactor step callback to support multiple workers with worker_id tracking - Add graceful shutdown handling for concurrent workers - Improve progress message updates to only refresh when content changes - Optimize performance by allowing multiple registrations to run in parallel --- telegram_bot.py | 248 +++++++++++++++++++++++------------------------- 1 file changed, 117 insertions(+), 131 deletions(-) diff --git a/telegram_bot.py b/telegram_bot.py index 58f7ae7..1a5322a 100644 --- a/telegram_bot.py +++ b/telegram_bot.py @@ -3439,34 +3439,43 @@ class ProvisionerBot: ) async def _run_team_registration(self, chat_id: int, count: int, output_type: str): - """执行 GPT Team 注册任务""" + """执行 GPT Team 注册任务 (支持并发)""" from auto_gpt_team import run_single_registration_auto, cleanup_chrome_processes, get_register_mode + from config import CONCURRENT_ENABLED, CONCURRENT_WORKERS import json import threading results = [] success_count = 0 fail_count = 0 + results_lock = threading.Lock() - # 获取当前注册模式 + # 获取当前注册模式和并发数 current_mode = get_register_mode() mode_display = "🌐 协议模式" if current_mode == "api" else "🖥️ 浏览器模式" - # 当前步骤 (用于显示) - current_step = ["初始化..."] - current_account = [""] - step_lock = threading.Lock() + # 确定并发数 + workers = CONCURRENT_WORKERS if CONCURRENT_ENABLED else 1 + workers = min(workers, count) # 不超过总数 - def step_callback(step: str): - """步骤回调 - 更新当前步骤""" - with step_lock: - current_step[0] = step + # 当前步骤 (用于显示) + current_steps = {} # worker_id -> step + step_lock = threading.Lock() + completed_count = [0] # 使用列表以便在闭包中修改 + + def make_step_callback(worker_id): + """创建步骤回调""" + def callback(step: str): + with step_lock: + current_steps[worker_id] = step + return callback # 发送开始消息 progress_msg = await self.app.bot.send_message( chat_id, f"🚀 开始注册\n\n" f"模式: {mode_display}\n" + f"并发: {workers}\n" f"进度: 0/{count}\n" f"{'▱' * 20}", parse_mode="HTML" @@ -3475,35 +3484,40 @@ class ProvisionerBot: # 进度更新任务 async def update_progress_loop(): """定期更新进度消息""" - last_step = "" + last_text = "" while True: - await asyncio.sleep(1.5) # 每 1.5 秒更新一次 + await asyncio.sleep(1.5) try: - with step_lock: - step = current_step[0] - account = current_account[0] + with results_lock: + s_count = success_count + f_count = fail_count - # 只有步骤变化时才更新 - if step != last_step: - last_step = step - progress = int((success_count + fail_count) / count * 20) if count > 0 else 0 - progress_bar = '▰' * progress + '▱' * (20 - progress) - - text = ( - f"🚀 注册中...\n\n" - f"模式: {mode_display}\n" - f"进度: {success_count + fail_count}/{count}\n" - f"{progress_bar}\n\n" - f"✅ 成功: {success_count}\n" - f"❌ 失败: {fail_count}\n" - ) - - if account: - text += f"\n⏳ 账号: {account[:20]}..." - - if step: - text += f"\n ▸ {step}" - + with step_lock: + steps_copy = dict(current_steps) + + total_done = s_count + f_count + progress = int(total_done / count * 20) if count > 0 else 0 + progress_bar = '▰' * progress + '▱' * (20 - progress) + + text = ( + f"🚀 注册中...\n\n" + f"模式: {mode_display}\n" + f"并发: {workers}\n" + f"进度: {total_done}/{count}\n" + f"{progress_bar}\n\n" + f"✅ 成功: {s_count}\n" + f"❌ 失败: {f_count}\n" + ) + + # 显示各 worker 状态 + if steps_copy: + text += "\n工作状态:\n" + for wid, step in sorted(steps_copy.items()): + if step: + text += f" #{wid+1}: {step[:30]}\n" + + if text != last_text: + last_text = text try: await progress_msg.edit_text(text, parse_mode="HTML") except: @@ -3516,85 +3530,80 @@ class ProvisionerBot: # 启动进度更新任务 progress_task = asyncio.create_task(update_progress_loop()) - for i in range(count): - # 检查停止请求 - try: - import run - if run._shutdown_requested: - with step_lock: - current_step[0] = "用户请求停止..." - break - except: - pass + # 任务队列 + task_queue = list(range(count)) + queue_lock = threading.Lock() + + def worker_task(worker_id: int): + """单个 worker 的任务""" + nonlocal success_count, fail_count - # 执行注册 - try: - # 使用 functools.partial 传递回调 - import functools + step_callback = make_step_callback(worker_id) + + while True: + # 检查停止请求 + try: + import run + if run._shutdown_requested: + step_callback("已停止") + break + except: + pass - def run_with_callback(): - # 在执行前再次检查停止请求 - try: - import run as run_module - if run_module._shutdown_requested: - return {"success": False, "error": "用户停止", "stopped": True} - except: - pass - return run_single_registration_auto( + # 获取任务 + with queue_lock: + if not task_queue: + break + task_idx = task_queue.pop(0) + + step_callback(f"第 {task_idx + 1} 个...") + + try: + result = run_single_registration_auto( progress_callback=None, step_callback=step_callback ) + + with results_lock: + if result.get("stopped"): + step_callback("已停止") + break + elif result.get("success"): + success_count += 1 + results.append({ + "account": result["account"], + "password": result["password"], + "token": result["token"], + "account_id": result.get("account_id", "") + }) + else: + fail_count += 1 + log.warning(f"Worker {worker_id}: 注册失败: {result.get('error', '未知错误')}") + except Exception as e: + with results_lock: + fail_count += 1 + log.error(f"Worker {worker_id}: 注册异常: {e}") - # 更新当前账号 - with step_lock: - current_step[0] = "生成账号信息..." - current_account[0] = f"第 {i+1} 个" - - result = await asyncio.get_event_loop().run_in_executor( - self.executor, - run_with_callback - ) - - if result.get("stopped"): - # 被 /stop 命令中断,不计入失败 - log.info("注册被用户停止") - with step_lock: - current_step[0] = "已停止" - break - elif result.get("success"): - success_count += 1 - results.append({ - "account": result["account"], - "password": result["password"], - "token": result["token"], - "account_id": result.get("account_id", "") - }) - with step_lock: - current_account[0] = result["account"] - else: - fail_count += 1 - log.warning(f"注册失败: {result.get('error', '未知错误')}") - except asyncio.CancelledError: - log.info("注册任务被取消") - with step_lock: - current_step[0] = "已取消" - break - except Exception as e: - fail_count += 1 - log.error(f"注册异常: {e}") + # 清理浏览器进程 + cleanup_chrome_processes() - # 清理浏览器进程 - cleanup_chrome_processes() + # 清理 worker 状态 + with step_lock: + if worker_id in current_steps: + del current_steps[worker_id] + + # 使用线程池并发执行 + import concurrent.futures + + with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor: + futures = [executor.submit(worker_task, i) for i in range(workers)] - # 每次注册后检查停止请求 - try: - import run - if run._shutdown_requested: - with step_lock: - current_step[0] = "用户请求停止..." - break - except: - pass + # 等待所有任务完成 + for future in concurrent.futures.as_completed(futures): + try: + future.result() + except Exception as e: + log.error(f"Worker 异常: {e}") # 检查是否被停止 stopped = False @@ -3856,35 +3865,12 @@ class ProvisionerBot: @admin_only async def cmd_autogptplus(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """AutoGPTPlus 配置管理 - 交互式菜单""" - keyboard = [ - [ - InlineKeyboardButton("📋 查看配置", callback_data="autogptplus:config"), - InlineKeyboardButton("🔑 设置 Token", callback_data="autogptplus:set_token"), - ], - [ - InlineKeyboardButton("📧 域名管理", callback_data="autogptplus:domains"), - InlineKeyboardButton("💳 IBAN 管理", callback_data="autogptplus:ibans"), - ], - [ - InlineKeyboardButton("🎭 随机指纹", callback_data="autogptplus:fingerprint"), - InlineKeyboardButton("📊 统计信息", callback_data="autogptplus:stats"), - ], - [ - InlineKeyboardButton("📧 测试邮件", callback_data="autogptplus:test_email"), - InlineKeyboardButton("🔄 测试 API", callback_data="autogptplus:test_api"), - ], - [ - InlineKeyboardButton("🚀 开始注册", callback_data="autogptplus:register"), - ], - ] - reply_markup = InlineKeyboardMarkup(keyboard) - await update.message.reply_text( "🤖 AutoGPTPlus 管理面板\n\n" "ChatGPT 订阅自动化配置管理\n\n" "请选择功能:", parse_mode="HTML", - reply_markup=reply_markup + reply_markup=self._get_autogptplus_main_keyboard() ) def _get_autogptplus_main_keyboard(self):