diff --git a/telegram_bot.py b/telegram_bot.py
index 58f7ae7..1a5322a 100644
--- a/telegram_bot.py
+++ b/telegram_bot.py
@@ -3439,34 +3439,43 @@ class ProvisionerBot:
)
async def _run_team_registration(self, chat_id: int, count: int, output_type: str):
- """执行 GPT Team 注册任务"""
+ """执行 GPT Team 注册任务 (支持并发)"""
from auto_gpt_team import run_single_registration_auto, cleanup_chrome_processes, get_register_mode
+ from config import CONCURRENT_ENABLED, CONCURRENT_WORKERS
import json
import threading
results = []
success_count = 0
fail_count = 0
+ results_lock = threading.Lock()
- # 获取当前注册模式
+ # 获取当前注册模式和并发数
current_mode = get_register_mode()
mode_display = "🌐 协议模式" if current_mode == "api" else "🖥️ 浏览器模式"
- # 当前步骤 (用于显示)
- current_step = ["初始化..."]
- current_account = [""]
- step_lock = threading.Lock()
+ # 确定并发数
+ workers = CONCURRENT_WORKERS if CONCURRENT_ENABLED else 1
+ workers = min(workers, count) # 不超过总数
- def step_callback(step: str):
- """步骤回调 - 更新当前步骤"""
- with step_lock:
- current_step[0] = step
+ # 当前步骤 (用于显示)
+ current_steps = {} # worker_id -> step
+ step_lock = threading.Lock()
+ completed_count = [0] # 使用列表以便在闭包中修改
+
+ def make_step_callback(worker_id):
+ """创建步骤回调"""
+ def callback(step: str):
+ with step_lock:
+ current_steps[worker_id] = step
+ return callback
# 发送开始消息
progress_msg = await self.app.bot.send_message(
chat_id,
f"🚀 开始注册\n\n"
f"模式: {mode_display}\n"
+ f"并发: {workers}\n"
f"进度: 0/{count}\n"
f"{'▱' * 20}",
parse_mode="HTML"
@@ -3475,35 +3484,40 @@ class ProvisionerBot:
# 进度更新任务
async def update_progress_loop():
"""定期更新进度消息"""
- last_step = ""
+ last_text = ""
while True:
- await asyncio.sleep(1.5) # 每 1.5 秒更新一次
+ await asyncio.sleep(1.5)
try:
- with step_lock:
- step = current_step[0]
- account = current_account[0]
+ with results_lock:
+ s_count = success_count
+ f_count = fail_count
- # 只有步骤变化时才更新
- if step != last_step:
- last_step = step
- progress = int((success_count + fail_count) / count * 20) if count > 0 else 0
- progress_bar = '▰' * progress + '▱' * (20 - progress)
-
- text = (
- f"🚀 注册中...\n\n"
- f"模式: {mode_display}\n"
- f"进度: {success_count + fail_count}/{count}\n"
- f"{progress_bar}\n\n"
- f"✅ 成功: {success_count}\n"
- f"❌ 失败: {fail_count}\n"
- )
-
- if account:
- text += f"\n⏳ 账号: {account[:20]}..."
-
- if step:
- text += f"\n ▸ {step}"
-
+ with step_lock:
+ steps_copy = dict(current_steps)
+
+ total_done = s_count + f_count
+ progress = int(total_done / count * 20) if count > 0 else 0
+ progress_bar = '▰' * progress + '▱' * (20 - progress)
+
+ text = (
+ f"🚀 注册中...\n\n"
+ f"模式: {mode_display}\n"
+ f"并发: {workers}\n"
+ f"进度: {total_done}/{count}\n"
+ f"{progress_bar}\n\n"
+ f"✅ 成功: {s_count}\n"
+ f"❌ 失败: {f_count}\n"
+ )
+
+ # 显示各 worker 状态
+ if steps_copy:
+ text += "\n工作状态:\n"
+ for wid, step in sorted(steps_copy.items()):
+ if step:
+ text += f" #{wid+1}: {step[:30]}\n"
+
+ if text != last_text:
+ last_text = text
try:
await progress_msg.edit_text(text, parse_mode="HTML")
except:
@@ -3516,85 +3530,80 @@ class ProvisionerBot:
# 启动进度更新任务
progress_task = asyncio.create_task(update_progress_loop())
- for i in range(count):
- # 检查停止请求
- try:
- import run
- if run._shutdown_requested:
- with step_lock:
- current_step[0] = "用户请求停止..."
- break
- except:
- pass
+ # 任务队列
+ task_queue = list(range(count))
+ queue_lock = threading.Lock()
+
+ def worker_task(worker_id: int):
+ """单个 worker 的任务"""
+ nonlocal success_count, fail_count
- # 执行注册
- try:
- # 使用 functools.partial 传递回调
- import functools
+ step_callback = make_step_callback(worker_id)
+
+ while True:
+ # 检查停止请求
+ try:
+ import run
+ if run._shutdown_requested:
+ step_callback("已停止")
+ break
+ except:
+ pass
- def run_with_callback():
- # 在执行前再次检查停止请求
- try:
- import run as run_module
- if run_module._shutdown_requested:
- return {"success": False, "error": "用户停止", "stopped": True}
- except:
- pass
- return run_single_registration_auto(
+ # 获取任务
+ with queue_lock:
+ if not task_queue:
+ break
+ task_idx = task_queue.pop(0)
+
+ step_callback(f"第 {task_idx + 1} 个...")
+
+ try:
+ result = run_single_registration_auto(
progress_callback=None,
step_callback=step_callback
)
+
+ with results_lock:
+ if result.get("stopped"):
+ step_callback("已停止")
+ break
+ elif result.get("success"):
+ success_count += 1
+ results.append({
+ "account": result["account"],
+ "password": result["password"],
+ "token": result["token"],
+ "account_id": result.get("account_id", "")
+ })
+ else:
+ fail_count += 1
+ log.warning(f"Worker {worker_id}: 注册失败: {result.get('error', '未知错误')}")
+ except Exception as e:
+ with results_lock:
+ fail_count += 1
+ log.error(f"Worker {worker_id}: 注册异常: {e}")
- # 更新当前账号
- with step_lock:
- current_step[0] = "生成账号信息..."
- current_account[0] = f"第 {i+1} 个"
-
- result = await asyncio.get_event_loop().run_in_executor(
- self.executor,
- run_with_callback
- )
-
- if result.get("stopped"):
- # 被 /stop 命令中断,不计入失败
- log.info("注册被用户停止")
- with step_lock:
- current_step[0] = "已停止"
- break
- elif result.get("success"):
- success_count += 1
- results.append({
- "account": result["account"],
- "password": result["password"],
- "token": result["token"],
- "account_id": result.get("account_id", "")
- })
- with step_lock:
- current_account[0] = result["account"]
- else:
- fail_count += 1
- log.warning(f"注册失败: {result.get('error', '未知错误')}")
- except asyncio.CancelledError:
- log.info("注册任务被取消")
- with step_lock:
- current_step[0] = "已取消"
- break
- except Exception as e:
- fail_count += 1
- log.error(f"注册异常: {e}")
+ # 清理浏览器进程
+ cleanup_chrome_processes()
- # 清理浏览器进程
- cleanup_chrome_processes()
+ # 清理 worker 状态
+ with step_lock:
+ if worker_id in current_steps:
+ del current_steps[worker_id]
+
+ # 使用线程池并发执行
+ import concurrent.futures
+
+ with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
+ futures = [executor.submit(worker_task, i) for i in range(workers)]
- # 每次注册后检查停止请求
- try:
- import run
- if run._shutdown_requested:
- with step_lock:
- current_step[0] = "用户请求停止..."
- break
- except:
- pass
+ # 等待所有任务完成
+ for future in concurrent.futures.as_completed(futures):
+ try:
+ future.result()
+ except Exception as e:
+ log.error(f"Worker 异常: {e}")
# 检查是否被停止
stopped = False
@@ -3856,35 +3865,12 @@ class ProvisionerBot:
@admin_only
async def cmd_autogptplus(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""AutoGPTPlus 配置管理 - 交互式菜单"""
- keyboard = [
- [
- InlineKeyboardButton("📋 查看配置", callback_data="autogptplus:config"),
- InlineKeyboardButton("🔑 设置 Token", callback_data="autogptplus:set_token"),
- ],
- [
- InlineKeyboardButton("📧 域名管理", callback_data="autogptplus:domains"),
- InlineKeyboardButton("💳 IBAN 管理", callback_data="autogptplus:ibans"),
- ],
- [
- InlineKeyboardButton("🎭 随机指纹", callback_data="autogptplus:fingerprint"),
- InlineKeyboardButton("📊 统计信息", callback_data="autogptplus:stats"),
- ],
- [
- InlineKeyboardButton("📧 测试邮件", callback_data="autogptplus:test_email"),
- InlineKeyboardButton("🔄 测试 API", callback_data="autogptplus:test_api"),
- ],
- [
- InlineKeyboardButton("🚀 开始注册", callback_data="autogptplus:register"),
- ],
- ]
- reply_markup = InlineKeyboardMarkup(keyboard)
-
await update.message.reply_text(
"🤖 AutoGPTPlus 管理面板\n\n"
"ChatGPT 订阅自动化配置管理\n\n"
"请选择功能:",
parse_mode="HTML",
- reply_markup=reply_markup
+ reply_markup=self._get_autogptplus_main_keyboard()
)
def _get_autogptplus_main_keyboard(self):