feat(telegram_bot): Add concurrent team registration with multi-worker support

- Implement concurrent registration processing with configurable worker count
- Add CONCURRENT_ENABLED and CONCURRENT_WORKERS config options for parallel execution
- Replace single-threaded loop with task queue and worker thread pool architecture
- Add per-worker step tracking and status display in progress messages
- Implement thread-safe result collection with results_lock for concurrent access
- Update progress UI to show individual worker status and concurrent worker count
- Refactor step callback to support multiple workers with worker_id tracking
- Add graceful shutdown handling for concurrent workers
- Improve progress message updates to only refresh when content changes
- Optimize performance by allowing multiple registrations to run in parallel
This commit is contained in:
2026-01-30 10:50:25 +08:00
parent 85949d8ede
commit e4f330500d

View File

@@ -3439,34 +3439,43 @@ class ProvisionerBot:
)
async def _run_team_registration(self, chat_id: int, count: int, output_type: str):
"""执行 GPT Team 注册任务"""
"""执行 GPT Team 注册任务 (支持并发)"""
from auto_gpt_team import run_single_registration_auto, cleanup_chrome_processes, get_register_mode
from config import CONCURRENT_ENABLED, CONCURRENT_WORKERS
import json
import threading
results = []
success_count = 0
fail_count = 0
results_lock = threading.Lock()
# 获取当前注册模式
# 获取当前注册模式和并发数
current_mode = get_register_mode()
mode_display = "🌐 协议模式" if current_mode == "api" else "🖥️ 浏览器模式"
# 当前步骤 (用于显示)
current_step = ["初始化..."]
current_account = [""]
step_lock = threading.Lock()
# 确定并发数
workers = CONCURRENT_WORKERS if CONCURRENT_ENABLED else 1
workers = min(workers, count) # 不超过总数
def step_callback(step: str):
"""步骤回调 - 更新当前步骤"""
with step_lock:
current_step[0] = step
# 当前步骤 (用于显示)
current_steps = {} # worker_id -> step
step_lock = threading.Lock()
completed_count = [0] # 使用列表以便在闭包中修改
def make_step_callback(worker_id):
"""创建步骤回调"""
def callback(step: str):
with step_lock:
current_steps[worker_id] = step
return callback
# 发送开始消息
progress_msg = await self.app.bot.send_message(
chat_id,
f"<b>🚀 开始注册</b>\n\n"
f"模式: {mode_display}\n"
f"并发: {workers}\n"
f"进度: 0/{count}\n"
f"{'' * 20}",
parse_mode="HTML"
@@ -3475,35 +3484,40 @@ class ProvisionerBot:
# 进度更新任务
async def update_progress_loop():
"""定期更新进度消息"""
last_step = ""
last_text = ""
while True:
await asyncio.sleep(1.5) # 每 1.5 秒更新一次
await asyncio.sleep(1.5)
try:
with results_lock:
s_count = success_count
f_count = fail_count
with step_lock:
step = current_step[0]
account = current_account[0]
steps_copy = dict(current_steps)
# 只有步骤变化时才更新
if step != last_step:
last_step = step
progress = int((success_count + fail_count) / count * 20) if count > 0 else 0
progress_bar = '' * progress + '' * (20 - progress)
total_done = s_count + f_count
progress = int(total_done / count * 20) if count > 0 else 0
progress_bar = '' * progress + '' * (20 - progress)
text = (
f"<b>🚀 注册中...</b>\n\n"
f"模式: {mode_display}\n"
f"进度: {success_count + fail_count}/{count}\n"
f"{progress_bar}\n\n"
f"✅ 成功: {success_count}\n"
f"❌ 失败: {fail_count}\n"
)
text = (
f"<b>🚀 注册中...</b>\n\n"
f"模式: {mode_display}\n"
f"并发: {workers}\n"
f"进度: {total_done}/{count}\n"
f"{progress_bar}\n\n"
f"✅ 成功: {s_count}\n"
f"❌ 失败: {f_count}\n"
)
if account:
text += f"\n⏳ 账号: <code>{account[:20]}...</code>"
if step:
text += f"\n{step}"
# 显示各 worker 状态
if steps_copy:
text += "\n<b>工作状态:</b>\n"
for wid, step in sorted(steps_copy.items()):
if step:
text += f" #{wid+1}: {step[:30]}\n"
if text != last_text:
last_text = text
try:
await progress_msg.edit_text(text, parse_mode="HTML")
except:
@@ -3516,85 +3530,80 @@ class ProvisionerBot:
# 启动进度更新任务
progress_task = asyncio.create_task(update_progress_loop())
for i in range(count):
# 检查停止请求
try:
import run
if run._shutdown_requested:
with step_lock:
current_step[0] = "用户请求停止..."
break
except:
pass
# 任务队列
task_queue = list(range(count))
queue_lock = threading.Lock()
# 执行注册
try:
# 使用 functools.partial 传递回调
import functools
def worker_task(worker_id: int):
"""单个 worker 的任务"""
nonlocal success_count, fail_count
def run_with_callback():
# 在执行前再次检查停止请求
try:
import run as run_module
if run_module._shutdown_requested:
return {"success": False, "error": "用户停止", "stopped": True}
except:
pass
return run_single_registration_auto(
step_callback = make_step_callback(worker_id)
while True:
# 检查停止请求
try:
import run
if run._shutdown_requested:
step_callback("已停止")
break
except:
pass
# 获取任务
with queue_lock:
if not task_queue:
break
task_idx = task_queue.pop(0)
step_callback(f"{task_idx + 1} 个...")
try:
result = run_single_registration_auto(
progress_callback=None,
step_callback=step_callback
)
# 更新当前账号
with step_lock:
current_step[0] = "生成账号信息..."
current_account[0] = f"{i+1}"
with results_lock:
if result.get("stopped"):
step_callback("已停止")
break
elif result.get("success"):
success_count += 1
results.append({
"account": result["account"],
"password": result["password"],
"token": result["token"],
"account_id": result.get("account_id", "")
})
else:
fail_count += 1
log.warning(f"Worker {worker_id}: 注册失败: {result.get('error', '未知错误')}")
except Exception as e:
with results_lock:
fail_count += 1
log.error(f"Worker {worker_id}: 注册异常: {e}")
result = await asyncio.get_event_loop().run_in_executor(
self.executor,
run_with_callback
)
# 清理浏览器进程
cleanup_chrome_processes()
if result.get("stopped"):
# 被 /stop 命令中断,不计入失败
log.info("注册被用户停止")
with step_lock:
current_step[0] = "已停止"
break
elif result.get("success"):
success_count += 1
results.append({
"account": result["account"],
"password": result["password"],
"token": result["token"],
"account_id": result.get("account_id", "")
})
with step_lock:
current_account[0] = result["account"]
else:
fail_count += 1
log.warning(f"注册失败: {result.get('error', '未知错误')}")
except asyncio.CancelledError:
log.info("注册任务被取消")
with step_lock:
current_step[0] = "已取消"
break
except Exception as e:
fail_count += 1
log.error(f"注册异常: {e}")
# 清理 worker 状态
with step_lock:
if worker_id in current_steps:
del current_steps[worker_id]
# 清理浏览器进程
cleanup_chrome_processes()
# 使用线程池并发执行
import concurrent.futures
# 每次注册后检查停止请求
try:
import run
if run._shutdown_requested:
with step_lock:
current_step[0] = "用户请求停止..."
break
except:
pass
with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
futures = [executor.submit(worker_task, i) for i in range(workers)]
# 等待所有任务完成
for future in concurrent.futures.as_completed(futures):
try:
future.result()
except Exception as e:
log.error(f"Worker 异常: {e}")
# 检查是否被停止
stopped = False
@@ -3856,35 +3865,12 @@ class ProvisionerBot:
@admin_only
async def cmd_autogptplus(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""AutoGPTPlus 配置管理 - 交互式菜单"""
keyboard = [
[
InlineKeyboardButton("📋 查看配置", callback_data="autogptplus:config"),
InlineKeyboardButton("🔑 设置 Token", callback_data="autogptplus:set_token"),
],
[
InlineKeyboardButton("📧 域名管理", callback_data="autogptplus:domains"),
InlineKeyboardButton("💳 IBAN 管理", callback_data="autogptplus:ibans"),
],
[
InlineKeyboardButton("🎭 随机指纹", callback_data="autogptplus:fingerprint"),
InlineKeyboardButton("📊 统计信息", callback_data="autogptplus:stats"),
],
[
InlineKeyboardButton("📧 测试邮件", callback_data="autogptplus:test_email"),
InlineKeyboardButton("🔄 测试 API", callback_data="autogptplus:test_api"),
],
[
InlineKeyboardButton("🚀 开始注册", callback_data="autogptplus:register"),
],
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"<b>🤖 AutoGPTPlus 管理面板</b>\n\n"
"ChatGPT 订阅自动化配置管理\n\n"
"请选择功能:",
parse_mode="HTML",
reply_markup=reply_markup
reply_markup=self._get_autogptplus_main_keyboard()
)
def _get_autogptplus_main_keyboard(self):