Files
autoPlus/bot/handlers/go.py
2026-01-30 10:48:56 +08:00

291 lines
9.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
/go 命令处理器 - 生成支付链接并发送 JSON 文件
"""
import asyncio
from telegram import Update
from telegram.ext import ContextTypes
from bot.middlewares.auth import require_auth
from bot.services.task_manager import TaskStatus
from bot.services.file_sender import send_results_as_json
from bot.handlers.settings import get_user_settings, DEFAULT_WORKERS
PLAN_MAPPING = {
"plus": "chatgptplusplan",
"pro": "chatgptproplan",
"chatgptplusplan": "chatgptplusplan",
"chatgptproplan": "chatgptproplan",
}
@require_auth
async def go_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""
处理 /go 命令 - 注册账号并生成支付链接
用法:
/go - 生成 1 个 Plus 支付链接
/go 5 - 生成 5 个 Plus 支付链接
/go 5 plus - 生成 5 个 Plus 支付链接
/go 3 pro - 生成 3 个 Pro 支付链接
"""
# 解析参数
num_accounts = 1
plan_input = "plus"
if context.args:
# 第一个参数:数量
try:
num_accounts = int(context.args[0])
if num_accounts < 1:
num_accounts = 1
elif num_accounts > 10:
await update.message.reply_text(
"⚠️ 单次最多处理 10 个账号,已自动调整为 10"
)
num_accounts = 10
except ValueError:
# 可能第一个参数是 plan
if context.args[0].lower() in PLAN_MAPPING:
plan_input = context.args[0].lower()
else:
await update.message.reply_text(
"❌ 无效参数\n\n"
"用法: `/go [数量] [plan]`\n"
"示例:\n"
"• `/go` - 1个 Plus 计划\n"
"• `/go 5` - 5个 Plus 计划\n"
"• `/go 3 pro` - 3个 Pro 计划",
parse_mode="Markdown",
)
return
# 第二个参数plan
if len(context.args) > 1:
plan_input = context.args[1].lower()
# 验证 plan
plan_name = PLAN_MAPPING.get(plan_input)
if not plan_name:
await update.message.reply_text(
f"❌ 无效的订阅计划: `{plan_input}`\n\n" f"可选值: `plus`, `pro`",
parse_mode="Markdown",
)
return
plan_display = "Plus" if "plus" in plan_name else "Pro"
# 获取配置和任务管理器
config = context.bot_data["config"]
task_manager = context.bot_data["task_manager"]
user_id = update.effective_user.id
# 获取用户设置的并发数
user_settings = get_user_settings(context, user_id)
workers = user_settings.get("workers", DEFAULT_WORKERS)
# 创建任务
task_id = task_manager.create_task(
user_id=user_id,
task_type="go",
total=num_accounts,
description=f"{num_accounts} x {plan_display} 支付链接",
)
await update.message.reply_text(
f"🚀 **开始生成支付链接**\n\n"
f"📦 计划: {plan_display}\n"
f"📊 数量: {num_accounts}\n"
f"👷 并发: {workers}\n"
f"📋 任务 ID: `{task_id}`\n\n"
f"使用 `/status {task_id}` 查看进度",
parse_mode="Markdown",
)
# 在后台执行工作流
asyncio.create_task(
_run_go_workflow(
config, task_manager, task_id, num_accounts, plan_name, workers, update, context
)
)
async def _run_go_workflow(
config,
task_manager,
task_id: str,
num_accounts: int,
plan_name: str,
workers: int,
update: Update,
context: ContextTypes.DEFAULT_TYPE,
):
"""
后台执行工作流:并发注册 + 登录 + 获取支付链接
"""
plan_display = "Plus" if "plus" in plan_name else "Pro"
semaphore = asyncio.Semaphore(workers)
async def process_one(index: int):
async with semaphore:
account_num = index + 1
try:
task_manager.update_progress(
task_id,
current_item=f"账号 {account_num}/{num_accounts}",
)
result = await _register_with_checkout(config, account_num, plan_name)
return result
except Exception as e:
return {
"status": "failed",
"failed_stage": "exception",
"error": str(e),
}
# 并发执行所有任务
tasks = [process_one(i) for i in range(num_accounts)]
results = await asyncio.gather(*tasks)
# 更新进度
success_count = sum(1 for r in results if r.get("status") == "success")
failed_count = len(results) - success_count
task_manager.update_progress(task_id, completed=num_accounts)
# 完成任务
task_manager.complete_task(
task_id,
status=TaskStatus.COMPLETED if failed_count == 0 else TaskStatus.PARTIAL,
result={
"success": success_count,
"failed": failed_count,
"total": num_accounts,
"plan": plan_display,
},
)
# 发送 JSON 文件结果
await send_results_as_json(
update=update,
context=context,
task_id=task_id,
plan=plan_display,
results=results,
)
async def _register_with_checkout(config, task_id: int, plan_name: str):
"""
执行完整的注册 + 登录 + checkout 流程
返回包含所有信息的结果字典
"""
from core.session import OAISession
from core.flow import RegisterFlow
from core.login_flow import LoginFlow
from core.checkout import CheckoutFlow
from utils.logger import logger
import re
def _mask_proxy(proxy: str) -> str:
return re.sub(r"://([^:]+):([^@]+)@", r"://***:***@", proxy)
# 选择代理
proxy = config.proxy.get_next_proxy()
if proxy:
logger.info(f"[Go {task_id}] Using proxy: {_mask_proxy(proxy)}")
session = None
try:
session = OAISession(proxy=proxy, impersonate=config.tls_impersonate)
# Step 1: 注册
logger.info(f"[Go {task_id}] Step 1: Registering...")
flow = RegisterFlow(session, config)
reg_result = await flow.run()
if reg_result.get("status") != "success":
return {
"status": "failed",
"failed_stage": "register",
"error": reg_result.get("error", "Registration failed"),
}
email = reg_result.get("email")
password = reg_result.get("password")
logger.info(f"[Go {task_id}] Registered: {email}")
# Step 2: 登录获取 Token
logger.info(f"[Go {task_id}] Step 2: Logging in...")
login_flow = LoginFlow(session, email, password)
login_result = await login_flow.run()
if login_result.get("status") != "success":
return {
"status": "partial",
"failed_stage": "login",
"email": email,
"password": password,
"error": login_result.get("error", "Login failed"),
}
session.access_token = login_result.get("access_token")
session.session_token = login_result.get("session_token")
logger.info(f"[Go {task_id}] Logged in, token obtained")
# Step 3: 获取支付链接
logger.info(f"[Go {task_id}] Step 3: Creating checkout session for {plan_name}...")
checkout = CheckoutFlow(session, plan_name=plan_name)
checkout_result = await checkout.create_checkout_session()
result = {
"status": "success",
"email": email,
"password": password,
"access_token": login_result.get("access_token"),
"plan_name": plan_name,
}
if checkout_result.get("status") == "success":
result["checkout_session_id"] = checkout_result.get("checkout_session_id")
result["checkout_url"] = checkout_result.get("url", "")
result["client_secret"] = checkout_result.get("client_secret")
logger.info(f"[Go {task_id}] Checkout session created")
# 保存结果
from main import save_account, save_checkout_result
await save_account(result, config.accounts_output_file)
checkout_result["email"] = email
await save_checkout_result(checkout_result)
else:
# Checkout 失败但账号创建成功
result["status"] = "partial"
result["checkout_error"] = checkout_result.get("error")
logger.warning(f"[Go {task_id}] Checkout failed: {checkout_result.get('error')}")
from main import save_account
await save_account(result, config.accounts_output_file)
return result
except Exception as e:
logger.exception(f"[Go {task_id}] Unexpected error")
return {
"status": "failed",
"failed_stage": "exception",
"error": str(e),
}
finally:
if session:
try:
session.close()
except Exception as e:
logger.warning(f"[Go {task_id}] Error closing session: {e}")