""" /go 命令处理器 - 生成支付链接并发送 JSON 文件 """ import asyncio from telegram import Update from telegram.ext import ContextTypes from bot.middlewares.auth import require_auth from bot.services.task_manager import TaskStatus from bot.services.file_sender import send_results_as_json from bot.handlers.settings import get_user_settings, DEFAULT_WORKERS PLAN_MAPPING = { "plus": "chatgptplusplan", "pro": "chatgptproplan", "chatgptplusplan": "chatgptplusplan", "chatgptproplan": "chatgptproplan", } @require_auth async def go_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """ 处理 /go 命令 - 注册账号并生成支付链接 用法: /go - 生成 1 个 Plus 支付链接 /go 5 - 生成 5 个 Plus 支付链接 /go 5 plus - 生成 5 个 Plus 支付链接 /go 3 pro - 生成 3 个 Pro 支付链接 """ # 解析参数 num_accounts = 1 plan_input = "plus" if context.args: # 第一个参数:数量 try: num_accounts = int(context.args[0]) if num_accounts < 1: num_accounts = 1 elif num_accounts > 10: await update.message.reply_text( "⚠️ 单次最多处理 10 个账号,已自动调整为 10" ) num_accounts = 10 except ValueError: # 可能第一个参数是 plan if context.args[0].lower() in PLAN_MAPPING: plan_input = context.args[0].lower() else: await update.message.reply_text( "❌ 无效参数\n\n" "用法: `/go [数量] [plan]`\n" "示例:\n" "• `/go` - 1个 Plus 计划\n" "• `/go 5` - 5个 Plus 计划\n" "• `/go 3 pro` - 3个 Pro 计划", parse_mode="Markdown", ) return # 第二个参数:plan if len(context.args) > 1: plan_input = context.args[1].lower() # 验证 plan plan_name = PLAN_MAPPING.get(plan_input) if not plan_name: await update.message.reply_text( f"❌ 无效的订阅计划: `{plan_input}`\n\n" f"可选值: `plus`, `pro`", parse_mode="Markdown", ) return plan_display = "Plus" if "plus" in plan_name else "Pro" # 获取配置和任务管理器 config = context.bot_data["config"] task_manager = context.bot_data["task_manager"] user_id = update.effective_user.id # 获取用户设置的并发数 user_settings = get_user_settings(context, user_id) workers = user_settings.get("workers", DEFAULT_WORKERS) # 创建任务 task_id = task_manager.create_task( user_id=user_id, task_type="go", total=num_accounts, description=f"{num_accounts} x {plan_display} 支付链接", ) await update.message.reply_text( f"🚀 **开始生成支付链接**\n\n" f"📦 计划: {plan_display}\n" f"📊 数量: {num_accounts}\n" f"👷 并发: {workers}\n" f"📋 任务 ID: `{task_id}`\n\n" f"使用 `/status {task_id}` 查看进度", parse_mode="Markdown", ) # 在后台执行工作流 asyncio.create_task( _run_go_workflow( config, task_manager, task_id, num_accounts, plan_name, workers, update, context ) ) async def _run_go_workflow( config, task_manager, task_id: str, num_accounts: int, plan_name: str, workers: int, update: Update, context: ContextTypes.DEFAULT_TYPE, ): """ 后台执行工作流:并发注册 + 登录 + 获取支付链接 """ plan_display = "Plus" if "plus" in plan_name else "Pro" semaphore = asyncio.Semaphore(workers) async def process_one(index: int): async with semaphore: account_num = index + 1 try: task_manager.update_progress( task_id, current_item=f"账号 {account_num}/{num_accounts}", ) result = await _register_with_checkout(config, account_num, plan_name) return result except Exception as e: return { "status": "failed", "failed_stage": "exception", "error": str(e), } # 并发执行所有任务 tasks = [process_one(i) for i in range(num_accounts)] results = await asyncio.gather(*tasks) # 更新进度 success_count = sum(1 for r in results if r.get("status") == "success") failed_count = len(results) - success_count task_manager.update_progress(task_id, completed=num_accounts) # 完成任务 task_manager.complete_task( task_id, status=TaskStatus.COMPLETED if failed_count == 0 else TaskStatus.PARTIAL, result={ "success": success_count, "failed": failed_count, "total": num_accounts, "plan": plan_display, }, ) # 发送 JSON 文件结果 await send_results_as_json( update=update, context=context, task_id=task_id, plan=plan_display, results=results, ) async def _register_with_checkout(config, task_id: int, plan_name: str): """ 执行完整的注册 + 登录 + checkout 流程 返回包含所有信息的结果字典 """ from core.session import OAISession from core.flow import RegisterFlow from core.login_flow import LoginFlow from core.checkout import CheckoutFlow from utils.logger import logger import re def _mask_proxy(proxy: str) -> str: return re.sub(r"://([^:]+):([^@]+)@", r"://***:***@", proxy) # 选择代理 proxy = config.proxy.get_next_proxy() if proxy: logger.info(f"[Go {task_id}] Using proxy: {_mask_proxy(proxy)}") session = None try: session = OAISession(proxy=proxy, impersonate=config.tls_impersonate) # Step 1: 注册 logger.info(f"[Go {task_id}] Step 1: Registering...") flow = RegisterFlow(session, config) reg_result = await flow.run() if reg_result.get("status") != "success": return { "status": "failed", "failed_stage": "register", "error": reg_result.get("error", "Registration failed"), } email = reg_result.get("email") password = reg_result.get("password") logger.info(f"[Go {task_id}] Registered: {email}") # Step 2: 登录获取 Token logger.info(f"[Go {task_id}] Step 2: Logging in...") login_flow = LoginFlow(session, email, password) login_result = await login_flow.run() if login_result.get("status") != "success": return { "status": "partial", "failed_stage": "login", "email": email, "password": password, "error": login_result.get("error", "Login failed"), } session.access_token = login_result.get("access_token") session.session_token = login_result.get("session_token") logger.info(f"[Go {task_id}] Logged in, token obtained") # Step 3: 获取支付链接 logger.info(f"[Go {task_id}] Step 3: Creating checkout session for {plan_name}...") checkout = CheckoutFlow(session, plan_name=plan_name) checkout_result = await checkout.create_checkout_session() result = { "status": "success", "email": email, "password": password, "access_token": login_result.get("access_token"), "plan_name": plan_name, } if checkout_result.get("status") == "success": result["checkout_session_id"] = checkout_result.get("checkout_session_id") result["checkout_url"] = checkout_result.get("url", "") result["client_secret"] = checkout_result.get("client_secret") logger.info(f"[Go {task_id}] Checkout session created") # 保存结果 from main import save_account, save_checkout_result await save_account(result, config.accounts_output_file) checkout_result["email"] = email await save_checkout_result(checkout_result) else: # Checkout 失败但账号创建成功 result["status"] = "partial" result["checkout_error"] = checkout_result.get("error") logger.warning(f"[Go {task_id}] Checkout failed: {checkout_result.get('error')}") from main import save_account await save_account(result, config.accounts_output_file) return result except Exception as e: logger.exception(f"[Go {task_id}] Unexpected error") return { "status": "failed", "failed_stage": "exception", "error": str(e), } finally: if session: try: session.close() except Exception as e: logger.warning(f"[Go {task_id}] Error closing session: {e}")