# ==================== Telegram Bot 主程序 ==================== # 通过 Telegram 远程控制 OpenAI Team 批量注册任务 import asyncio import sys import time from concurrent.futures import ThreadPoolExecutor from functools import wraps from pathlib import Path from typing import Optional from telegram import Update, Bot, BotCommand, InlineKeyboardButton, InlineKeyboardMarkup from telegram.ext import ( Application, CommandHandler, MessageHandler, CallbackQueryHandler, filters, ContextTypes, ) from config import ( TELEGRAM_BOT_TOKEN, TELEGRAM_ADMIN_CHAT_IDS, TEAMS, AUTH_PROVIDER, TEAM_JSON_FILE, TEAM_TRACKER_FILE, CSV_FILE, TELEGRAM_CHECK_INTERVAL, TELEGRAM_LOW_STOCK_THRESHOLD, CONFIG_FILE, EMAIL_PROVIDER, ACCOUNTS_PER_TEAM, PROXY_ENABLED, PROXIES, S2A_API_BASE, CPA_API_BASE, CRS_API_BASE, get_gptmail_keys, add_gptmail_key, remove_gptmail_key, GPTMAIL_API_KEYS, INCLUDE_TEAM_OWNERS, reload_config, S2A_CONCURRENCY, S2A_PRIORITY, S2A_GROUP_NAMES, S2A_GROUP_IDS, S2A_ADMIN_KEY, BROWSER_RANDOM_FINGERPRINT, batch_remove_teams_by_names, ) from utils import load_team_tracker, get_all_incomplete_accounts, save_team_tracker, get_completed_teams, batch_remove_completed_teams from bot_notifier import BotNotifier, set_notifier, progress_finish from s2a_service import ( s2a_get_dashboard_stats, format_dashboard_stats, s2a_get_keys_with_usage, format_keys_usage, s2a_get_error_accounts, s2a_delete_account, s2a_batch_delete_error_accounts ) from email_service import gptmail_service, unified_create_email from logger import log def admin_only(func): """装饰器: 仅允许管理员执行命令""" @wraps(func) async def wrapper(self, update: Update, context: ContextTypes.DEFAULT_TYPE): user_id = update.effective_user.id if user_id not in TELEGRAM_ADMIN_CHAT_IDS: # 兼容 message 和 callback_query message = update.message or (update.callback_query.message if update.callback_query else None) if message: await message.reply_text("⛔ 无权限,你的 ID 不在管理员列表中") return return await func(self, update, context) return wrapper def get_message(update: Update): """获取可用于回复的 message 对象,兼容普通消息和 callback_query""" return update.message or (update.callback_query.message if update.callback_query else None) class ProvisionerBot: """OpenAI Team Provisioner Telegram Bot""" def __init__(self): self.executor = ThreadPoolExecutor(max_workers=1) self.current_task: Optional[asyncio.Task] = None self.current_team: Optional[str] = None self.app: Optional[Application] = None self.notifier: Optional[BotNotifier] = None self._shutdown_event = asyncio.Event() # JSON 导入批量进度跟踪 self._import_progress_message = None # 进度消息对象 self._import_progress_lock = asyncio.Lock() # 并发锁 self._import_batch_stats = { # 批量统计 "total_files": 0, "processed_files": 0, "total_added": 0, "total_skipped": 0, "current_file": "", "errors": [] } self._import_batch_timeout_task = None # 超时任务 async def start(self): """启动 Bot""" if not TELEGRAM_BOT_TOKEN: log.error("Telegram Bot Token not configured") return # 创建 Application self.app = Application.builder().token(TELEGRAM_BOT_TOKEN).build() # 初始化通知器 self.notifier = BotNotifier(self.app.bot, TELEGRAM_ADMIN_CHAT_IDS) set_notifier(self.notifier) # 注册命令处理器 handlers = [ ("start", self.cmd_help), ("help", self.cmd_help), ("status", self.cmd_status), ("team", self.cmd_team), ("list", self.cmd_list), ("config", self.cmd_config), ("fingerprint", self.cmd_fingerprint), ("concurrent", self.cmd_concurrent), ("run", self.cmd_run), ("run_all", self.cmd_run_all), ("resume", self.cmd_resume), ("stop", self.cmd_stop), ("logs", self.cmd_logs), ("logs_live", self.cmd_logs_live), ("logs_stop", self.cmd_logs_stop), ("dashboard", self.cmd_dashboard), ("import", self.cmd_import), ("stock", self.cmd_stock), ("gptmail_keys", self.cmd_gptmail_keys), ("gptmail_add", self.cmd_gptmail_add), ("gptmail_del", self.cmd_gptmail_del), ("iban_list", self.cmd_iban_list), ("iban_add", self.cmd_iban_add), ("iban_clear", self.cmd_iban_clear), ("domain_list", self.cmd_domain_list), ("domain_add", self.cmd_domain_add), ("domain_del", self.cmd_domain_del), ("domain_clear", self.cmd_domain_clear), ("team_fingerprint", self.cmd_team_fingerprint), ("team_register", self.cmd_team_register), ("test_email", self.cmd_test_email), ("include_owners", self.cmd_include_owners), ("reload", self.cmd_reload), ("s2a_config", self.cmd_s2a_config), ("clean", self.cmd_clean), ("clean_errors", self.cmd_clean_errors), ("clean_teams", self.cmd_clean_teams), ("keys_usage", self.cmd_keys_usage), ("autogptplus", self.cmd_autogptplus), ] for cmd, handler in handlers: self.app.add_handler(CommandHandler(cmd, handler)) # 注册文件上传处理器 (JSON 文件) self.app.add_handler(MessageHandler( filters.Document.MimeType("application/json"), self.handle_json_file )) # 注册回调查询处理器 (InlineKeyboard) self.app.add_handler(CallbackQueryHandler( self.callback_keys_usage, pattern="^keys_usage:" )) self.app.add_handler(CallbackQueryHandler( self.callback_clean_errors, pattern="^clean_errors:" )) self.app.add_handler(CallbackQueryHandler( self.callback_clean_teams, pattern="^clean_teams:" )) self.app.add_handler(CallbackQueryHandler( self.callback_team_register, pattern="^team_reg:" )) self.app.add_handler(CallbackQueryHandler( self.callback_autogptplus, pattern="^autogptplus:" )) # 注册自定义数量输入处理器 (GPT Team 注册) self.app.add_handler(MessageHandler( filters.TEXT & ~filters.COMMAND, self.handle_team_custom_count )) # 注册定时检查任务 if TELEGRAM_CHECK_INTERVAL > 0 and AUTH_PROVIDER == "s2a": self.app.job_queue.run_repeating( self.scheduled_stock_check, interval=TELEGRAM_CHECK_INTERVAL, first=60, # 启动后1分钟执行第一次 name="stock_check" ) log.info(f"Stock check scheduled every {TELEGRAM_CHECK_INTERVAL}s") # 启动通知器 await self.notifier.start() log.success("Telegram Bot started") log.info(f"Admin Chat IDs: {TELEGRAM_ADMIN_CHAT_IDS}") # 发送启动通知 await self.notifier.notify("🤖 Bot 已启动\n准备就绪,发送 /help 查看帮助") # 运行 Bot await self.app.initialize() await self.app.start() # 设置命令菜单提示 await self._set_commands() await self.app.updater.start_polling(drop_pending_updates=True) # 等待关闭信号 await self._shutdown_event.wait() # 清理 await self.app.updater.stop() await self.app.stop() await self.app.shutdown() await self.notifier.stop() def request_shutdown(self): """请求关闭 Bot""" self._shutdown_event.set() async def _set_commands(self): """设置 Bot 命令菜单提示""" commands = [ # 基础信息 BotCommand("help", "查看帮助信息"), BotCommand("list", "查看 team.json 账号列表"), BotCommand("status", "查看任务处理状态"), BotCommand("team", "查看指定 Team 详情"), BotCommand("config", "查看系统配置"), BotCommand("logs", "查看最近日志"), BotCommand("logs_live", "启用实时日志推送"), BotCommand("logs_stop", "停止实时日志推送"), # 任务控制 BotCommand("run", "处理指定 Team"), BotCommand("run_all", "处理所有 Team"), BotCommand("resume", "继续处理未完成账号"), BotCommand("stop", "停止当前任务"), # 配置管理 BotCommand("fingerprint", "开启/关闭随机指纹"), BotCommand("concurrent", "开启/关闭并发处理"), BotCommand("include_owners", "开启/关闭 Owner 入库"), BotCommand("reload", "重载配置文件"), # 清理管理 BotCommand("clean", "清理已完成账号"), BotCommand("clean_errors", "清理错误状态账号"), BotCommand("clean_teams", "清理已完成 Team"), # S2A BotCommand("dashboard", "查看 S2A 仪表盘"), BotCommand("keys_usage", "查看 API 密钥用量"), BotCommand("stock", "查看账号库存"), BotCommand("s2a_config", "配置 S2A 参数"), BotCommand("import", "导入账号到 team.json"), # GPTMail BotCommand("gptmail_keys", "查看 GPTMail API Keys"), BotCommand("gptmail_add", "添加 GPTMail API Key"), BotCommand("gptmail_del", "删除 GPTMail API Key"), BotCommand("test_email", "测试邮箱创建"), # IBAN 管理 BotCommand("iban_list", "查看 IBAN 列表"), BotCommand("iban_add", "添加 IBAN"), BotCommand("iban_clear", "清空 IBAN 列表"), # 域名管理 BotCommand("domain_list", "查看邮箱域名列表"), BotCommand("domain_add", "添加邮箱域名"), BotCommand("domain_del", "删除指定域名"), BotCommand("domain_clear", "清空域名列表"), # GPT Team BotCommand("team_fingerprint", "GPT Team 随机指纹"), BotCommand("team_register", "GPT Team 自动注册"), # AutoGPTPlus BotCommand("autogptplus", "AutoGPTPlus 管理面板"), ] try: await self.app.bot.set_my_commands(commands) log.info("Bot 命令菜单已设置") except Exception as e: log.warning(f"设置命令菜单失败: {e}") @admin_only async def cmd_help(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """显示帮助信息""" help_text = """🤖 OpenAI Team 批量注册 Bot 📋 查看信息: /list - 查看 team.json 账号列表 /status - 查看任务处理状态 /team <n> - 查看第 n 个 Team 处理详情 /config - 查看系统配置 /logs [n] - 查看最近 n 条日志 /logs_live - 启用实时日志推送 /logs_stop - 停止实时日志推送 🚀 任务控制: /run <n> - 开始处理第 n 个 Team /run_all - 开始处理所有 Team /resume - 继续处理未完成账号 /stop - 停止当前任务 ⚙️ 配置管理: /fingerprint - 开启/关闭随机指纹 /concurrent - 开启/关闭并发处理 /concurrent <n> - 设置并发数 (1-10) /include_owners - 开启/关闭 Owner 入库 /reload - 重载配置文件 (无需重启) /clean - 清理 team.json 和 tracker 数据 📊 S2A 专属: /dashboard - 查看 S2A 仪表盘 /keys_usage - 查看 API 密钥用量 /stock - 查看账号库存 /s2a_config - 配置 S2A 参数 /clean_errors - 清理错误状态账号 🧹 清理管理: /clean - 清理已完成账号 (team.json) /clean_teams - 清理已完成 Team (tracker) 📤 导入账号: /import - 导入账号到 team.json 或直接发送 JSON 文件 📧 GPTMail 管理: /gptmail_keys - 查看所有 API Keys /gptmail_add <key> - 添加 API Key /gptmail_del <key> - 删除 API Key /test_email - 测试邮箱创建 💳 IBAN 管理 (GPT Team): /iban_list - 查看 IBAN 列表 /iban_add <ibans> - 添加 IBAN (每行一个或逗号分隔) /iban_clear - 清空 IBAN 列表 📧 域名管理 (GPT Team): /domain_list - 查看邮箱域名列表 /domain_add <domains> - 添加域名 (每行一个或逗号分隔) /domain_del <domain> - 删除指定域名 /domain_clear - 清空域名列表 🤖 GPT Team: /team_fingerprint - 开启/关闭随机指纹 /team_register - 开始自动订阅注册 🔧 AutoGPTPlus: /autogptplus - ChatGPT 订阅自动化管理面板 💡 示例: /list - 查看所有待处理账号 /run 0 - 处理第一个 Team /concurrent 4 - 开启 4 并发处理 /gptmail_add my-api-key - 添加 Key /iban_add DE123...,DE456... - 添加 IBAN""" await update.message.reply_text(help_text, parse_mode="HTML") @admin_only async def cmd_status(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """查看所有 Team 状态""" tracker = load_team_tracker() teams_data = tracker.get("teams", {}) if not teams_data: await update.message.reply_text("📭 暂无数据,请先运行任务") return lines = ["📊 Team 状态总览\n"] for team_name, accounts in teams_data.items(): total = len(accounts) completed = sum(1 for a in accounts if a.get("status") == "completed") failed = sum(1 for a in accounts if "fail" in a.get("status", "").lower()) pending = total - completed - failed status_icon = "✅" if completed == total else ("❌" if failed > 0 else "⏳") lines.append( f"{status_icon} {team_name}: {completed}/{total} " f"(失败:{failed} 待处理:{pending})" ) # 当前任务状态 if self.current_task and not self.current_task.done(): lines.append(f"\n🔄 运行中: {self.current_team or '未知'}") await update.message.reply_text("\n".join(lines), parse_mode="HTML") @admin_only async def cmd_team(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """查看指定 Team 详情""" if not context.args: await update.message.reply_text("用法: /team <序号>\n示例: /team 0") return try: team_idx = int(context.args[0]) except ValueError: await update.message.reply_text("❌ 无效的序号,必须是数字") return if team_idx < 0 or team_idx >= len(TEAMS): await update.message.reply_text(f"❌ 序号超出范围,有效范围: 0-{len(TEAMS)-1}") return team = TEAMS[team_idx] team_name = team.get("name", f"Team{team_idx}") tracker = load_team_tracker() accounts = tracker.get("teams", {}).get(team_name, []) lines = [f"📁 Team {team_idx}: {team_name}\n"] lines.append(f"👤 Owner: {team.get('owner_email', '无')}") lines.append(f"📊 账号数: {len(accounts)}\n") if accounts: for acc in accounts: email = acc.get("email", "") status = acc.get("status", "unknown") role = acc.get("role", "member") icon = {"completed": "✅", "authorized": "🔐", "registered": "📝"}.get( status, "❌" if "fail" in status.lower() else "⏳" ) role_tag = " [Owner]" if role == "owner" else "" lines.append(f"{icon} {email}{role_tag}") else: lines.append("📭 暂无已处理的账号") await update.message.reply_text("\n".join(lines), parse_mode="HTML") @admin_only async def cmd_list(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """查看 team.json 中的账号列表""" if not TEAMS: await update.message.reply_text("📭 team.json 中没有账号") return # 加载 tracker 以获取成员账号完成状态 tracker = load_team_tracker() tracker_teams = tracker.get("teams", {}) lines = [f"📋 team.json 账号列表 (共 {len(TEAMS)} 个)\n"] total_completed = 0 total_accounts = 0 for i, team in enumerate(TEAMS): email = team.get("owner_email", "") team_name = team.get("name", "") has_token = "🔑" if team.get("auth_token") else "🔒" needs_login = " [需登录]" if team.get("needs_login") else "" # 从 tracker 获取该 Team 的成员完成情况 team_accounts = tracker_teams.get(team_name, []) completed_count = sum(1 for acc in team_accounts if acc.get("status") == "completed") account_count = len(team_accounts) total_completed += completed_count total_accounts += account_count # 显示完成进度 if account_count > 0: progress = f" [{completed_count}/{account_count}]" else: progress = "" lines.append(f"{i}. {has_token} {email}{progress}{needs_login}") # 统计 with_token = sum(1 for t in TEAMS if t.get("auth_token")) lines.append(f"\n📊 统计:") lines.append(f"有 Token: {with_token}/{len(TEAMS)}") lines.append(f"已完成: {total_completed}/{total_accounts}") # 消息太长时分段发送 text = "\n".join(lines) if len(text) > 4000: # 分段 for i in range(0, len(lines), 30): chunk = "\n".join(lines[i:i+30]) await update.message.reply_text(chunk, parse_mode="HTML") else: await update.message.reply_text(text, parse_mode="HTML") @admin_only async def cmd_config(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """查看当前系统配置""" from config import CONCURRENT_ENABLED, CONCURRENT_WORKERS # 授权服务地址 if AUTH_PROVIDER == "s2a": auth_url = S2A_API_BASE or "未配置" elif AUTH_PROVIDER == "cpa": auth_url = CPA_API_BASE or "未配置" else: auth_url = CRS_API_BASE or "未配置" # 代理信息 if PROXY_ENABLED and PROXIES: proxy_info = f"已启用 ({len(PROXIES)} 个)" else: proxy_info = "未启用" # Owner 入库状态 include_owners_status = "✅ 已开启" if INCLUDE_TEAM_OWNERS else "❌ 未开启" # 随机指纹状态 fingerprint_status = "✅ 已开启" if BROWSER_RANDOM_FINGERPRINT else "❌ 未开启" # 并发处理状态 if CONCURRENT_ENABLED: concurrent_status = f"✅ 已开启 ({CONCURRENT_WORKERS} 并发)" else: concurrent_status = "❌ 未开启" lines = [ "⚙️ 系统配置", "", "📧 邮箱服务", f" 提供商: {EMAIL_PROVIDER}", "", "🔐 授权服务", f" 模式: {AUTH_PROVIDER.upper()}", f" 地址: {auth_url}", f" Owner 入库: {include_owners_status}", "", "🌐 浏览器", f" 随机指纹: {fingerprint_status}", f" 并发处理: {concurrent_status}", "", "👥 账号设置", f" 每 Team 账号数: {ACCOUNTS_PER_TEAM}", f" team.json 账号: {len(TEAMS)}", "", "🔗 代理", f" 状态: {proxy_info}", "", "💡 提示:", "/fingerprint - 切换随机指纹", "/concurrent - 切换并发处理", "/include_owners - 切换 Owner 入库", "/s2a_config - 配置 S2A 参数", ] await update.message.reply_text("\n".join(lines), parse_mode="HTML") @admin_only async def cmd_fingerprint(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """切换随机指纹""" import tomli_w try: # 读取当前配置 with open(CONFIG_FILE, "rb") as f: import tomllib config = tomllib.load(f) # 获取当前状态 current = config.get("browser", {}).get("random_fingerprint", True) new_value = not current # 更新配置 if "browser" not in config: config["browser"] = {} config["browser"]["random_fingerprint"] = new_value # 写回文件 with open(CONFIG_FILE, "wb") as f: tomli_w.dump(config, f) status = "✅ 已开启" if new_value else "❌ 已关闭" await update.message.reply_text( f"🎭 随机指纹\n\n" f"状态: {status}\n\n" f"开启后每次启动浏览器将随机使用不同的:\n" f"• User-Agent (Chrome 133-144)\n" f"• WebGL 显卡指纹\n" f"• 屏幕分辨率\n\n" f"💡 使用 /reload 立即生效", parse_mode="HTML" ) except ImportError: await update.message.reply_text( "❌ 缺少 tomli_w 依赖\n" "请运行: uv add tomli_w" ) except Exception as e: await update.message.reply_text(f"❌ 修改配置失败: {e}") @admin_only async def cmd_include_owners(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """切换 Owner 入库开关""" import tomli_w try: # 读取当前配置 with open(CONFIG_FILE, "rb") as f: import tomllib config = tomllib.load(f) # 获取当前状态 (顶层配置) current = config.get("include_team_owners", False) new_value = not current # 更新配置 (写到顶层) config["include_team_owners"] = new_value # 写回文件 with open(CONFIG_FILE, "wb") as f: tomli_w.dump(config, f) status = "✅ 已开启" if new_value else "❌ 已关闭" desc = "运行任务时会将 Team Owner 账号也入库到授权服务" if new_value else "运行任务时不会入库 Team Owner 账号" await update.message.reply_text( f"👤 Owner 入库开关\n\n" f"状态: {status}\n" f"{desc}\n\n" f"💡 使用 /reload 立即生效", parse_mode="HTML" ) except ImportError: await update.message.reply_text( "❌ 缺少 tomli_w 依赖\n" "请运行: uv add tomli_w" ) except Exception as e: await update.message.reply_text(f"❌ 修改配置失败: {e}") @admin_only async def cmd_concurrent(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """切换并发处理开关或设置并发数""" import tomli_w from config import CONCURRENT_ENABLED, CONCURRENT_WORKERS try: # 读取当前配置 with open(CONFIG_FILE, "rb") as f: import tomllib config = tomllib.load(f) # 确保 concurrent section 存在 if "concurrent" not in config: config["concurrent"] = {"enabled": False, "workers": 4} # 解析参数 args = context.args if context.args else [] if not args: # 无参数: 切换开关 current = config["concurrent"].get("enabled", False) new_value = not current config["concurrent"]["enabled"] = new_value # 写回文件 with open(CONFIG_FILE, "wb") as f: tomli_w.dump(config, f) workers = config["concurrent"].get("workers", 4) status = "✅ 已开启" if new_value else "❌ 已关闭" await update.message.reply_text( f"⚡ 并发处理\n\n" f"状态: {status}\n" f"并发数: {workers}\n\n" f"说明:\n" f"• 注册流程并发执行\n" f"• 授权回调串行执行 (避免端口冲突)\n\n" f"💡 设置并发数:\n" f"/concurrent 4 - 设置为 4 并发\n\n" f"使用 /reload 立即生效", parse_mode="HTML" ) else: # 有参数: 设置并发数 try: workers = int(args[0]) if workers < 1 or workers > 10: await update.message.reply_text("❌ 并发数范围: 1-10") return config["concurrent"]["workers"] = workers config["concurrent"]["enabled"] = True # 设置并发数时自动开启 # 写回文件 with open(CONFIG_FILE, "wb") as f: tomli_w.dump(config, f) await update.message.reply_text( f"⚡ 并发处理\n\n" f"状态: ✅ 已开启\n" f"并发数: {workers}\n\n" f"说明:\n" f"• 注册流程并发执行\n" f"• 授权回调串行执行 (避免端口冲突)\n\n" f"💡 使用 /reload 立即生效", parse_mode="HTML" ) except ValueError: await update.message.reply_text( "❌ 无效的并发数\n\n" "用法:\n" "/concurrent - 切换开关\n" "/concurrent 4 - 设置为 4 并发" ) except ImportError: await update.message.reply_text( "❌ 缺少 tomli_w 依赖\n" "请运行: uv add tomli_w" ) except Exception as e: await update.message.reply_text(f"❌ 修改配置失败: {e}") @admin_only async def cmd_reload(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """重载配置文件""" # 检查是否有任务正在运行 if self.current_task and not self.current_task.done(): await update.message.reply_text( f"⚠️ 有任务正在运行: {self.current_team}\n" "请等待任务完成或使用 /stop 停止后再重载配置" ) return await update.message.reply_text("⏳ 正在重载配置...") try: # 调用重载函数 result = reload_config() if result["success"]: # 重新导入更新后的配置变量 from config import ( EMAIL_PROVIDER as new_email_provider, AUTH_PROVIDER as new_auth_provider, INCLUDE_TEAM_OWNERS as new_include_owners, ACCOUNTS_PER_TEAM as new_accounts_per_team, BROWSER_RANDOM_FINGERPRINT as new_random_fingerprint, ) lines = [ "✅ 配置重载成功", "", f"📁 team.json: {result['teams_count']} 个账号", f"📄 config.toml: {'已更新' if result['config_updated'] else '未变化'}", "", "当前配置:", f" 邮箱服务: {new_email_provider}", f" 授权服务: {new_auth_provider}", f" Owner 入库: {'✅' if new_include_owners else '❌'}", f" 随机指纹: {'✅' if new_random_fingerprint else '❌'}", f" 每 Team 账号: {new_accounts_per_team}", ] await update.message.reply_text("\n".join(lines), parse_mode="HTML") else: await update.message.reply_text( f"❌ 配置重载失败\n\n{result['message']}", parse_mode="HTML" ) except Exception as e: await update.message.reply_text(f"❌ 重载配置失败: {e}") @admin_only async def cmd_clean(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """清理 team.json 和 team_tracker.json 数据""" # 检查是否有任务正在运行 if self.current_task and not self.current_task.done(): await update.message.reply_text( f"⚠️ 有任务正在运行: {self.current_team}\n" "请等待任务完成或使用 /stop 停止后再清理" ) return # 解析参数 args = [a.lower() for a in context.args] if context.args else [] include_csv = "all" in args confirmed = "confirm" in args # 需要确认参数 if not confirmed: # 统计当前数据 team_count = len(TEAMS) tracker = load_team_tracker() tracker_accounts = sum(len(accs) for accs in tracker.get("teams", {}).values()) # 统计 CSV 行数 csv_count = 0 csv_path = Path(CSV_FILE) if csv_path.exists(): try: with open(csv_path, "r", encoding="utf-8") as f: csv_count = max(0, sum(1 for _ in f) - 1) # 减去表头 except: pass msg = ( f"⚠️ 确认清理数据?\n\n" f"将清空以下文件:\n" f"• team.json ({team_count} 个 Team)\n" f"• team_tracker.json ({tracker_accounts} 个账号记录)\n" ) if include_csv: msg += f"• accounts.csv ({csv_count} 条记录)\n" msg += ( f"\n此操作不可恢复!\n\n" f"确认清理请发送:\n" f"/clean confirm\n\n" f"如需同时清理 CSV:\n" f"/clean all confirm" ) await update.message.reply_text(msg, parse_mode="HTML") return # 执行清理 cleaned = [] errors = [] # 清理 team.json try: if TEAM_JSON_FILE.exists(): with open(TEAM_JSON_FILE, "w", encoding="utf-8") as f: f.write("[]") cleaned.append("team.json") except Exception as e: errors.append(f"team.json: {e}") # 清理 team_tracker.json try: tracker_file = Path(TEAM_TRACKER_FILE) if tracker_file.exists(): with open(tracker_file, "w", encoding="utf-8") as f: f.write('{"teams": {}}') cleaned.append("team_tracker.json") except Exception as e: errors.append(f"team_tracker.json: {e}") # 清理 accounts.csv (可选) if include_csv: try: csv_path = Path(CSV_FILE) if csv_path.exists(): with open(csv_path, "w", encoding="utf-8") as f: f.write("email,password,team,status,crs_id,timestamp\n") cleaned.append("accounts.csv") except Exception as e: errors.append(f"accounts.csv: {e}") # 重载配置以清空内存中的 TEAMS reload_config() # 返回结果 if errors: await update.message.reply_text( f"⚠️ 部分清理失败\n\n" f"已清理: {', '.join(cleaned) or '无'}\n" f"失败: {'; '.join(errors)}", parse_mode="HTML" ) else: await update.message.reply_text( f"✅ 清理完成\n\n" f"已清空: {', '.join(cleaned)}\n\n" f"现在可以导入新的 team.json 了", parse_mode="HTML" ) @admin_only async def cmd_s2a_config(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """配置 S2A 服务参数""" import tomli_w # 无参数时显示当前配置 if not context.args: # 脱敏显示 API Key key_display = "未配置" if S2A_ADMIN_KEY: if len(S2A_ADMIN_KEY) > 10: key_display = f"{S2A_ADMIN_KEY[:4]}...{S2A_ADMIN_KEY[-4:]}" else: key_display = S2A_ADMIN_KEY[:4] + "..." groups_display = ", ".join(S2A_GROUP_NAMES) if S2A_GROUP_NAMES else "默认分组" group_ids_display = ", ".join(str(x) for x in S2A_GROUP_IDS) if S2A_GROUP_IDS else "无" lines = [ "📊 S2A 服务配置", "", f"API 地址: {S2A_API_BASE or '未配置'}", f"Admin Key: {key_display}", f"并发数: {S2A_CONCURRENCY}", f"优先级: {S2A_PRIORITY}", f"分组名称: {groups_display}", f"分组 ID: {group_ids_display}", "", "💡 修改配置:", "/s2a_config concurrency 10", "/s2a_config priority 50", "/s2a_config groups 分组1,分组2", "/s2a_config api_base https://...", "/s2a_config admin_key sk-xxx", ] await update.message.reply_text("\n".join(lines), parse_mode="HTML") return # 解析参数 param = context.args[0].lower() value = " ".join(context.args[1:]) if len(context.args) > 1 else None if not value: await update.message.reply_text( f"❌ 缺少值\n用法: /s2a_config {param} <值>" ) return try: # 读取当前配置 with open(CONFIG_FILE, "rb") as f: import tomllib config = tomllib.load(f) # 确保 s2a section 存在 if "s2a" not in config: config["s2a"] = {} # 根据参数类型处理 updated_key = None updated_value = None if param == "concurrency": try: new_val = int(value) if new_val < 1 or new_val > 100: await update.message.reply_text("❌ 并发数范围: 1-100") return config["s2a"]["concurrency"] = new_val updated_key = "并发数" updated_value = str(new_val) except ValueError: await update.message.reply_text("❌ 并发数必须是数字") return elif param == "priority": try: new_val = int(value) if new_val < 0 or new_val > 100: await update.message.reply_text("❌ 优先级范围: 0-100") return config["s2a"]["priority"] = new_val updated_key = "优先级" updated_value = str(new_val) except ValueError: await update.message.reply_text("❌ 优先级必须是数字") return elif param in ("groups", "group_names"): # 支持逗号分隔的分组名称 groups = [g.strip() for g in value.split(",") if g.strip()] config["s2a"]["group_names"] = groups updated_key = "分组名称" updated_value = ", ".join(groups) if groups else "默认分组" elif param == "group_ids": # 支持逗号分隔的分组 ID ids = [i.strip() for i in value.split(",") if i.strip()] config["s2a"]["group_ids"] = ids updated_key = "分组 ID" updated_value = ", ".join(ids) if ids else "无" elif param == "api_base": config["s2a"]["api_base"] = value updated_key = "API 地址" updated_value = value elif param == "admin_key": config["s2a"]["admin_key"] = value updated_key = "Admin Key" # 脱敏显示 if len(value) > 10: updated_value = f"{value[:4]}...{value[-4:]}" else: updated_value = value[:4] + "..." else: await update.message.reply_text( f"❌ 未知参数: {param}\n\n" "可用参数:\n" "• concurrency - 并发数\n" "• priority - 优先级\n" "• groups - 分组名称\n" "• group_ids - 分组 ID\n" "• api_base - API 地址\n" "• admin_key - Admin Key" ) return # 写回文件 with open(CONFIG_FILE, "wb") as f: tomli_w.dump(config, f) await update.message.reply_text( f"✅ S2A 配置已更新\n\n" f"{updated_key}: {updated_value}\n\n" f"💡 使用 /reload 立即生效", parse_mode="HTML" ) except ImportError: await update.message.reply_text( "❌ 缺少 tomli_w 依赖\n" "请运行: uv add tomli_w" ) except Exception as e: await update.message.reply_text(f"❌ 修改配置失败: {e}") @admin_only async def cmd_run(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """启动处理指定 Team""" if self.current_task and not self.current_task.done(): await update.message.reply_text( f"⚠️ 任务正在运行: {self.current_team}\n使用 /stop 停止" ) return if not context.args: await update.message.reply_text("用法: /run <序号>\n示例: /run 0") return try: team_idx = int(context.args[0]) except ValueError: await update.message.reply_text("❌ 无效的序号,必须是数字") return if team_idx < 0 or team_idx >= len(TEAMS): await update.message.reply_text(f"❌ 序号超出范围,有效范围: 0-{len(TEAMS)-1}") return team_name = TEAMS[team_idx].get("name", f"Team{team_idx}") self.current_team = team_name # 重置停止标志,确保新任务可以正常运行 try: import run run._shutdown_requested = False except Exception: pass await update.message.reply_text(f"🚀 开始处理 Team {team_idx}: {team_name}...") # 在后台线程执行任务 loop = asyncio.get_event_loop() self.current_task = loop.run_in_executor( self.executor, self._run_team_task, team_idx ) # 添加完成回调 self.current_task = asyncio.ensure_future(self._wrap_task(self.current_task, team_name)) @admin_only async def cmd_run_all(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """启动处理所有 Team""" if self.current_task and not self.current_task.done(): await update.message.reply_text( f"⚠️ 任务正在运行: {self.current_team}\n使用 /stop 停止" ) return self.current_team = "全部" # 重置停止标志,确保新任务可以正常运行 try: import run run._shutdown_requested = False except Exception: pass await update.message.reply_text(f"🚀 开始处理所有 Team (共 {len(TEAMS)} 个)...") loop = asyncio.get_event_loop() self.current_task = loop.run_in_executor( self.executor, self._run_all_teams_task ) self.current_task = asyncio.ensure_future(self._wrap_task(self.current_task, "全部")) @admin_only async def cmd_resume(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """继续处理未完成的账号""" if self.current_task and not self.current_task.done(): await update.message.reply_text( f"⚠️ 任务正在运行: {self.current_team}\n使用 /stop 停止" ) return # 检查是否有未完成的账号 tracker = load_team_tracker() all_incomplete = get_all_incomplete_accounts(tracker) if not all_incomplete: await update.message.reply_text("✅ 没有待处理的账号,所有任务已完成") return # 统计未完成账号 total_incomplete = sum(len(accs) for accs in all_incomplete.values()) teams_count = len(all_incomplete) # 构建消息 lines = [ f"⏳ 发现 {total_incomplete} 个未完成账号", f"涉及 {teams_count} 个 Team:", "" ] for team_name, accounts in all_incomplete.items(): lines.append(f" • {team_name}: {len(accounts)} 个") lines.append("") lines.append("🚀 开始继续处理...") await update.message.reply_text("\n".join(lines), parse_mode="HTML") # 启动任务 (run_all_teams 会自动处理未完成的账号) self.current_team = "继续处理" # 重置停止标志,确保新任务可以正常运行 try: import run run._shutdown_requested = False except Exception: pass loop = asyncio.get_event_loop() self.current_task = loop.run_in_executor( self.executor, self._run_all_teams_task ) self.current_task = asyncio.ensure_future(self._wrap_task(self.current_task, "继续处理")) async def _wrap_task(self, task, team_name: str): """包装任务以处理完成通知""" try: result = await task # 收集成功和失败的账号 success_accounts = [r.get("email") for r in (result or []) if r.get("status") == "success"] failed_accounts = [r.get("email") for r in (result or []) if r.get("status") != "success"] log.info(f"任务完成: {team_name}, 成功: {len(success_accounts)}, 失败: {len(failed_accounts)}") await self.notifier.notify_task_completed(team_name, success_accounts, failed_accounts) except Exception as e: log.error(f"任务异常: {team_name}, 错误: {e}") await self.notifier.notify_error(f"任务失败: {team_name}", str(e)) finally: self.current_team = None # 清理进度跟踪 progress_finish() # 注意: 不在这里重置 _shutdown_requested # 让标志保持 True,直到下次任务启动时再重置 # 这样可以确保线程池中的任务能够正确检测到停止信号 def _run_team_task(self, team_idx: int): """执行单个 Team 任务 (在线程池中运行)""" # 延迟导入避免循环依赖 from run import run_single_team from team_service import preload_all_account_ids from utils import load_team_tracker, save_team_tracker, add_team_owners_to_tracker from config import DEFAULT_PASSWORD # 预加载 account_id preload_all_account_ids() _tracker = load_team_tracker() add_team_owners_to_tracker(_tracker, DEFAULT_PASSWORD) save_team_tracker(_tracker) return run_single_team(team_idx) def _run_all_teams_task(self): """执行所有 Team 任务 (在线程池中运行)""" from run import run_all_teams from team_service import preload_all_account_ids from utils import load_team_tracker, save_team_tracker, add_team_owners_to_tracker from config import DEFAULT_PASSWORD # 预加载 account_id preload_all_account_ids() _tracker = load_team_tracker() add_team_owners_to_tracker(_tracker, DEFAULT_PASSWORD) save_team_tracker(_tracker) return run_all_teams() @admin_only async def cmd_stop(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """强制停止当前任务""" if not self.current_task or self.current_task.done(): await update.message.reply_text("📭 当前没有运行中的任务") return task_name = self.current_team or "未知任务" await update.message.reply_text(f"🛑 正在强制停止: {task_name}...") try: # 1. 设置全局停止标志 try: import run run._shutdown_requested = True # 获取当前运行结果 current_results = run._current_results.copy() if run._current_results else [] except Exception: current_results = [] # 2. 取消 asyncio 任务 if self.current_task and not self.current_task.done(): self.current_task.cancel() # 3. 强制关闭浏览器进程 try: from browser_automation import cleanup_chrome_processes cleanup_chrome_processes() except Exception as e: log.warning(f"清理浏览器进程失败: {e}") # 4. 重置状态 self.current_team = None # 注意:不在这里重置 _shutdown_requested,让任务完成后在 _wrap_task 中重置 # 这样可以确保线程池中的任务有足够时间检测到停止信号 # 清理进度跟踪 progress_finish() # 6. 生成停止报告 report_lines = [ f"🛑 任务已停止", f"任务: {task_name}", "", ] # 本次运行结果 if current_results: success_count = sum(1 for r in current_results if r.get("status") == "success") failed_count = len(current_results) - success_count report_lines.append(f"📊 本次运行结果:") report_lines.append(f" 成功: {success_count}") report_lines.append(f" 失败: {failed_count}") report_lines.append("") # 获取未完成账号信息 tracker = load_team_tracker() all_incomplete = get_all_incomplete_accounts(tracker) if all_incomplete: total_incomplete = sum(len(accs) for accs in all_incomplete.values()) report_lines.append(f"⏳ 待继续处理: {total_incomplete} 个账号") # 显示每个 Team 的未完成账号 (最多显示 3 个 Team) shown_teams = 0 for team_name, accounts in all_incomplete.items(): if shown_teams >= 3: remaining = len(all_incomplete) - 3 report_lines.append(f" ... 还有 {remaining} 个 Team") break report_lines.append(f" {team_name}: {len(accounts)} 个") # 显示第一个待处理账号 if accounts: first_acc = accounts[0] report_lines.append(f" 下一个: {first_acc['email']}") shown_teams += 1 report_lines.append("") report_lines.append("💡 使用 /resume 继续处理") else: report_lines.append("✅ 没有待处理的账号") await update.message.reply_text("\n".join(report_lines), parse_mode="HTML") except Exception as e: await update.message.reply_text(f"❌ 停止任务时出错: {e}") @admin_only async def cmd_logs(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """查看最近日志""" try: n = int(context.args[0]) if context.args else 10 except ValueError: n = 10 n = min(n, 50) # 限制最大条数 try: from config import BASE_DIR log_file = BASE_DIR / "logs" / "app.log" if not log_file.exists(): await update.message.reply_text("📭 日志文件不存在") return with open(log_file, "r", encoding="utf-8", errors="ignore") as f: lines = f.readlines() recent = lines[-n:] if len(lines) >= n else lines if not recent: await update.message.reply_text("📭 日志文件为空") return # 格式化日志 (移除 ANSI 颜色码) import re ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])') clean_lines = [ansi_escape.sub('', line.strip()) for line in recent] log_text = "\n".join(clean_lines) if len(log_text) > 4000: log_text = log_text[-4000:] await update.message.reply_text(f"{log_text}", parse_mode="HTML") except Exception as e: await update.message.reply_text(f"❌ 读取日志失败: {e}") @admin_only async def cmd_logs_live(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """启用实时日志推送""" from logger import log if log.is_telegram_logging_enabled(): await update.message.reply_text("📡 实时日志已经在运行中") return # 启用 Telegram 日志推送 def log_callback(message: str, level: str): """日志回调函数""" if self.notifier: self.notifier.queue_message(message, level) log.enable_telegram_logging(log_callback) await update.message.reply_text( "✅ 实时日志已启用\n" "所有日志将实时推送到此聊天\n" "使用 /logs_stop 停止推送" ) @admin_only async def cmd_logs_stop(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """停止实时日志推送""" from logger import log if not log.is_telegram_logging_enabled(): await update.message.reply_text("📭 实时日志未启用") return log.disable_telegram_logging() await update.message.reply_text("✅ 实时日志已停止") @admin_only async def cmd_dashboard(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """查看 S2A 仪表盘统计""" if AUTH_PROVIDER != "s2a": await update.message.reply_text( f"⚠️ 仪表盘仅支持 S2A 模式\n" f"当前模式: {AUTH_PROVIDER}" ) return await update.message.reply_text("⏳ 正在获取仪表盘数据...") try: stats = s2a_get_dashboard_stats() if stats: text = format_dashboard_stats(stats) await update.message.reply_text(text, parse_mode="HTML") else: await update.message.reply_text( "❌ 获取仪表盘数据失败\n" "请检查 S2A 配置和 API 连接" ) except Exception as e: await update.message.reply_text(f"❌ 错误: {e}") @admin_only async def cmd_keys_usage(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """查看 API 密钥用量 - 显示时间选择菜单""" if AUTH_PROVIDER != "s2a": await update.message.reply_text( f"⚠️ 密钥用量查询仅支持 S2A 模式\n" f"当前模式: {AUTH_PROVIDER}" ) return # 创建时间选择按钮 keyboard = [ [ InlineKeyboardButton("📍 今天", callback_data="keys_usage:today"), InlineKeyboardButton("◀ 昨天", callback_data="keys_usage:yesterday"), ], [ InlineKeyboardButton("◀ 近 7 天", callback_data="keys_usage:7d"), InlineKeyboardButton("◀ 近 14 天", callback_data="keys_usage:14d"), ], [ InlineKeyboardButton("◀ 近 30 天", callback_data="keys_usage:30d"), InlineKeyboardButton("📅 本月", callback_data="keys_usage:this_month"), ], [ InlineKeyboardButton("📅 上月", callback_data="keys_usage:last_month"), ], ] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text( "🔑 API 密钥用量查询\n\n请选择时间范围:", reply_markup=reply_markup, parse_mode="HTML" ) async def callback_keys_usage(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """处理密钥用量查询的回调""" query = update.callback_query await query.answer() # 验证权限 user_id = update.effective_user.id if user_id not in TELEGRAM_ADMIN_CHAT_IDS: await query.edit_message_text("⛔ 无权限") return from datetime import datetime, timedelta # 解析回调数据 data = query.data.replace("keys_usage:", "") today = datetime.now() if data == "today": start_date = today.strftime("%Y-%m-%d") end_date = today.strftime("%Y-%m-%d") period_text = "今天" elif data == "yesterday": yesterday = today - timedelta(days=1) start_date = yesterday.strftime("%Y-%m-%d") end_date = yesterday.strftime("%Y-%m-%d") period_text = "昨天" elif data == "7d": start_date = (today - timedelta(days=6)).strftime("%Y-%m-%d") end_date = today.strftime("%Y-%m-%d") period_text = "近 7 天" elif data == "14d": start_date = (today - timedelta(days=13)).strftime("%Y-%m-%d") end_date = today.strftime("%Y-%m-%d") period_text = "近 14 天" elif data == "30d": start_date = (today - timedelta(days=29)).strftime("%Y-%m-%d") end_date = today.strftime("%Y-%m-%d") period_text = "近 30 天" elif data == "this_month": start_date = today.replace(day=1).strftime("%Y-%m-%d") end_date = today.strftime("%Y-%m-%d") period_text = "本月" elif data == "last_month": first_of_this_month = today.replace(day=1) last_month_end = first_of_this_month - timedelta(days=1) last_month_start = last_month_end.replace(day=1) start_date = last_month_start.strftime("%Y-%m-%d") end_date = last_month_end.strftime("%Y-%m-%d") period_text = "上月" else: await query.edit_message_text("❌ 未知的时间选项") return await query.edit_message_text(f"⏳ 正在获取密钥用量 ({period_text})...") try: keys = s2a_get_keys_with_usage(start_date, end_date) if keys is not None: text = format_keys_usage(keys, period_text) # 消息太长时分段发送 if len(text) > 4000: # 先更新原消息 await query.edit_message_text(text[:4000], parse_mode="HTML") # 剩余部分新消息发送 remaining = text[4000:] while remaining: chunk = remaining[:4000] remaining = remaining[4000:] await context.bot.send_message( chat_id=update.effective_chat.id, text=chunk, parse_mode="HTML" ) else: await query.edit_message_text(text, parse_mode="HTML") else: await query.edit_message_text( "❌ 获取密钥用量失败\n" "请检查 S2A 配置和 API 连接" ) except Exception as e: await query.edit_message_text(f"❌ 错误: {e}") @admin_only async def cmd_stock(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """查看账号存货""" if AUTH_PROVIDER != "s2a": await update.message.reply_text( f"⚠️ 库存查询仅支持 S2A 模式\n" f"当前模式: {AUTH_PROVIDER}" ) return stats = s2a_get_dashboard_stats() if not stats: await update.message.reply_text("❌ 获取库存信息失败") return text = self._format_stock_message(stats) await update.message.reply_text(text, parse_mode="HTML") async def scheduled_stock_check(self, context: ContextTypes.DEFAULT_TYPE): """定时检查账号存货""" try: stats = s2a_get_dashboard_stats() if not stats: return normal = stats.get("normal_accounts", 0) total = stats.get("total_accounts", 0) # 只在低库存时发送通知 if normal <= TELEGRAM_LOW_STOCK_THRESHOLD: text = self._format_stock_message(stats, is_alert=True) for chat_id in TELEGRAM_ADMIN_CHAT_IDS: try: await context.bot.send_message( chat_id=chat_id, text=text, parse_mode="HTML" ) except Exception: pass except Exception as e: log.warning(f"Stock check failed: {e}") def _format_stock_message(self, stats: dict, is_alert: bool = False) -> str: """格式化存货消息""" total = stats.get("total_accounts", 0) normal = stats.get("normal_accounts", 0) error = stats.get("error_accounts", 0) ratelimit = stats.get("ratelimit_accounts", 0) overload = stats.get("overload_accounts", 0) # 计算健康度 health_pct = (normal / total * 100) if total > 0 else 0 # 状态图标 if normal <= TELEGRAM_LOW_STOCK_THRESHOLD: status_icon = "⚠️ 库存不足" status_line = f"{status_icon}" elif health_pct >= 80: status_icon = "✅ 正常" status_line = f"{status_icon}" elif health_pct >= 50: status_icon = "⚠️ 警告" status_line = f"{status_icon}" else: status_icon = "🔴 严重" status_line = f"{status_icon}" title = "🚨 库存不足警报" if is_alert else "📦 账号库存" lines = [ f"{title}", "", f"状态: {status_line}", f"健康度: {health_pct:.1f}%", "", f"正常: {normal}", f"异常: {error}", f"限流: {ratelimit}", f"总计: {total}", ] if is_alert: lines.append("") lines.append(f"预警阈值: {TELEGRAM_LOW_STOCK_THRESHOLD}") return "\n".join(lines) @admin_only async def cmd_clean_errors(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """清理错误状态的账号""" if AUTH_PROVIDER != "s2a": await update.message.reply_text( f"⚠️ 清理错误账号仅支持 S2A 模式\n" f"当前模式: {AUTH_PROVIDER}" ) return # 获取错误账号 error_accounts, total = s2a_get_error_accounts() if total == 0: await update.message.reply_text("✅ 没有错误状态的账号需要清理") return # 存储账号数据到 context.bot_data 供分页使用 context.bot_data["clean_errors_accounts"] = error_accounts context.bot_data["clean_errors_total"] = total # 显示第一页 text, keyboard = self._build_clean_errors_page(error_accounts, total, page=0) await update.message.reply_text(text, reply_markup=keyboard, parse_mode="HTML") def _build_clean_errors_page(self, accounts: list, total: int, page: int = 0, page_size: int = 10): """构建错误账号预览页面""" total_pages = (total + page_size - 1) // page_size start_idx = page * page_size end_idx = min(start_idx + page_size, total) page_accounts = accounts[start_idx:end_idx] # 按错误类型分组统计(全部账号) error_types = {} for acc in accounts: error_msg = acc.get("error_message", "Unknown") error_key = error_msg[:50] if error_msg else "Unknown" error_types[error_key] = error_types.get(error_key, 0) + 1 lines = [ "🗑️ 清理错误账号 (预览)", "", f"共发现 {total} 个错误状态账号", "", "错误类型统计:", ] # 显示前5种错误类型 sorted_errors = sorted(error_types.items(), key=lambda x: x[1], reverse=True)[:5] for error_msg, count in sorted_errors: lines.append(f"• {count}x: {error_msg}...") if len(error_types) > 5: lines.append(f"• ... 还有 {len(error_types) - 5} 种其他错误") lines.extend([ "", f"账号列表 (第 {page + 1}/{total_pages} 页):", ]) # 显示当前页的账号 for i, acc in enumerate(page_accounts, start=start_idx + 1): name = acc.get("name", "Unknown")[:25] error_msg = acc.get("error_message", "")[:30] lines.append(f"{i}. {name} - {error_msg}") lines.extend([ "", "⚠️ 此操作不可撤销!", ]) text = "\n".join(lines) # 构建分页按钮 nav_buttons = [] if page > 0: nav_buttons.append(InlineKeyboardButton("⬅️ 上一页", callback_data=f"clean_errors:page:{page - 1}")) if page < total_pages - 1: nav_buttons.append(InlineKeyboardButton("下一页 ➡️", callback_data=f"clean_errors:page:{page + 1}")) keyboard = [ nav_buttons, [InlineKeyboardButton(f"🗑️ 确认删除全部 ({total})", callback_data="clean_errors:confirm")], [InlineKeyboardButton("❌ 取消", callback_data="clean_errors:cancel")], ] # 过滤空行 keyboard = [row for row in keyboard if row] return text, InlineKeyboardMarkup(keyboard) async def callback_clean_errors(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """处理清理错误账号的回调""" query = update.callback_query await query.answer() # 验证权限 user_id = update.effective_user.id if user_id not in TELEGRAM_ADMIN_CHAT_IDS: await query.edit_message_text("⛔ 无权限") return # 解析回调数据 data = query.data.replace("clean_errors:", "") if data.startswith("page:"): # 分页浏览 page = int(data.replace("page:", "")) accounts = context.bot_data.get("clean_errors_accounts", []) total = context.bot_data.get("clean_errors_total", 0) if not accounts: await query.edit_message_text("❌ 数据已过期,请重新使用 /clean_errors") return text, keyboard = self._build_clean_errors_page(accounts, total, page) await query.edit_message_text(text, reply_markup=keyboard, parse_mode="HTML") elif data == "cancel": # 取消操作 context.bot_data.pop("clean_errors_accounts", None) context.bot_data.pop("clean_errors_total", None) await query.edit_message_text("✅ 已取消清理操作") elif data == "confirm": # 执行删除 total = context.bot_data.get("clean_errors_total", 0) await query.edit_message_text( f"🗑️ 正在删除 {total} 个错误账号...\n\n" "进度: 0%", parse_mode="HTML" ) # 同步执行删除 results = s2a_batch_delete_error_accounts() # 清理缓存数据 context.bot_data.pop("clean_errors_accounts", None) context.bot_data.pop("clean_errors_total", None) # 显示结果 lines = [ "✅ 清理完成", "", f"成功删除: {results['success']}", f"删除失败: {results['failed']}", f"总计: {results['total']}", ] # 如果有失败的,显示部分失败详情 failed_details = [d for d in results.get("details", []) if d.get("status") == "failed"] if failed_details: lines.append("") lines.append("失败详情:") for detail in failed_details[:5]: lines.append(f"• {detail.get('name', '')}: {detail.get('message', '')}") if len(failed_details) > 5: lines.append(f"• ... 还有 {len(failed_details) - 5} 个失败") await query.edit_message_text("\n".join(lines), parse_mode="HTML") @admin_only async def cmd_clean_teams(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """清理处理成功的 Team""" # 获取已完成的 teams tracker = load_team_tracker() completed_teams = get_completed_teams(tracker) if not completed_teams: await update.message.reply_text("✅ 没有处理成功的 Team 需要清理") return # 存储数据供分页使用 context.bot_data["clean_teams_data"] = completed_teams context.bot_data["clean_teams_total"] = len(completed_teams) # 显示第一页 text, keyboard = self._build_clean_teams_page(completed_teams, page=0) await update.message.reply_text(text, reply_markup=keyboard, parse_mode="HTML") def _build_clean_teams_page(self, teams: list, page: int = 0, page_size: int = 10): """构建已完成 Team 预览页面""" total = len(teams) total_pages = (total + page_size - 1) // page_size start_idx = page * page_size end_idx = min(start_idx + page_size, total) page_teams = teams[start_idx:end_idx] # 统计总账号数 total_accounts = sum(t["total"] for t in teams) lines = [ "🧹 清理已完成 Team (预览)", "", f"共发现 {total} 个已完成的 Team", f"涉及 {total_accounts} 个账号记录", "", f"Team 列表 (第 {page + 1}/{total_pages} 页):", ] # 显示当前页的 Team for i, team in enumerate(page_teams, start=start_idx + 1): name = team["name"][:25] count = team["total"] lines.append(f"{i}. ✅ {name} ({count} 个账号)") lines.extend([ "", "⚠️ 此操作将同时清理:", "• team_tracker.json (账号处理记录)", "• team.json (Team 配置)", ]) text = "\n".join(lines) # 构建分页按钮 nav_buttons = [] if page > 0: nav_buttons.append(InlineKeyboardButton("⬅️ 上一页", callback_data=f"clean_teams:page:{page - 1}")) if page < total_pages - 1: nav_buttons.append(InlineKeyboardButton("下一页 ➡️", callback_data=f"clean_teams:page:{page + 1}")) keyboard = [ nav_buttons, [InlineKeyboardButton(f"🧹 确认清理全部 ({total})", callback_data="clean_teams:confirm")], [InlineKeyboardButton("❌ 取消", callback_data="clean_teams:cancel")], ] # 过滤空行 keyboard = [row for row in keyboard if row] return text, InlineKeyboardMarkup(keyboard) async def callback_clean_teams(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """处理清理已完成 Team 的回调""" query = update.callback_query await query.answer() # 验证权限 user_id = update.effective_user.id if user_id not in TELEGRAM_ADMIN_CHAT_IDS: await query.edit_message_text("⛔ 无权限") return # 解析回调数据 data = query.data.replace("clean_teams:", "") if data.startswith("page:"): # 分页浏览 page = int(data.replace("page:", "")) teams = context.bot_data.get("clean_teams_data", []) if not teams: await query.edit_message_text("❌ 数据已过期,请重新使用 /clean_teams") return text, keyboard = self._build_clean_teams_page(teams, page) await query.edit_message_text(text, reply_markup=keyboard, parse_mode="HTML") elif data == "cancel": # 取消操作 context.bot_data.pop("clean_teams_data", None) context.bot_data.pop("clean_teams_total", None) await query.edit_message_text("✅ 已取消清理操作") elif data == "confirm": # 执行清理 teams_data = context.bot_data.get("clean_teams_data", []) total = context.bot_data.get("clean_teams_total", 0) await query.edit_message_text( f"🧹 正在清理 {total} 个已完成 Team...", parse_mode="HTML" ) # 获取要删除的 team 名称列表 team_names = [t["name"] for t in teams_data] # 1. 清理 tracker tracker = load_team_tracker() tracker_results = batch_remove_completed_teams(tracker) save_team_tracker(tracker) # 2. 清理 team.json json_results = batch_remove_teams_by_names(team_names) # 清理缓存数据 context.bot_data.pop("clean_teams_data", None) context.bot_data.pop("clean_teams_total", None) # 统计清理的账号数 total_accounts = sum(d.get("accounts", 0) for d in tracker_results.get("details", []) if d.get("status") == "success") # 显示结果 lines = [ "✅ 清理完成", "", f"清理 Team: {tracker_results['success']}", f"清理账号记录: {total_accounts}", "", "📁 文件清理:", f"• tracker: {tracker_results['success']} 个 Team", f"• team.json: {json_results['success']} 个 Team", ] if tracker_results['failed'] > 0 or json_results['failed'] > 0: lines.append("") lines.append(f"失败: tracker={tracker_results['failed']}, json={json_results['failed']}") await query.edit_message_text("\n".join(lines), parse_mode="HTML") @admin_only async def cmd_import(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """上传账号到 team.json""" # 获取命令后的 JSON 数据 if not context.args: await update.message.reply_text( "📤 导入账号到 team.json\n\n" "用法:\n" "1. 直接发送 JSON 文件\n" "2. /import 后跟 JSON 数据\n\n" "JSON 格式:\n" "[{\"account\":\"邮箱\",\"password\":\"密码\",\"token\":\"jwt\"},...]\n\n" "导入后使用 /run 开始处理", parse_mode="HTML" ) return # 尝试解析 JSON json_text = " ".join(context.args) await self._process_import_json(update, json_text) def _reset_import_batch_stats(self): """重置批量导入统计""" self._import_batch_stats = { "total_files": 0, "processed_files": 0, "total_added": 0, "total_skipped": 0, "current_file": "", "errors": [], "team_json_total": 0 } def _get_import_progress_text(self, is_processing: bool = True) -> str: """生成导入进度消息文本""" stats = self._import_batch_stats if is_processing: lines = [ "⏳ 正在处理 JSON 文件...", "", f"📁 文件: {stats['processed_files']}/{stats['total_files']}", ] if stats['current_file']: lines.append(f"📄 当前: {stats['current_file']}") lines.extend([ "", f"新增: {stats['total_added']}", f"跳过 (重复): {stats['total_skipped']}", ]) else: # 完成状态 lines = [ "✅ 导入完成", "", f"📁 处理文件: {stats['processed_files']} 个", f"📄 已更新 team.json", f"新增: {stats['total_added']}", f"跳过 (重复): {stats['total_skipped']}", f"team.json 总数: {stats['team_json_total']}", ] if stats['errors']: lines.append("") lines.append(f"⚠️ 错误 ({len(stats['errors'])} 个):") for err in stats['errors'][:3]: # 最多显示3个错误 lines.append(f" • {err}") if len(stats['errors']) > 3: lines.append(f" ... 还有 {len(stats['errors']) - 3} 个错误") lines.extend([ "", "✅ 配置已自动刷新", "使用 /run_all 或 /run <n> 开始处理" ]) return "\n".join(lines) async def _update_import_progress(self, chat_id: int, is_final: bool = False): """更新导入进度消息""" text = self._get_import_progress_text(is_processing=not is_final) try: if self._import_progress_message: await self.app.bot.edit_message_text( chat_id=chat_id, message_id=self._import_progress_message.message_id, text=text, parse_mode="HTML" ) except Exception: pass # 忽略编辑失败 async def _finalize_import_batch(self, chat_id: int): """完成批量导入,发送最终结果""" async with self._import_progress_lock: if self._import_progress_message is None: return # 取消超时任务 (job_queue job) if self._import_batch_timeout_task: try: self._import_batch_timeout_task.schedule_removal() except Exception: pass self._import_batch_timeout_task = None # 更新最终进度 await self._update_import_progress(chat_id, is_final=True) # 重置状态 self._import_progress_message = None self._reset_import_batch_stats() async def _import_batch_timeout_callback(self, context: ContextTypes.DEFAULT_TYPE): """批量导入超时回调 - 由 job_queue 调用""" chat_id = context.job.data.get("chat_id") if chat_id: await self._finalize_import_batch(chat_id) def _schedule_import_finalize(self, context: ContextTypes.DEFAULT_TYPE, chat_id: int, delay: float = 1.5): """调度批量导入完成任务""" # 取消之前的超时任务 if self._import_batch_timeout_task: self._import_batch_timeout_task.schedule_removal() self._import_batch_timeout_task = None # 使用 job_queue 调度新的超时任务 self._import_batch_timeout_task = context.job_queue.run_once( self._import_batch_timeout_callback, when=delay, data={"chat_id": chat_id}, name="import_batch_finalize" ) @admin_only async def handle_json_file(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """处理上传的 JSON 文件 - 支持批量导入进度更新""" # 检查是否是管理员 user_id = update.effective_user.id if user_id not in TELEGRAM_ADMIN_CHAT_IDS: await update.message.reply_text("⛔ 无权限") return document = update.message.document if not document: return chat_id = update.effective_chat.id file_name = document.file_name or "unknown.json" async with self._import_progress_lock: # 取消之前的超时任务(如果有) if self._import_batch_timeout_task: self._import_batch_timeout_task.schedule_removal() self._import_batch_timeout_task = None # 更新统计 self._import_batch_stats["total_files"] += 1 self._import_batch_stats["current_file"] = file_name # 如果是新批次,发送初始进度消息 if self._import_progress_message is None: self._reset_import_batch_stats() self._import_batch_stats["total_files"] = 1 self._import_batch_stats["current_file"] = file_name self._import_progress_message = await update.message.reply_text( self._get_import_progress_text(is_processing=True), parse_mode="HTML" ) else: # 更新进度消息 await self._update_import_progress(chat_id) try: # 下载文件 file = await document.get_file() file_bytes = await file.download_as_bytearray() json_text = file_bytes.decode("utf-8") # 处理导入并获取结果 result = await self._process_import_json_batch(json_text) async with self._import_progress_lock: self._import_batch_stats["processed_files"] += 1 self._import_batch_stats["total_added"] += result.get("added", 0) self._import_batch_stats["total_skipped"] += result.get("skipped", 0) self._import_batch_stats["team_json_total"] = result.get("total", 0) self._import_batch_stats["current_file"] = "" if result.get("error"): self._import_batch_stats["errors"].append(f"{file_name}: {result['error']}") # 更新进度 await self._update_import_progress(chat_id) # 检查是否所有文件都已处理,如果是则调度完成任务 stats = self._import_batch_stats if stats["processed_files"] >= stats["total_files"]: # 所有文件处理完成,短延迟后完成批次(防止更多文件到来) self._schedule_import_finalize(context, chat_id, delay=1.0) else: # 还有文件在处理,设置较长的超时 self._schedule_import_finalize(context, chat_id, delay=3.0) except Exception as e: async with self._import_progress_lock: self._import_batch_stats["processed_files"] += 1 self._import_batch_stats["errors"].append(f"{file_name}: {str(e)}") self._import_batch_stats["current_file"] = "" await self._update_import_progress(chat_id) # 调度完成任务 stats = self._import_batch_stats if stats["processed_files"] >= stats["total_files"]: self._schedule_import_finalize(context, chat_id, delay=1.0) else: self._schedule_import_finalize(context, chat_id, delay=3.0) async def _process_import_json_batch(self, json_text: str) -> dict: """处理导入的 JSON 数据,保存到 team.json (批量版本,返回结果) Returns: dict: {"added": int, "skipped": int, "total": int, "error": str|None} """ import json from pathlib import Path result = {"added": 0, "skipped": 0, "total": 0, "error": None} try: new_accounts = json.loads(json_text) except json.JSONDecodeError as e: result["error"] = f"JSON 格式错误: {e}" return result if not isinstance(new_accounts, list): new_accounts = [new_accounts] if not new_accounts: result["error"] = "JSON 数据中没有账号" return result # 验证格式 valid_accounts = [] for acc in new_accounts: if not isinstance(acc, dict): continue email = acc.get("account") or acc.get("email", "") token = acc.get("token", "") password = acc.get("password", "") if email and token: valid_accounts.append({ "account": email, "password": password, "token": token }) if not valid_accounts: result["error"] = "未找到有效账号" return result # 读取现有 team.json team_json_path = Path(TEAM_JSON_FILE) existing_accounts = [] if team_json_path.exists(): try: with open(team_json_path, "r", encoding="utf-8") as f: existing_accounts = json.load(f) if not isinstance(existing_accounts, list): existing_accounts = [existing_accounts] except Exception: existing_accounts = [] # 检查重复 existing_emails = set() for acc in existing_accounts: email = acc.get("account") or acc.get("user", {}).get("email", "") if email: existing_emails.add(email.lower()) added = 0 skipped = 0 for acc in valid_accounts: email = acc.get("account", "").lower() if email in existing_emails: skipped += 1 else: existing_accounts.append(acc) existing_emails.add(email) added += 1 # 保存到 team.json try: team_json_path.parent.mkdir(parents=True, exist_ok=True) with open(team_json_path, "w", encoding="utf-8") as f: json.dump(existing_accounts, f, ensure_ascii=False, indent=2) # 重载配置 reload_config() result["added"] = added result["skipped"] = skipped result["total"] = len(existing_accounts) except Exception as e: result["error"] = f"保存失败: {e}" return result async def _process_import_json(self, update: Update, json_text: str): """处理导入的 JSON 数据,保存到 team.json""" import json from pathlib import Path try: new_accounts = json.loads(json_text) except json.JSONDecodeError as e: await update.message.reply_text(f"❌ JSON 格式错误: {e}") return if not isinstance(new_accounts, list): # 如果是单个对象,转成列表 new_accounts = [new_accounts] if not new_accounts: await update.message.reply_text("📭 JSON 数据中没有账号") return # 验证格式 valid_accounts = [] for acc in new_accounts: if not isinstance(acc, dict): continue # 支持 account 或 email 字段 email = acc.get("account") or acc.get("email", "") token = acc.get("token", "") password = acc.get("password", "") if email and token: valid_accounts.append({ "account": email, "password": password, "token": token }) if not valid_accounts: await update.message.reply_text("❌ 未找到有效账号 (需要 account/email 和 token 字段)") return # 读取现有 team.json (不存在则自动创建) team_json_path = Path(TEAM_JSON_FILE) existing_accounts = [] is_new_file = not team_json_path.exists() if not is_new_file: try: with open(team_json_path, "r", encoding="utf-8") as f: existing_accounts = json.load(f) if not isinstance(existing_accounts, list): existing_accounts = [existing_accounts] except Exception: existing_accounts = [] # 检查重复 existing_emails = set() for acc in existing_accounts: email = acc.get("account") or acc.get("user", {}).get("email", "") if email: existing_emails.add(email.lower()) added = 0 skipped = 0 for acc in valid_accounts: email = acc.get("account", "").lower() if email in existing_emails: skipped += 1 else: existing_accounts.append(acc) existing_emails.add(email) added += 1 # 保存到 team.json (自动创建文件) try: # 确保父目录存在 team_json_path.parent.mkdir(parents=True, exist_ok=True) with open(team_json_path, "w", encoding="utf-8") as f: json.dump(existing_accounts, f, ensure_ascii=False, indent=2) file_status = "📄 已创建 team.json" if is_new_file else "📄 已更新 team.json" # 自动重载配置,同步内存状态,避免 save_team_json() 覆盖新导入的数据 reload_result = reload_config() await update.message.reply_text( f"✅ 导入完成\n\n" f"{file_status}\n" f"新增: {added}\n" f"跳过 (重复): {skipped}\n" f"team.json 总数: {len(existing_accounts)}\n\n" f"✅ 配置已自动刷新\n" f"使用 /run_all 或 /run <n> 开始处理", parse_mode="HTML" ) except Exception as e: await update.message.reply_text(f"❌ 保存到 team.json 失败: {e}") # ==================== GPTMail Key 管理命令 ==================== @admin_only async def cmd_gptmail_keys(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """查看所有 GPTMail API Keys""" keys = get_gptmail_keys() config_keys = set(GPTMAIL_API_KEYS) if not keys: await update.message.reply_text( "📧 GPTMail API Keys\n\n" "📭 暂无配置 Key\n\n" "使用 /gptmail_add <key> 添加", parse_mode="HTML" ) return lines = [f"📧 GPTMail API Keys (共 {len(keys)} 个)\n"] for i, key in enumerate(keys): # 脱敏显示 if len(key) > 10: masked = f"{key[:4]}...{key[-4:]}" else: masked = key[:4] + "..." if len(key) > 4 else key # 标记来源 source = "📁 配置" if key in config_keys else "🔧 动态" lines.append(f"{i+1}. {masked} {source}") lines.append(f"\n💡 管理:") lines.append(f"/gptmail_add <key> - 添加 Key") lines.append(f"/gptmail_del <key> - 删除动态 Key") lines.append(f"/test_email - 测试邮箱创建") await update.message.reply_text("\n".join(lines), parse_mode="HTML") @admin_only async def cmd_gptmail_add(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """添加 GPTMail API Key (支持批量导入)""" message = get_message(update) if not message: return if not context.args: await message.reply_text( "📧 添加 GPTMail API Key\n\n" "单个添加:\n" "/gptmail_add gpt-xxx\n\n" "批量添加 (空格分隔):\n" "/gptmail_add key1 key2 key3\n\n" "批量添加 (换行分隔):\n" "/gptmail_add key1\nkey2\nkey3", parse_mode="HTML" ) return # 合并所有参数,支持空格和换行分隔 raw_input = " ".join(context.args) # 按空格和换行分割 keys = [] for part in raw_input.replace("\n", " ").split(): key = part.strip() if key: keys.append(key) if not keys: await message.reply_text("❌ Key 不能为空") return # 获取现有 keys existing_keys = set(get_gptmail_keys()) # 统计结果 added = [] skipped = [] invalid = [] await message.reply_text(f"⏳ 正在验证 {len(keys)} 个 Key...") for key in keys: # 检查是否已存在 if key in existing_keys: skipped.append(key) continue # 测试 Key 是否有效 success, msg = gptmail_service.test_api_key(key) if not success: invalid.append(key) continue # 添加 Key if add_gptmail_key(key): added.append(key) existing_keys.add(key) # 生成结果报告 lines = ["📧 GPTMail Key 导入结果\n"] if added: lines.append(f"✅ 成功添加: {len(added)}") for k in added[:5]: # 最多显示5个 masked = f"{k[:4]}...{k[-4:]}" if len(k) > 10 else k lines.append(f" • {masked}") if len(added) > 5: lines.append(f" • ... 等 {len(added)} 个") if skipped: lines.append(f"\n⏭️ 已跳过 (已存在): {len(skipped)}") if invalid: lines.append(f"\n❌ 无效 Key: {len(invalid)}") for k in invalid[:3]: # 最多显示3个 masked = f"{k[:4]}...{k[-4:]}" if len(k) > 10 else k lines.append(f" • {masked}") lines.append(f"\n当前 Key 总数: {len(get_gptmail_keys())}") await message.reply_text("\n".join(lines), parse_mode="HTML") @admin_only async def cmd_gptmail_del(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """删除 GPTMail API Key""" if not context.args: await update.message.reply_text( "📧 删除 GPTMail API Key\n\n" "用法: /gptmail_del <key>\n\n" "注意: 只能删除动态添加的 Key,配置文件中的 Key 请直接修改 config.toml", parse_mode="HTML" ) return key = context.args[0].strip() # 检查是否是配置文件中的 Key if key in GPTMAIL_API_KEYS: await update.message.reply_text( "⚠️ 该 Key 在配置文件中,无法通过 Bot 删除\n" "请直接修改 config.toml" ) return # 删除 Key if remove_gptmail_key(key): await update.message.reply_text( f"✅ Key 已删除\n\n" f"当前 Key 总数: {len(get_gptmail_keys())}", parse_mode="HTML" ) else: await update.message.reply_text("❌ Key 不存在或删除失败") @admin_only async def cmd_test_email(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """测试邮箱创建功能""" if EMAIL_PROVIDER != "gptmail": await update.message.reply_text( f"⚠️ 当前邮箱提供商: {EMAIL_PROVIDER}\n" f"测试功能仅支持 GPTMail 模式" ) return await update.message.reply_text("⏳ 正在测试邮箱创建...") try: # 测试创建邮箱 email, password = unified_create_email() if email: await update.message.reply_text( f"✅ 邮箱创建成功\n\n" f"邮箱: {email}\n" f"密码: {password}\n\n" f"当前 Key 数量: {len(get_gptmail_keys())}", parse_mode="HTML" ) else: await update.message.reply_text( "❌ 邮箱创建失败\n\n" "请检查 GPTMail API Key 配置", parse_mode="HTML" ) except Exception as e: await update.message.reply_text(f"❌ 测试失败: {e}") @admin_only async def cmd_iban_list(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """查看 IBAN 列表""" try: from auto_gpt_team import get_sepa_ibans ibans = get_sepa_ibans() if not ibans: await update.message.reply_text( "💳 SEPA IBAN 列表\n\n" "📭 暂无 IBAN\n\n" "使用 /iban_add 添加 IBAN", parse_mode="HTML" ) return # 显示 IBAN 列表 lines = [f"💳 SEPA IBAN 列表 ({len(ibans)} 个)\n"] for i, iban in enumerate(ibans[:50], 1): # 最多显示 50 个 lines.append(f"{i}. {iban}") if len(ibans) > 50: lines.append(f"\n... 还有 {len(ibans) - 50} 个未显示") await update.message.reply_text("\n".join(lines), parse_mode="HTML") except ImportError: await update.message.reply_text("❌ auto_gpt_team 模块未找到") except Exception as e: await update.message.reply_text(f"❌ 获取 IBAN 列表失败: {e}") @admin_only async def cmd_iban_add(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """添加 IBAN""" if not context.args: await update.message.reply_text( "💳 添加 IBAN\n\n" "用法:\n" "/iban_add DE123... DE456...\n" "/iban_add DE123...,DE456...\n\n" "支持空格或逗号分隔,每行一个也可以", parse_mode="HTML" ) return try: from auto_gpt_team import add_sepa_ibans # 解析输入 (支持空格、逗号、换行分隔) raw_input = " ".join(context.args) # 替换逗号和换行为空格,然后按空格分割 ibans = [s.strip() for s in raw_input.replace(",", " ").replace("\n", " ").split() if s.strip()] if not ibans: await update.message.reply_text("❌ 未提供有效的 IBAN") return added, skipped, total = add_sepa_ibans(ibans) await update.message.reply_text( f"✅ IBAN 导入完成\n\n" f"新增: {added}\n" f"跳过 (重复): {skipped}\n" f"当前总数: {total}", parse_mode="HTML" ) except ImportError: await update.message.reply_text("❌ auto_gpt_team 模块未找到") except Exception as e: await update.message.reply_text(f"❌ 添加 IBAN 失败: {e}") @admin_only async def cmd_iban_clear(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """清空 IBAN 列表""" # 需要确认 if not context.args or context.args[0].lower() != "confirm": try: from auto_gpt_team import get_sepa_ibans count = len(get_sepa_ibans()) except: count = 0 await update.message.reply_text( f"⚠️ 确认清空 IBAN 列表?\n\n" f"当前共有 {count} 个 IBAN\n\n" f"确认请发送:\n" f"/iban_clear confirm", parse_mode="HTML" ) return try: from auto_gpt_team import clear_sepa_ibans clear_sepa_ibans() await update.message.reply_text("✅ IBAN 列表已清空", parse_mode="HTML") except ImportError: await update.message.reply_text("❌ auto_gpt_team 模块未找到") except Exception as e: await update.message.reply_text(f"❌ 清空 IBAN 失败: {e}") @admin_only async def cmd_domain_list(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """查看邮箱域名列表""" try: from auto_gpt_team import get_email_domains domains = get_email_domains() if not domains: await update.message.reply_text( "📧 邮箱域名列表\n\n" "📭 暂无域名\n\n" "使用 /domain_add 添加域名", parse_mode="HTML" ) return # 显示域名列表 lines = [f"📧 邮箱域名列表 ({len(domains)} 个)\n"] for i, domain in enumerate(domains[:50], 1): # 最多显示 50 个 lines.append(f"{i}. {domain}") if len(domains) > 50: lines.append(f"\n... 还有 {len(domains) - 50} 个未显示") await update.message.reply_text("\n".join(lines), parse_mode="HTML") except ImportError: await update.message.reply_text("❌ auto_gpt_team 模块未找到") except Exception as e: await update.message.reply_text(f"❌ 获取域名列表失败: {e}") @admin_only async def cmd_domain_add(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """添加邮箱域名""" if not context.args: await update.message.reply_text( "📧 添加邮箱域名\n\n" "用法:\n" "/domain_add @example.com\n" "/domain_add @a.com,@b.com\n\n" "支持空格或逗号分隔\n" "@ 符号可省略,会自动添加\n\n" "格式要求:\n" "• 域名需包含至少一个点号\n" "• 只能包含字母、数字、连字符\n" "• 顶级域名至少2个字符", parse_mode="HTML" ) return try: from auto_gpt_team import add_email_domains # 解析输入 (支持空格、逗号、换行分隔) raw_input = " ".join(context.args) # 替换逗号和换行为空格,然后按空格分割 domains = [s.strip() for s in raw_input.replace(",", " ").replace("\n", " ").split() if s.strip()] if not domains: await update.message.reply_text("❌ 未提供有效的域名") return added, skipped, invalid, total = add_email_domains(domains) # 构建响应消息 lines = ["✅ 域名导入完成\n"] lines.append(f"新增: {added}") lines.append(f"跳过 (重复): {skipped}") if invalid > 0: lines.append(f"无效 (格式错误): {invalid}") lines.append(f"当前总数: {total}") await update.message.reply_text("\n".join(lines), parse_mode="HTML") except ImportError: await update.message.reply_text("❌ auto_gpt_team 模块未找到") except Exception as e: await update.message.reply_text(f"❌ 添加域名失败: {e}") @admin_only async def cmd_domain_del(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """删除指定域名""" if not context.args: await update.message.reply_text( "📧 删除域名\n\n" "用法:\n" "/domain_del @example.com\n\n" "@ 符号可省略", parse_mode="HTML" ) return try: from auto_gpt_team import remove_email_domain, get_email_domains domain = context.args[0].strip() if remove_email_domain(domain): total = len(get_email_domains()) await update.message.reply_text( f"✅ 域名已删除\n\n" f"已删除: {domain}\n" f"剩余: {total} 个", parse_mode="HTML" ) else: await update.message.reply_text( f"❌ 域名不存在: {domain}", parse_mode="HTML" ) except ImportError: await update.message.reply_text("❌ auto_gpt_team 模块未找到") except Exception as e: await update.message.reply_text(f"❌ 删除域名失败: {e}") @admin_only async def cmd_domain_clear(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """清空域名列表 (只清空txt文件中的域名,保留config配置)""" # 需要确认 if not context.args or context.args[0].lower() != "confirm": try: from auto_gpt_team import get_file_domains_count, EMAIL_DOMAINS file_count = get_file_domains_count() config_count = len(EMAIL_DOMAINS) if EMAIL_DOMAINS else 0 except: file_count = 0 config_count = 0 if file_count == 0: await update.message.reply_text( "📧 域名列表\n\n" "txt文件中没有可清空的域名\n" f"config配置中的域名: {config_count} 个 (不会被清空)", parse_mode="HTML" ) return await update.message.reply_text( f"⚠️ 确认清空域名列表?\n\n" f"将清空txt文件中的域名: {file_count} 个\n" f"config配置中的域名: {config_count} 个 (不会被清空)\n\n" f"确认请发送:\n" f"/domain_clear confirm", parse_mode="HTML" ) return try: from auto_gpt_team import clear_email_domains, EMAIL_DOMAINS cleared_count = clear_email_domains() config_count = len(EMAIL_DOMAINS) if EMAIL_DOMAINS else 0 await update.message.reply_text( f"✅ 域名列表已清空\n\n" f"已清空: {cleared_count} 个域名\n" f"保留 (config配置): {config_count} 个", parse_mode="HTML" ) except ImportError: await update.message.reply_text("❌ auto_gpt_team 模块未找到") except Exception as e: await update.message.reply_text(f"❌ 清空域名失败: {e}") @admin_only async def cmd_team_fingerprint(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """切换 GPT Team 随机指纹""" import tomli_w try: # 读取当前配置 with open(CONFIG_FILE, "rb") as f: import tomllib config = tomllib.load(f) # 确保 GPT Team section 存在 if "GPT Team" not in config: config["GPT Team"] = {} # 获取当前状态 current = config.get("GPT Team", {}).get("random_fingerprint", True) new_value = not current # 更新配置 config["GPT Team"]["random_fingerprint"] = new_value # 写回文件 with open(CONFIG_FILE, "wb") as f: tomli_w.dump(config, f) status = "✅ 已开启" if new_value else "❌ 已关闭" await update.message.reply_text( f"🎭 GPT Team 随机指纹\n\n" f"状态: {status}\n\n" f"开启后每次运行将随机使用不同的:\n" f"• User-Agent (Chrome 139-144)\n" f"• WebGL 显卡指纹 (NVIDIA/AMD/Intel)\n" f"• 屏幕分辨率 (1080p/1440p/4K)\n\n" f"💡 下次运行 auto_gpt_team.py 时生效", parse_mode="HTML" ) except ImportError: await update.message.reply_text( "❌ 缺少 tomli_w 依赖\n" "请运行: uv add tomli_w" ) except Exception as e: await update.message.reply_text(f"❌ 修改配置失败: {e}") async def _test_mail_api_connection(self, mail_api_base: str, mail_api_token: str, domain: str) -> tuple[bool, str]: """测试邮件 API 连接 Args: mail_api_base: 邮件 API 地址 mail_api_token: 邮件 API Token domain: 测试用的邮箱域名 Returns: tuple: (success, message) """ import requests import random import string try: # 生成测试邮箱 random_str = ''.join(random.choices(string.ascii_lowercase + string.digits, k=8)) test_email = f"test-{random_str}@{domain.lstrip('@')}" # 测试 API 连接 url = f"{mail_api_base}/api/public/emailList" headers = { "Authorization": mail_api_token, "Content-Type": "application/json" } payload = { "toEmail": test_email, "timeSort": "desc", "size": 1 } response = requests.post(url, headers=headers, json=payload, timeout=10) data = response.json() if data.get("code") == 200: return True, "邮件 API 连接正常" else: error_msg = data.get("message", "未知错误") return False, f"API 响应异常: {error_msg}" except requests.exceptions.Timeout: return False, "连接超时,无法连接到邮件 API 服务器" except requests.exceptions.ConnectionError: return False, "连接失败,请检查 mail_api_base 配置" except Exception as e: return False, f"测试失败: {e}" @admin_only async def cmd_team_register(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """开始 GPT Team 自动订阅注册""" # 检查是否有任务正在运行 if self.current_task and not self.current_task.done(): await update.message.reply_text( f"⚠️ 有任务正在运行: {self.current_team}\n" "请等待任务完成或使用 /stop 停止后再开始注册" ) return # 检查配置 try: from auto_gpt_team import MAIL_API_TOKEN, MAIL_API_BASE, get_email_domains, get_sepa_ibans if not MAIL_API_TOKEN or not MAIL_API_BASE: await update.message.reply_text( "❌ 配置错误\n\n" "请在 config.toml 中配置 [GPT Team] 段:\n" "• mail_api_token\n" "• mail_api_base", parse_mode="HTML" ) return domains = get_email_domains() if not domains: await update.message.reply_text( "❌ 没有可用的邮箱域名\n\n" "请先使用 /domain_add 导入域名", parse_mode="HTML" ) return ibans = get_sepa_ibans() if not ibans: await update.message.reply_text( "❌ 没有可用的 IBAN\n\n" "请先使用 /iban_add 导入 IBAN", parse_mode="HTML" ) return # 测试邮件 API 连接 await update.message.reply_text("⏳ 正在检测邮件 API 连接...") import random test_domain = random.choice(domains) success, message = await self._test_mail_api_connection(MAIL_API_BASE, MAIL_API_TOKEN, test_domain) if not success: await update.message.reply_text( f"❌ 邮件 API 连接失败\n\n" f"错误: {message}\n\n" f"请检查配置后重试:\n" f"• mail_api_base: {MAIL_API_BASE}\n" f"• mail_api_token: {'已配置' if MAIL_API_TOKEN else '未配置'}", parse_mode="HTML" ) return except ImportError: await update.message.reply_text("❌ auto_gpt_team 模块未找到") return # 显示数量选择 keyboard = [ [ InlineKeyboardButton("1 个", callback_data="team_reg:count:1"), InlineKeyboardButton("3 个", callback_data="team_reg:count:3"), InlineKeyboardButton("5 个", callback_data="team_reg:count:5"), ], [ InlineKeyboardButton("10 个", callback_data="team_reg:count:10"), InlineKeyboardButton("20 个", callback_data="team_reg:count:20"), InlineKeyboardButton("自定义", callback_data="team_reg:count:custom"), ], ] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text( "🚀 GPT Team 自动订阅\n\n" f"📧 邮箱域名: {len(domains)} 个\n" f"💳 可用 IBAN: {len(ibans)} 个\n\n" "请选择注册数量:", parse_mode="HTML", reply_markup=reply_markup ) async def callback_team_register(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """处理 GPT Team 注册回调""" query = update.callback_query # 权限检查 user_id = update.effective_user.id if user_id not in TELEGRAM_ADMIN_CHAT_IDS: await query.answer("⛔ 无权限", show_alert=True) return await query.answer() data = query.data.split(":") action = data[1] if len(data) > 1 else "" value = data[2] if len(data) > 2 else "" if action == "count": if value == "custom": await query.edit_message_text( "📝 自定义数量\n\n" "请发送数量 (1-50):\n" "直接回复一个数字即可\n\n" "例如: 20", parse_mode="HTML" ) # 设置等待输入状态 context.user_data["team_waiting_count"] = True return count = int(value) # 显示输出方式选择 keyboard = [ [ InlineKeyboardButton("📄 JSON 文件", callback_data=f"team_reg:output:json:{count}"), ], [ InlineKeyboardButton("📥 添加到 team.json", callback_data=f"team_reg:output:team:{count}"), ], ] reply_markup = InlineKeyboardMarkup(keyboard) await query.edit_message_text( f"⚙️ 配置完成\n\n" f"注册数量: {count} 个\n\n" f"请选择输出方式:", parse_mode="HTML", reply_markup=reply_markup ) elif action == "output": output_type = value # json 或 team count = int(data[3]) if len(data) > 3 else 1 await query.edit_message_text( f"⚙️ 配置完成\n\n" f"注册数量: {count} 个\n" f"输出方式: {'📄 JSON 文件' if output_type == 'json' else '📥 team.json'}\n\n" f"即将开始完整注册流程...", parse_mode="HTML" ) # 开始注册任务 self.current_team = f"GPT Team 注册 ({count}个)" # 重置停止标志,确保新任务可以正常运行 try: import run run._shutdown_requested = False except Exception: pass self.current_task = asyncio.create_task( self._run_team_registration(query.message.chat_id, count, output_type) ) async def _run_team_registration(self, chat_id: int, count: int, output_type: str): """执行 GPT Team 注册任务""" from auto_gpt_team import run_single_registration_auto, cleanup_chrome_processes, get_register_mode import json import threading results = [] success_count = 0 fail_count = 0 # 获取当前注册模式 current_mode = get_register_mode() mode_display = "🌐 协议模式" if current_mode == "api" else "🖥️ 浏览器模式" # 当前步骤 (用于显示) current_step = ["初始化..."] current_account = [""] step_lock = threading.Lock() def step_callback(step: str): """步骤回调 - 更新当前步骤""" with step_lock: current_step[0] = step # 发送开始消息 progress_msg = await self.app.bot.send_message( chat_id, f"🚀 开始注册\n\n" f"模式: {mode_display}\n" f"进度: 0/{count}\n" f"{'▱' * 20}", parse_mode="HTML" ) # 进度更新任务 async def update_progress_loop(): """定期更新进度消息""" last_step = "" while True: await asyncio.sleep(1.5) # 每 1.5 秒更新一次 try: with step_lock: step = current_step[0] account = current_account[0] # 只有步骤变化时才更新 if step != last_step: last_step = step progress = int((success_count + fail_count) / count * 20) if count > 0 else 0 progress_bar = '▰' * progress + '▱' * (20 - progress) text = ( f"🚀 注册中...\n\n" f"模式: {mode_display}\n" f"进度: {success_count + fail_count}/{count}\n" f"{progress_bar}\n\n" f"✅ 成功: {success_count}\n" f"❌ 失败: {fail_count}\n" ) if account: text += f"\n⏳ 账号: {account[:20]}..." if step: text += f"\n ▸ {step}" try: await progress_msg.edit_text(text, parse_mode="HTML") except: pass except asyncio.CancelledError: break except: pass # 启动进度更新任务 progress_task = asyncio.create_task(update_progress_loop()) for i in range(count): # 检查停止请求 try: import run if run._shutdown_requested: with step_lock: current_step[0] = "用户请求停止..." break except: pass # 执行注册 try: # 使用 functools.partial 传递回调 import functools def run_with_callback(): # 在执行前再次检查停止请求 try: import run as run_module if run_module._shutdown_requested: return {"success": False, "error": "用户停止", "stopped": True} except: pass return run_single_registration_auto( progress_callback=None, step_callback=step_callback ) # 更新当前账号 with step_lock: current_step[0] = "生成账号信息..." current_account[0] = f"第 {i+1} 个" result = await asyncio.get_event_loop().run_in_executor( self.executor, run_with_callback ) if result.get("stopped"): # 被 /stop 命令中断,不计入失败 log.info("注册被用户停止") with step_lock: current_step[0] = "已停止" break elif result.get("success"): success_count += 1 results.append({ "account": result["account"], "password": result["password"], "token": result["token"], "account_id": result.get("account_id", "") }) with step_lock: current_account[0] = result["account"] else: fail_count += 1 log.warning(f"注册失败: {result.get('error', '未知错误')}") except asyncio.CancelledError: log.info("注册任务被取消") with step_lock: current_step[0] = "已取消" break except Exception as e: fail_count += 1 log.error(f"注册异常: {e}") # 清理浏览器进程 cleanup_chrome_processes() # 每次注册后检查停止请求 try: import run if run._shutdown_requested: with step_lock: current_step[0] = "用户请求停止..." break except: pass # 检查是否被停止 stopped = False try: import run stopped = run._shutdown_requested except: pass # 停止进度更新任务 progress_task.cancel() try: await progress_task except asyncio.CancelledError: pass # 完成进度 progress_bar = '▰' * 20 completed = success_count + fail_count if stopped: status_text = f"🛑 注册已停止 {completed}/{count}" else: status_text = f"🎉 注册完成! {success_count}/{count}" await progress_msg.edit_text( f"{status_text}\n" f"{progress_bar}\n\n" f"✅ 成功: {success_count}\n" f"❌ 失败: {fail_count}", parse_mode="HTML" ) # 处理结果 if results: if output_type == "json": # 生成 JSON 文件 timestamp = time.strftime("%Y%m%d_%H%M%S") filename = f"team_accounts_{timestamp}.json" filepath = Path(filename) with open(filepath, "w", encoding="utf-8") as f: json.dump(results, f, ensure_ascii=False, indent=2) # 发送文件 await self.app.bot.send_document( chat_id, document=open(filepath, "rb"), filename=filename, caption=f"📄 注册结果 ({success_count} 个账号)" ) # 删除临时文件 filepath.unlink() elif output_type == "team": # 添加到 team.json try: team_file = TEAM_JSON_FILE existing = [] if team_file.exists(): with open(team_file, "r", encoding="utf-8") as f: existing = json.load(f) existing.extend(results) with open(team_file, "w", encoding="utf-8") as f: json.dump(existing, f, ensure_ascii=False, indent=2) # 重载配置 reload_config() await self.app.bot.send_message( chat_id, f"✅ 已添加到 team.json\n\n" f"新增: {success_count} 个账号\n" f"当前总数: {len(existing)} 个", parse_mode="HTML" ) except Exception as e: await self.app.bot.send_message( chat_id, f"❌ 保存到 team.json 失败: {e}" ) self.current_task = None self.current_team = None @admin_only async def cmd_autogptplus(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """AutoGPTPlus 配置管理 - 交互式菜单""" keyboard = [ [ InlineKeyboardButton("📋 查看配置", callback_data="autogptplus:config"), InlineKeyboardButton("🔑 设置 Token", callback_data="autogptplus:set_token"), ], [ InlineKeyboardButton("📧 域名管理", callback_data="autogptplus:domains"), InlineKeyboardButton("💳 IBAN 管理", callback_data="autogptplus:ibans"), ], [ InlineKeyboardButton("🎭 随机指纹", callback_data="autogptplus:fingerprint"), InlineKeyboardButton("📊 统计信息", callback_data="autogptplus:stats"), ], [ InlineKeyboardButton("📧 测试邮件", callback_data="autogptplus:test_email"), InlineKeyboardButton("🔄 测试 API", callback_data="autogptplus:test_api"), ], [ InlineKeyboardButton("🚀 开始注册", callback_data="autogptplus:register"), ], ] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text( "🤖 AutoGPTPlus 管理面板\n\n" "ChatGPT 订阅自动化配置管理\n\n" "请选择功能:", parse_mode="HTML", reply_markup=reply_markup ) def _get_autogptplus_main_keyboard(self): """获取 AutoGPTPlus 主菜单键盘""" # 检查协议模式是否可用 try: from auto_gpt_team import is_api_mode_supported, get_register_mode api_supported = is_api_mode_supported() current_mode = get_register_mode() except ImportError: api_supported = False current_mode = "browser" keyboard = [ [ InlineKeyboardButton("📋 查看配置", callback_data="autogptplus:config"), InlineKeyboardButton("🔑 设置 Token", callback_data="autogptplus:set_token"), ], [ InlineKeyboardButton("📧 域名管理", callback_data="autogptplus:domains"), InlineKeyboardButton("💳 IBAN 管理", callback_data="autogptplus:ibans"), ], [ InlineKeyboardButton("🎭 随机指纹", callback_data="autogptplus:fingerprint"), InlineKeyboardButton("📊 统计信息", callback_data="autogptplus:stats"), ], [ InlineKeyboardButton("📧 测试邮件", callback_data="autogptplus:test_email"), InlineKeyboardButton("🔄 测试 API", callback_data="autogptplus:test_api"), ], ] # 添加注册模式选择按钮 mode_icon = "🌐" if current_mode == "api" else "🖥️" mode_text = "协议模式" if current_mode == "api" else "浏览器模式" keyboard.append([ InlineKeyboardButton(f"⚙️ 注册模式: {mode_icon} {mode_text}", callback_data="autogptplus:select_mode"), ]) keyboard.append([ InlineKeyboardButton("🚀 开始注册", callback_data="autogptplus:register"), ]) return InlineKeyboardMarkup(keyboard) async def callback_autogptplus(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """处理 AutoGPTPlus 回调""" query = update.callback_query # 权限检查 user_id = update.effective_user.id if user_id not in TELEGRAM_ADMIN_CHAT_IDS: await query.answer("⛔ 无权限", show_alert=True) return await query.answer() data = query.data.split(":") action = data[1] if len(data) > 1 else "" sub_action = data[2] if len(data) > 2 else "" if action == "config": await self._show_autogptplus_config(query) elif action == "set_token": await self._prompt_autogptplus_token(query, context) elif action == "test_email": await self._test_autogptplus_email(query) elif action == "test_api": await self._test_autogptplus_api(query) elif action == "domains": await self._show_autogptplus_domains(query, sub_action, context) elif action == "ibans": await self._show_autogptplus_ibans(query, sub_action, context) elif action == "fingerprint": await self._toggle_autogptplus_fingerprint(query) elif action == "stats": await self._show_autogptplus_stats(query) elif action == "select_mode": await self._show_autogptplus_mode_selection(query) elif action == "set_mode": await self._set_autogptplus_mode(query, sub_action) elif action == "register": await self._start_autogptplus_register(query, context) elif action == "back": # 返回主菜单 await query.edit_message_text( "🤖 AutoGPTPlus 管理面板\n\n" "ChatGPT 订阅自动化配置管理\n\n" "请选择功能:", parse_mode="HTML", reply_markup=self._get_autogptplus_main_keyboard() ) async def _prompt_autogptplus_token(self, query, context: ContextTypes.DEFAULT_TYPE): """提示用户输入 Token""" try: from auto_gpt_team import MAIL_API_TOKEN # 脱敏显示当前 Token current_display = "未配置" if MAIL_API_TOKEN: if len(MAIL_API_TOKEN) > 10: current_display = f"{MAIL_API_TOKEN[:8]}...{MAIL_API_TOKEN[-4:]}" else: current_display = MAIL_API_TOKEN[:4] + "..." keyboard = [[InlineKeyboardButton("❌ 取消", callback_data="autogptplus:back")]] reply_markup = InlineKeyboardMarkup(keyboard) await query.edit_message_text( "🔑 设置 Cloud Mail API Token\n\n" f"当前 Token: {current_display}\n\n" "请直接发送新的 Token:\n" "(发送后将自动保存到 config.toml)", parse_mode="HTML", reply_markup=reply_markup ) # 设置等待输入状态 context.user_data["autogptplus_waiting_token"] = True except ImportError: keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]] reply_markup = InlineKeyboardMarkup(keyboard) await query.edit_message_text( "❌ 模块未找到\n\n" "auto_gpt_team 模块未安装或导入失败", parse_mode="HTML", reply_markup=reply_markup ) async def _show_autogptplus_config(self, query): """显示 AutoGPTPlus 配置""" try: from auto_gpt_team import ( MAIL_API_TOKEN, MAIL_API_BASE, EMAIL_DOMAINS, SEPA_IBANS, RANDOM_FINGERPRINT, get_email_domains, get_sepa_ibans, REGISTER_MODE, API_PROXY, is_api_mode_supported ) # 脱敏显示 Token token_display = "未配置" if MAIL_API_TOKEN: if len(MAIL_API_TOKEN) > 10: token_display = f"{MAIL_API_TOKEN[:8]}...{MAIL_API_TOKEN[-4:]}" else: token_display = MAIL_API_TOKEN[:4] + "..." # 获取域名列表 domains = get_email_domains() domains_display = ", ".join(domains[:3]) if domains else "未配置" if len(domains) > 3: domains_display += f" (+{len(domains) - 3})" # 获取 IBAN 列表 ibans = get_sepa_ibans() # 随机指纹状态 fingerprint_status = "✅ 已开启" if RANDOM_FINGERPRINT else "❌ 已关闭" # 注册模式 api_supported = is_api_mode_supported() if REGISTER_MODE == "api": mode_display = "🌐 协议模式 (API)" else: mode_display = "🖥️ 浏览器模式" if not api_supported: mode_display += " (协议模式不可用)" # 代理配置 proxy_display = API_PROXY if API_PROXY else "未配置" lines = [ "📋 AutoGPTPlus 配置", "", "🔑 Cloud Mail API", f" Token: {token_display}", f" 地址: {MAIL_API_BASE or '未配置'}", "", "📧 邮箱域名", f" 数量: {len(domains)} 个", f" 域名: {domains_display}", "", "💳 SEPA IBAN", f" 数量: {len(ibans)} 个", "", "⚙️ 注册设置", f" 模式: {mode_display}", f" 随机指纹: {fingerprint_status}", f" API 代理: {proxy_display}", ] # 配置状态检查 config_ok = bool(MAIL_API_TOKEN and MAIL_API_BASE and domains) if config_ok: lines.append("\n✅ 配置完整,可以使用") else: lines.append("\n⚠️ 配置不完整:") if not MAIL_API_TOKEN: lines.append(" • 缺少 mail_api_token") if not MAIL_API_BASE: lines.append(" • 缺少 mail_api_base") if not domains: lines.append(" • 缺少 email_domains") keyboard = [ [ InlineKeyboardButton("🔑 设置 Token", callback_data="autogptplus:set_token"), InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back"), ] ] reply_markup = InlineKeyboardMarkup(keyboard) await query.edit_message_text( "\n".join(lines), parse_mode="HTML", reply_markup=reply_markup ) except ImportError: keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]] reply_markup = InlineKeyboardMarkup(keyboard) await query.edit_message_text( "❌ 模块未找到\n\n" "auto_gpt_team 模块未安装或导入失败", parse_mode="HTML", reply_markup=reply_markup ) except Exception as e: keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]] reply_markup = InlineKeyboardMarkup(keyboard) await query.edit_message_text( f"❌ 获取配置失败: {e}", reply_markup=reply_markup ) async def _test_autogptplus_email(self, query): """测试 AutoGPTPlus 邮件创建""" await query.edit_message_text("⏳ 正在测试邮件创建...") try: from auto_gpt_team import ( MAIL_API_TOKEN, MAIL_API_BASE, get_email_domains ) import requests import random import string # 检查配置 if not MAIL_API_TOKEN or not MAIL_API_BASE: keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]] reply_markup = InlineKeyboardMarkup(keyboard) await query.edit_message_text( "❌ 配置不完整\n\n" "请先在 config.toml 中配置:\n" "• mail_api_token\n" "• mail_api_base", parse_mode="HTML", reply_markup=reply_markup ) return domains = get_email_domains() if not domains: keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]] reply_markup = InlineKeyboardMarkup(keyboard) await query.edit_message_text( "❌ 邮箱域名未配置\n\n" "请先在 config.toml 中配置 email_domains\n" "或使用 /domain_add 添加域名", parse_mode="HTML", reply_markup=reply_markup ) return # 生成测试邮箱 random_str = ''.join(random.choices(string.ascii_lowercase + string.digits, k=8)) domain = random.choice(domains) test_email = f"test-{random_str}@{domain.lstrip('@')}" # 测试创建邮箱 (通过查询邮件列表来验证 API 连接) url = f"{MAIL_API_BASE}/api/public/emailList" headers = { "Authorization": MAIL_API_TOKEN, "Content-Type": "application/json" } payload = { "toEmail": test_email, "timeSort": "desc", "size": 1 } response = requests.post(url, headers=headers, json=payload, timeout=10) data = response.json() keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]] reply_markup = InlineKeyboardMarkup(keyboard) if data.get("code") == 200: await query.edit_message_text( "✅ 邮件 API 测试成功\n\n" f"测试邮箱: {test_email}\n" f"API 响应: 正常\n\n" f"邮件系统已就绪,可以接收验证码", parse_mode="HTML", reply_markup=reply_markup ) else: error_msg = data.get("message", "未知错误") await query.edit_message_text( f"⚠️ API 响应异常\n\n" f"状态码: {data.get('code')}\n" f"错误: {error_msg}", parse_mode="HTML", reply_markup=reply_markup ) except ImportError: keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]] reply_markup = InlineKeyboardMarkup(keyboard) await query.edit_message_text( "❌ 模块未找到\n\n" "auto_gpt_team 模块未安装或导入失败", parse_mode="HTML", reply_markup=reply_markup ) except requests.exceptions.Timeout: keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]] reply_markup = InlineKeyboardMarkup(keyboard) await query.edit_message_text( "❌ 连接超时\n\n" "无法连接到邮件 API 服务器\n" "请检查 mail_api_base 配置", parse_mode="HTML", reply_markup=reply_markup ) except Exception as e: keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]] reply_markup = InlineKeyboardMarkup(keyboard) await query.edit_message_text( f"❌ 测试失败\n\n{e}", parse_mode="HTML", reply_markup=reply_markup ) async def _test_autogptplus_api(self, query): """测试 AutoGPTPlus API 连接""" await query.edit_message_text("⏳ 正在测试 API 连接...") try: from auto_gpt_team import MAIL_API_TOKEN, MAIL_API_BASE import requests # 检查配置 if not MAIL_API_TOKEN or not MAIL_API_BASE: keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]] reply_markup = InlineKeyboardMarkup(keyboard) await query.edit_message_text( "❌ 配置不完整\n\n" "请先在 config.toml 中配置:\n" "• mail_api_token\n" "• mail_api_base", parse_mode="HTML", reply_markup=reply_markup ) return # 测试 API 连接 url = f"{MAIL_API_BASE}/api/public/emailList" headers = { "Authorization": MAIL_API_TOKEN, "Content-Type": "application/json" } payload = { "toEmail": "test@test.com", "timeSort": "desc", "size": 1 } start_time = time.time() response = requests.post(url, headers=headers, json=payload, timeout=10) elapsed = time.time() - start_time data = response.json() keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]] reply_markup = InlineKeyboardMarkup(keyboard) if response.status_code == 200 and data.get("code") == 200: await query.edit_message_text( "✅ API 连接测试成功\n\n" f"服务器: {MAIL_API_BASE}\n" f"响应时间: {elapsed*1000:.0f}ms\n" f"状态: 正常\n\n" "Cloud Mail API 服务运行正常", parse_mode="HTML", reply_markup=reply_markup ) else: error_msg = data.get("message", "未知错误") await query.edit_message_text( f"⚠️ API 响应异常\n\n" f"HTTP 状态: {response.status_code}\n" f"API 状态: {data.get('code')}\n" f"错误: {error_msg}\n" f"响应时间: {elapsed*1000:.0f}ms", parse_mode="HTML", reply_markup=reply_markup ) except ImportError: keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]] reply_markup = InlineKeyboardMarkup(keyboard) await query.edit_message_text( "❌ 模块未找到\n\n" "auto_gpt_team 模块未安装或导入失败", parse_mode="HTML", reply_markup=reply_markup ) except requests.exceptions.ConnectionError: keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]] reply_markup = InlineKeyboardMarkup(keyboard) await query.edit_message_text( "❌ 连接失败\n\n" "无法连接到邮件 API 服务器\n" "请检查:\n" "• mail_api_base 地址是否正确\n" "• 服务器是否在线\n" "• 网络连接是否正常", parse_mode="HTML", reply_markup=reply_markup ) except requests.exceptions.Timeout: keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]] reply_markup = InlineKeyboardMarkup(keyboard) await query.edit_message_text( "❌ 连接超时\n\n" "API 服务器响应超时 (>10s)\n" "请检查服务器状态", parse_mode="HTML", reply_markup=reply_markup ) except Exception as e: keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]] reply_markup = InlineKeyboardMarkup(keyboard) await query.edit_message_text( f"❌ 测试失败\n\n{e}", parse_mode="HTML", reply_markup=reply_markup ) async def _show_autogptplus_domains(self, query, sub_action: str = "", context=None): """显示/管理域名""" try: from auto_gpt_team import ( get_email_domains, get_file_domains_count, EMAIL_DOMAINS, clear_email_domains ) if sub_action == "clear": # 清空域名 cleared = clear_email_domains() config_count = len(EMAIL_DOMAINS) if EMAIL_DOMAINS else 0 keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:domains")]] reply_markup = InlineKeyboardMarkup(keyboard) await query.edit_message_text( f"✅ 域名已清空\n\n" f"已清空: {cleared} 个\n" f"保留 (config): {config_count} 个", parse_mode="HTML", reply_markup=reply_markup ) return if sub_action == "add": # 提示添加域名 keyboard = [[InlineKeyboardButton("❌ 取消", callback_data="autogptplus:domains")]] reply_markup = InlineKeyboardMarkup(keyboard) await query.edit_message_text( "➕ 添加邮箱域名\n\n" "请直接发送要添加的域名:\n\n" "支持格式:\n" "• 单个: @example.com\n" "• 多个: @a.com @b.com\n" "• 逗号分隔: @a.com,@b.com\n\n" "@ 符号可省略,会自动添加", parse_mode="HTML", reply_markup=reply_markup ) if context: context.user_data["autogptplus_waiting_domain"] = True return # 显示域名列表 domains = get_email_domains() file_count = get_file_domains_count() config_count = len(EMAIL_DOMAINS) if EMAIL_DOMAINS else 0 lines = ["📧 邮箱域名管理\n"] lines.append(f"总计: {len(domains)} 个") lines.append(f" • txt文件: {file_count} 个") lines.append(f" • config配置: {config_count} 个\n") if domains: lines.append("域名列表:") for i, domain in enumerate(domains[:15], 1): lines.append(f" {i}. {domain}") if len(domains) > 15: lines.append(f" ... 还有 {len(domains) - 15} 个") keyboard = [ [ InlineKeyboardButton("➕ 添加域名", callback_data="autogptplus:domains:add"), InlineKeyboardButton("🗑️ 清空txt域名", callback_data="autogptplus:domains:clear"), ], [InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")], ] reply_markup = InlineKeyboardMarkup(keyboard) await query.edit_message_text( "\n".join(lines), parse_mode="HTML", reply_markup=reply_markup ) except ImportError: keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]] reply_markup = InlineKeyboardMarkup(keyboard) await query.edit_message_text( "❌ 模块未找到", parse_mode="HTML", reply_markup=reply_markup ) except Exception as e: keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]] reply_markup = InlineKeyboardMarkup(keyboard) await query.edit_message_text( f"❌ 操作失败\n\n{e}", parse_mode="HTML", reply_markup=reply_markup ) async def _show_autogptplus_ibans(self, query, sub_action: str = "", context=None): """显示/管理 IBAN""" try: from auto_gpt_team import get_sepa_ibans, load_ibans_from_file, SEPA_IBANS, clear_sepa_ibans if sub_action == "clear": # 清空 IBAN clear_sepa_ibans() keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:ibans")]] reply_markup = InlineKeyboardMarkup(keyboard) await query.edit_message_text( "✅ IBAN 列表已清空", parse_mode="HTML", reply_markup=reply_markup ) return if sub_action == "add": # 提示添加 IBAN keyboard = [[InlineKeyboardButton("❌ 取消", callback_data="autogptplus:ibans")]] reply_markup = InlineKeyboardMarkup(keyboard) await query.edit_message_text( "➕ 添加 IBAN\n\n" "请直接发送要添加的 IBAN:\n\n" "支持格式:\n" "• 单个: DE89370400440532013000\n" "• 多个: 每行一个或逗号分隔\n\n" "只支持德国 IBAN (DE开头)", parse_mode="HTML", reply_markup=reply_markup ) if context: context.user_data["autogptplus_waiting_iban"] = True return # 显示 IBAN 列表 ibans = get_sepa_ibans() file_ibans = load_ibans_from_file() config_count = len(SEPA_IBANS) if SEPA_IBANS else 0 lines = ["💳 IBAN 管理\n"] lines.append(f"总计: {len(ibans)} 个") lines.append(f" • txt文件: {len(file_ibans)} 个") lines.append(f" • config配置: {config_count} 个\n") if ibans: lines.append("IBAN 列表:") for i, iban in enumerate(ibans[:10], 1): # 脱敏显示 masked = f"{iban[:8]}...{iban[-4:]}" if len(iban) > 12 else iban lines.append(f" {i}. {masked}") if len(ibans) > 10: lines.append(f" ... 还有 {len(ibans) - 10} 个") keyboard = [ [ InlineKeyboardButton("➕ 添加IBAN", callback_data="autogptplus:ibans:add"), InlineKeyboardButton("🗑️ 清空IBAN", callback_data="autogptplus:ibans:clear"), ], [InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")], ] reply_markup = InlineKeyboardMarkup(keyboard) await query.edit_message_text( "\n".join(lines), parse_mode="HTML", reply_markup=reply_markup ) except ImportError: keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]] reply_markup = InlineKeyboardMarkup(keyboard) await query.edit_message_text( "❌ 模块未找到", parse_mode="HTML", reply_markup=reply_markup ) except Exception as e: keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]] reply_markup = InlineKeyboardMarkup(keyboard) await query.edit_message_text( f"❌ 操作失败\n\n{e}", parse_mode="HTML", reply_markup=reply_markup ) async def _toggle_autogptplus_fingerprint(self, query): """切换随机指纹""" import tomli_w try: # 读取当前配置 with open(CONFIG_FILE, "rb") as f: import tomllib config = tomllib.load(f) # 确保 autogptplus section 存在 if "autogptplus" not in config: config["autogptplus"] = {} # 获取当前状态并切换 current = config.get("autogptplus", {}).get("random_fingerprint", True) new_value = not current # 更新配置 config["autogptplus"]["random_fingerprint"] = new_value # 写回文件 with open(CONFIG_FILE, "wb") as f: tomli_w.dump(config, f) # 重新加载模块 import importlib import auto_gpt_team importlib.reload(auto_gpt_team) status = "✅ 已开启" if new_value else "❌ 已关闭" keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]] reply_markup = InlineKeyboardMarkup(keyboard) await query.edit_message_text( f"🎭 随机指纹设置\n\n" f"状态: {status}\n\n" f"{'每次注册将使用随机浏览器指纹' if new_value else '将使用固定浏览器指纹'}", parse_mode="HTML", reply_markup=reply_markup ) except Exception as e: keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]] reply_markup = InlineKeyboardMarkup(keyboard) await query.edit_message_text( f"❌ 设置失败\n\n{e}", parse_mode="HTML", reply_markup=reply_markup ) async def _show_autogptplus_mode_selection(self, query): """显示注册模式选择界面""" try: from auto_gpt_team import is_api_mode_supported, get_register_mode api_supported = is_api_mode_supported() current_mode = get_register_mode() lines = [ "⚙️ 选择注册模式\n", "请选择 ChatGPT Team 注册使用的方式:\n", ] # 浏览器模式说明 browser_check = "✅" if current_mode == "browser" else "⬜" lines.append(f"{browser_check} 🖥️ 浏览器模式") lines.append("全程使用 DrissionPage 浏览器自动化") lines.append("• 兼容性好,无需额外依赖") lines.append("• 速度较慢,资源占用较高") lines.append("") # 协议模式说明 api_check = "✅" if current_mode == "api" else "⬜" api_status = "" if api_supported else " (不可用)" lines.append(f"{api_check} 🌐 协议模式{api_status}") lines.append("使用 API 快速注册,仅支付环节用浏览器") lines.append("• 速度快,资源占用少") lines.append("• 需要 curl_cffi 依赖") if not api_supported: lines.append("• 请安装: pip install curl_cffi") # 构建按钮 keyboard = [] # 浏览器模式按钮 browser_icon = "✅" if current_mode == "browser" else "🖥️" keyboard.append([ InlineKeyboardButton( f"{browser_icon} 浏览器模式", callback_data="autogptplus:set_mode:browser" ) ]) # 协议模式按钮 if api_supported: api_icon = "✅" if current_mode == "api" else "🌐" keyboard.append([ InlineKeyboardButton( f"{api_icon} 协议模式 (推荐)", callback_data="autogptplus:set_mode:api" ) ]) else: keyboard.append([ InlineKeyboardButton( "🌐 协议模式 (需安装依赖)", callback_data="autogptplus:set_mode:api_unavailable" ) ]) keyboard.append([ InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back") ]) reply_markup = InlineKeyboardMarkup(keyboard) await query.edit_message_text( "\n".join(lines), parse_mode="HTML", reply_markup=reply_markup ) except ImportError as e: keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]] reply_markup = InlineKeyboardMarkup(keyboard) await query.edit_message_text( f"❌ 模块导入失败\n\n{e}", parse_mode="HTML", reply_markup=reply_markup ) except Exception as e: keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]] reply_markup = InlineKeyboardMarkup(keyboard) await query.edit_message_text( f"❌ 获取配置失败\n\n{e}", parse_mode="HTML", reply_markup=reply_markup ) async def _set_autogptplus_mode(self, query, mode: str): """设置注册模式""" import tomli_w try: from auto_gpt_team import is_api_mode_supported, get_register_mode, set_register_mode # 处理协议模式不可用的情况 if mode == "api_unavailable": keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:select_mode")]] reply_markup = InlineKeyboardMarkup(keyboard) await query.edit_message_text( "❌ 协议模式不可用\n\n" "需要安装 curl_cffi 依赖:\n" "pip install curl_cffi\n\n" "安装后重启程序即可使用协议模式", parse_mode="HTML", reply_markup=reply_markup ) return # 检查是否已经是当前模式 current_mode = get_register_mode() if mode == current_mode: await query.answer(f"当前已是{'协议' if mode == 'api' else '浏览器'}模式", show_alert=False) return # 检查协议模式是否可用 if mode == "api" and not is_api_mode_supported(): keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:select_mode")]] reply_markup = InlineKeyboardMarkup(keyboard) await query.edit_message_text( "❌ 协议模式不可用\n\n" "需要安装 curl_cffi 依赖:\n" "pip install curl_cffi\n\n" "安装后重启程序即可使用协议模式", parse_mode="HTML", reply_markup=reply_markup ) return # 读取当前配置 with open(CONFIG_FILE, "rb") as f: import tomllib config = tomllib.load(f) # 确保 autogptplus section 存在 if "autogptplus" not in config: config["autogptplus"] = {} # 更新配置 config["autogptplus"]["register_mode"] = mode # 写回文件 with open(CONFIG_FILE, "wb") as f: tomli_w.dump(config, f) # 更新运行时配置 set_register_mode(mode) # 重新加载模块 import importlib import auto_gpt_team importlib.reload(auto_gpt_team) # 显示成功消息 if mode == "api": mode_name = "🌐 协议模式" mode_desc = ( "使用 API 快速完成注册流程,仅支付环节使用浏览器\n\n" "特点:\n" "• 注册速度更快\n" "• 资源占用更少\n" "• 更稳定可靠" ) else: mode_name = "🖥️ 浏览器模式" mode_desc = ( "全程使用 DrissionPage 浏览器自动化\n\n" "特点:\n" "• 兼容性更好\n" "• 无需额外依赖\n" "• 可视化调试方便" ) keyboard = [[InlineKeyboardButton("◀️ 返回主菜单", callback_data="autogptplus:back")]] reply_markup = InlineKeyboardMarkup(keyboard) await query.edit_message_text( f"✅ 注册模式已设置\n\n" f"当前模式: {mode_name}\n\n" f"{mode_desc}", parse_mode="HTML", reply_markup=reply_markup ) except ImportError as e: keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]] reply_markup = InlineKeyboardMarkup(keyboard) await query.edit_message_text( f"❌ 模块导入失败\n\n{e}", parse_mode="HTML", reply_markup=reply_markup ) except Exception as e: keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]] reply_markup = InlineKeyboardMarkup(keyboard) await query.edit_message_text( f"❌ 设置失败\n\n{e}", parse_mode="HTML", reply_markup=reply_markup ) async def _show_autogptplus_stats(self, query): """显示统计信息""" try: from auto_gpt_team import get_email_domains, get_sepa_ibans, RANDOM_FINGERPRINT import json from pathlib import Path # 读取账号文件统计 accounts_file = Path("accounts.json") accounts_count = 0 if accounts_file.exists(): try: with open(accounts_file, "r", encoding="utf-8") as f: accounts = json.load(f) accounts_count = len(accounts) except: pass domains = get_email_domains() ibans = get_sepa_ibans() lines = ["📊 AutoGPTPlus 统计信息\n"] # 资源统计 lines.append("📦 可用资源:") lines.append(f" • 邮箱域名: {len(domains)} 个") lines.append(f" • IBAN: {len(ibans)} 个") lines.append(f" • 随机指纹: {'开启' if RANDOM_FINGERPRINT else '关闭'}") lines.append("") # 账号统计 lines.append("👥 已注册账号:") lines.append(f" • 总计: {accounts_count} 个") # 配置状态 lines.append("") lines.append("⚙️ 配置状态:") if len(domains) > 0 and len(ibans) > 0: lines.append(" ✅ 已就绪,可以开始注册") else: if len(domains) == 0: lines.append(" ⚠️ 缺少邮箱域名") if len(ibans) == 0: lines.append(" ⚠️ 缺少 IBAN") keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]] reply_markup = InlineKeyboardMarkup(keyboard) await query.edit_message_text( "\n".join(lines), parse_mode="HTML", reply_markup=reply_markup ) except ImportError: keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]] reply_markup = InlineKeyboardMarkup(keyboard) await query.edit_message_text( "❌ 模块未找到", parse_mode="HTML", reply_markup=reply_markup ) except Exception as e: keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]] reply_markup = InlineKeyboardMarkup(keyboard) await query.edit_message_text( f"❌ 获取统计失败\n\n{e}", parse_mode="HTML", reply_markup=reply_markup ) async def _start_autogptplus_register(self, query, context): """快速开始注册 (跳转到 team_register)""" try: from auto_gpt_team import get_email_domains, get_sepa_ibans, MAIL_API_TOKEN, MAIL_API_BASE # 检查配置 domains = get_email_domains() ibans = get_sepa_ibans() missing = [] if not MAIL_API_TOKEN: missing.append("mail_api_token") if not MAIL_API_BASE: missing.append("mail_api_base") if not domains: missing.append("邮箱域名") if not ibans: missing.append("IBAN") if missing: keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]] reply_markup = InlineKeyboardMarkup(keyboard) await query.edit_message_text( "⚠️ 配置不完整\n\n" "缺少以下配置:\n" + "\n".join(f" • {m}" for m in missing) + "\n\n请先完成配置后再开始注册", parse_mode="HTML", reply_markup=reply_markup ) return # 显示注册选项 keyboard = [ [ InlineKeyboardButton("1️⃣ 注册1个", callback_data="team_reg:1"), InlineKeyboardButton("3️⃣ 注册3个", callback_data="team_reg:3"), ], [ InlineKeyboardButton("5️⃣ 注册5个", callback_data="team_reg:5"), InlineKeyboardButton("🔢 自定义", callback_data="team_reg:custom"), ], [InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")], ] reply_markup = InlineKeyboardMarkup(keyboard) await query.edit_message_text( "🚀 开始 ChatGPT Team 注册\n\n" f"可用资源:\n" f" • 邮箱域名: {len(domains)} 个\n" f" • IBAN: {len(ibans)} 个\n\n" "选择注册数量:", parse_mode="HTML", reply_markup=reply_markup ) except ImportError: keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]] reply_markup = InlineKeyboardMarkup(keyboard) await query.edit_message_text( "❌ 模块未找到", parse_mode="HTML", reply_markup=reply_markup ) except Exception as e: keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]] reply_markup = InlineKeyboardMarkup(keyboard) await query.edit_message_text( f"❌ 操作失败\n\n{e}", parse_mode="HTML", reply_markup=reply_markup ) @admin_only async def handle_team_custom_count(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """处理文本输入 (GPT Team 自定义数量 / AutoGPTPlus Token / 域名 / IBAN)""" # 处理 AutoGPTPlus Token 输入 if context.user_data.get("autogptplus_waiting_token"): context.user_data["autogptplus_waiting_token"] = False await self._handle_autogptplus_token_input(update, context) return # 处理域名添加输入 if context.user_data.get("autogptplus_waiting_domain"): context.user_data["autogptplus_waiting_domain"] = False await self._handle_autogptplus_domain_input(update, context) return # 处理 IBAN 添加输入 if context.user_data.get("autogptplus_waiting_iban"): context.user_data["autogptplus_waiting_iban"] = False await self._handle_autogptplus_iban_input(update, context) return # 处理 GPT Team 自定义数量输入 if not context.user_data.get("team_waiting_count"): return # 不在等待状态,忽略消息 # 清除等待状态 context.user_data["team_waiting_count"] = False text = update.message.text.strip() # 验证输入 try: count = int(text) if count < 1 or count > 50: await update.message.reply_text( "❌ 数量必须在 1-50 之间\n\n" "请重新使用 /team_register 开始" ) return except ValueError: await update.message.reply_text( "❌ 请输入有效的数字\n\n" "请重新使用 /team_register 开始" ) return # 显示输出方式选择 keyboard = [ [ InlineKeyboardButton("📄 JSON 文件", callback_data=f"team_reg:output:json:{count}"), ], [ InlineKeyboardButton("📥 添加到 team.json", callback_data=f"team_reg:output:team:{count}"), ], ] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text( f"⚙️ 配置完成\n\n" f"注册数量: {count} 个\n\n" f"请选择输出方式:", parse_mode="HTML", reply_markup=reply_markup ) async def _handle_autogptplus_token_input(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """处理 AutoGPTPlus Token 输入 - 保存后立即测试""" import tomli_w import requests token = update.message.text.strip() if not token: await update.message.reply_text("❌ Token 不能为空") return # 脱敏显示 if len(token) > 10: token_display = f"{token[:8]}...{token[-4:]}" else: token_display = token[:4] + "..." # 先发送保存中的消息 status_msg = await update.message.reply_text( f"⏳ 正在保存并验证 Token...\n\n" f"Token: {token_display}", parse_mode="HTML" ) try: # 读取当前配置获取 API 地址 with open(CONFIG_FILE, "rb") as f: import tomllib config = tomllib.load(f) mail_api_base = config.get("autogptplus", {}).get("mail_api_base", "") if not mail_api_base: keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]] reply_markup = InlineKeyboardMarkup(keyboard) await status_msg.edit_text( "❌ 无法验证 Token\n\n" "mail_api_base 未配置\n" "请先在 config.toml 中配置 API 地址", parse_mode="HTML", reply_markup=reply_markup ) return # 测试 Token 是否有效 url = f"{mail_api_base}/api/public/emailList" headers = { "Authorization": token, "Content-Type": "application/json" } payload = { "toEmail": "test@test.com", "timeSort": "desc", "size": 1 } start_time = time.time() response = requests.post(url, headers=headers, json=payload, timeout=10) elapsed = time.time() - start_time data = response.json() # 检查 Token 是否有效 if response.status_code == 200 and data.get("code") == 200: # Token 有效,保存到配置文件 if "autogptplus" not in config: config["autogptplus"] = {} config["autogptplus"]["mail_api_token"] = token with open(CONFIG_FILE, "wb") as f: tomli_w.dump(config, f) keyboard = [ [ InlineKeyboardButton("📋 查看配置", callback_data="autogptplus:config"), InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back"), ], ] reply_markup = InlineKeyboardMarkup(keyboard) await status_msg.edit_text( f"✅ Token 验证成功并已保存\n\n" f"Token: {token_display}\n" f"响应时间: {elapsed*1000:.0f}ms\n\n" f"💡 使用 /reload 重载配置使其生效", parse_mode="HTML", reply_markup=reply_markup ) else: # Token 无效 error_msg = data.get("message", "未知错误") keyboard = [ [ InlineKeyboardButton("🔑 重新设置", callback_data="autogptplus:set_token"), InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back"), ], ] reply_markup = InlineKeyboardMarkup(keyboard) await status_msg.edit_text( f"❌ Token 验证失败\n\n" f"Token: {token_display}\n" f"错误: {error_msg}\n\n" f"Token 未保存,请检查后重试", parse_mode="HTML", reply_markup=reply_markup ) except requests.exceptions.ConnectionError: keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]] reply_markup = InlineKeyboardMarkup(keyboard) await status_msg.edit_text( "❌ 连接失败\n\n" "无法连接到 API 服务器\n" "请检查 mail_api_base 配置", parse_mode="HTML", reply_markup=reply_markup ) except requests.exceptions.Timeout: keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]] reply_markup = InlineKeyboardMarkup(keyboard) await status_msg.edit_text( "❌ 连接超时\n\n" "API 服务器响应超时", parse_mode="HTML", reply_markup=reply_markup ) except ImportError: await status_msg.edit_text( "❌ 缺少 tomli_w 依赖\n" "请运行: uv add tomli_w" ) except Exception as e: keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]] reply_markup = InlineKeyboardMarkup(keyboard) await status_msg.edit_text( f"❌ 验证失败\n\n{e}", parse_mode="HTML", reply_markup=reply_markup ) async def _handle_autogptplus_domain_input(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """处理域名添加输入""" try: from auto_gpt_team import add_email_domains text = update.message.text.strip() if not text: await update.message.reply_text("❌ 域名不能为空") return # 解析输入 (支持空格、逗号、换行分隔) domains = [s.strip() for s in text.replace(",", " ").replace("\n", " ").split() if s.strip()] if not domains: await update.message.reply_text("❌ 未提供有效的域名") return added, skipped, invalid, total = add_email_domains(domains) # 构建响应消息 lines = ["✅ 域名导入完成\n"] lines.append(f"新增: {added}") lines.append(f"跳过 (重复): {skipped}") if invalid > 0: lines.append(f"无效 (格式错误): {invalid}") lines.append(f"当前总数: {total}") keyboard = [[InlineKeyboardButton("📧 域名管理", callback_data="autogptplus:domains")]] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text( "\n".join(lines), parse_mode="HTML", reply_markup=reply_markup ) except ImportError: await update.message.reply_text("❌ auto_gpt_team 模块未找到") except Exception as e: await update.message.reply_text(f"❌ 添加域名失败: {e}") async def _handle_autogptplus_iban_input(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """处理 IBAN 添加输入""" try: from auto_gpt_team import add_sepa_ibans text = update.message.text.strip() if not text: await update.message.reply_text("❌ IBAN 不能为空") return # 解析输入 (支持空格、逗号、换行分隔) ibans = [s.strip() for s in text.replace(",", " ").replace("\n", " ").split() if s.strip()] if not ibans: await update.message.reply_text("❌ 未提供有效的 IBAN") return added, skipped, total = add_sepa_ibans(ibans) # 构建响应消息 lines = ["✅ IBAN 导入完成\n"] lines.append(f"新增: {added}") lines.append(f"跳过 (重复/无效): {skipped}") lines.append(f"当前总数: {total}") keyboard = [[InlineKeyboardButton("💳 IBAN 管理", callback_data="autogptplus:ibans")]] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text( "\n".join(lines), parse_mode="HTML", reply_markup=reply_markup ) except ImportError: await update.message.reply_text("❌ auto_gpt_team 模块未找到") except Exception as e: await update.message.reply_text(f"❌ 添加 IBAN 失败: {e}") async def main(): """主函数""" if not TELEGRAM_BOT_TOKEN: print("Telegram Bot Token 未配置,请在 config.toml 中设置 telegram.bot_token") sys.exit(1) if not TELEGRAM_ADMIN_CHAT_IDS: print("管理员 Chat ID 未配置,请在 config.toml 中设置 telegram.admin_chat_ids") sys.exit(1) bot = ProvisionerBot() # 处理 Ctrl+C import signal def signal_handler(sig, frame): log.info("正在关闭...") bot.request_shutdown() signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) await bot.start() if __name__ == "__main__": asyncio.run(main())