From 0e8b5ba2372a35020ecbf37392d60fc3e688869d Mon Sep 17 00:00:00 2001 From: kyx236 Date: Sat, 24 Jan 2026 06:35:29 +0800 Subject: [PATCH] feat(telegram_bot): Add AutoGPTPlus management panel with configuration controls - Add /autogptplus command with interactive menu for ChatGPT subscription automation - Implement callback handler for AutoGPTPlus actions (config view, token setup, email/API testing) - Add _show_autogptplus_config() to display current configuration with masked sensitive data - Add _prompt_autogptplus_token() to handle Cloud Mail API token input and updates - Add _test_autogptplus_email() and _test_autogptplus_api() for testing functionality - Add _update_autogptplus_token() to persist token changes to config.toml - Register AutoGPTPlus command in bot command list with Chinese description - Update help text to include AutoGPTPlus management panel section - Add admin-only access control for all AutoGPTPlus operations - Provides centralized management interface for email domains, SEPA IBANs, and API configuration --- telegram_bot.py | 574 +++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 571 insertions(+), 3 deletions(-) diff --git a/telegram_bot.py b/telegram_bot.py index 24bd4b7..3719fa3 100644 --- a/telegram_bot.py +++ b/telegram_bot.py @@ -148,6 +148,7 @@ class ProvisionerBot: ("clean_errors", self.cmd_clean_errors), ("clean_teams", self.cmd_clean_teams), ("keys_usage", self.cmd_keys_usage), + ("autogptplus", self.cmd_autogptplus), ] for cmd, handler in handlers: self.app.add_handler(CommandHandler(cmd, handler)) @@ -175,6 +176,10 @@ class ProvisionerBot: self.callback_team_register, pattern="^team_reg:" )) + self.app.add_handler(CallbackQueryHandler( + self.callback_autogptplus, + pattern="^autogptplus:" + )) # 注册自定义数量输入处理器 (GPT Team 注册) self.app.add_handler(MessageHandler( @@ -251,6 +256,7 @@ class ProvisionerBot: BotCommand("gptmail_add", "添加 GPTMail API Key"), BotCommand("gptmail_del", "删除 GPTMail API Key"), BotCommand("test_email", "测试邮箱创建"), + BotCommand("autogptplus", "AutoGPTPlus 管理面板"), ] try: await self.app.bot.set_my_commands(commands) @@ -320,7 +326,10 @@ class ProvisionerBot: /team_fingerprint - 开启/关闭随机指纹 /team_register - 开始自动订阅注册 -💡 示例: +� AutoGPTPlus: +/autogptplus - ChatGPT 订阅自动化管理面板 + +�💡 示例: /list - 查看所有待处理账号 /run 0 - 处理第一个 Team /gptmail_add my-api-key - 添加 Key @@ -2991,10 +3000,428 @@ class ProvisionerBot: self.current_task = None self.current_team = None + @admin_only + async def cmd_autogptplus(self, update: Update, context: ContextTypes.DEFAULT_TYPE): + """AutoGPTPlus 配置管理 - 交互式菜单""" + keyboard = [ + [ + InlineKeyboardButton("📋 查看配置", callback_data="autogptplus:config"), + InlineKeyboardButton("� 设置 Token", callback_data="autogptplus:set_token"), + ], + [ + InlineKeyboardButton("� 测试邮件", callback_data="autogptplus:test_email"), + InlineKeyboardButton("🔄 测试 API", callback_data="autogptplus:test_api"), + ], + ] + reply_markup = InlineKeyboardMarkup(keyboard) + + await update.message.reply_text( + "🤖 AutoGPTPlus 管理面板\n\n" + "ChatGPT 订阅自动化配置管理\n\n" + "请选择功能:", + parse_mode="HTML", + reply_markup=reply_markup + ) + + async def callback_autogptplus(self, update: Update, context: ContextTypes.DEFAULT_TYPE): + """处理 AutoGPTPlus 回调""" + query = update.callback_query + + # 权限检查 + user_id = update.effective_user.id + if user_id not in TELEGRAM_ADMIN_CHAT_IDS: + await query.answer("⛔ 无权限", show_alert=True) + return + + await query.answer() + + data = query.data.split(":") + action = data[1] if len(data) > 1 else "" + + if action == "config": + await self._show_autogptplus_config(query) + elif action == "set_token": + await self._prompt_autogptplus_token(query, context) + elif action == "test_email": + await self._test_autogptplus_email(query) + elif action == "test_api": + await self._test_autogptplus_api(query) + elif action == "back": + # 返回主菜单 + keyboard = [ + [ + InlineKeyboardButton("📋 查看配置", callback_data="autogptplus:config"), + InlineKeyboardButton("� 设置 Token", callback_data="autogptplus:set_token"), + ], + [ + InlineKeyboardButton("📧 测试邮件", callback_data="autogptplus:test_email"), + InlineKeyboardButton("🔄 测试 API", callback_data="autogptplus:test_api"), + ], + ] + reply_markup = InlineKeyboardMarkup(keyboard) + + await query.edit_message_text( + "🤖 AutoGPTPlus 管理面板\n\n" + "ChatGPT 订阅自动化配置管理\n\n" + "请选择功能:", + parse_mode="HTML", + reply_markup=reply_markup + ) + + async def _prompt_autogptplus_token(self, query, context: ContextTypes.DEFAULT_TYPE): + """提示用户输入 Token""" + try: + from auto_gpt_team import MAIL_API_TOKEN + + # 脱敏显示当前 Token + current_display = "未配置" + if MAIL_API_TOKEN: + if len(MAIL_API_TOKEN) > 10: + current_display = f"{MAIL_API_TOKEN[:8]}...{MAIL_API_TOKEN[-4:]}" + else: + current_display = MAIL_API_TOKEN[:4] + "..." + + keyboard = [[InlineKeyboardButton("❌ 取消", callback_data="autogptplus:back")]] + reply_markup = InlineKeyboardMarkup(keyboard) + + await query.edit_message_text( + "🔑 设置 Cloud Mail API Token\n\n" + f"当前 Token: {current_display}\n\n" + "请直接发送新的 Token:\n" + "(发送后将自动保存到 config.toml)", + parse_mode="HTML", + reply_markup=reply_markup + ) + + # 设置等待输入状态 + context.user_data["autogptplus_waiting_token"] = True + + except ImportError: + keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]] + reply_markup = InlineKeyboardMarkup(keyboard) + await query.edit_message_text( + "❌ 模块未找到\n\n" + "auto_gpt_team 模块未安装或导入失败", + parse_mode="HTML", + reply_markup=reply_markup + ) + + async def _show_autogptplus_config(self, query): + """显示 AutoGPTPlus 配置""" + try: + from auto_gpt_team import ( + MAIL_API_TOKEN, MAIL_API_BASE, EMAIL_DOMAINS, + SEPA_IBANS, RANDOM_FINGERPRINT, get_email_domains, get_sepa_ibans + ) + + # 脱敏显示 Token + token_display = "未配置" + if MAIL_API_TOKEN: + if len(MAIL_API_TOKEN) > 10: + token_display = f"{MAIL_API_TOKEN[:8]}...{MAIL_API_TOKEN[-4:]}" + else: + token_display = MAIL_API_TOKEN[:4] + "..." + + # 获取域名列表 + domains = get_email_domains() + domains_display = ", ".join(domains[:3]) if domains else "未配置" + if len(domains) > 3: + domains_display += f" (+{len(domains) - 3})" + + # 获取 IBAN 列表 + ibans = get_sepa_ibans() + + # 随机指纹状态 + fingerprint_status = "✅ 已开启" if RANDOM_FINGERPRINT else "❌ 已关闭" + + lines = [ + "📋 AutoGPTPlus 配置", + "", + "🔑 Cloud Mail API", + f" Token: {token_display}", + f" 地址: {MAIL_API_BASE or '未配置'}", + "", + "📧 邮箱域名", + f" 数量: {len(domains)} 个", + f" 域名: {domains_display}", + "", + "💳 SEPA IBAN", + f" 数量: {len(ibans)} 个", + "", + "🎭 随机指纹", + f" 状态: {fingerprint_status}", + ] + + # 配置状态检查 + config_ok = bool(MAIL_API_TOKEN and MAIL_API_BASE and domains) + if config_ok: + lines.append("\n✅ 配置完整,可以使用") + else: + lines.append("\n⚠️ 配置不完整:") + if not MAIL_API_TOKEN: + lines.append(" • 缺少 mail_api_token") + if not MAIL_API_BASE: + lines.append(" • 缺少 mail_api_base") + if not domains: + lines.append(" • 缺少 email_domains") + + keyboard = [ + [ + InlineKeyboardButton("🔑 设置 Token", callback_data="autogptplus:set_token"), + InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back"), + ] + ] + reply_markup = InlineKeyboardMarkup(keyboard) + + await query.edit_message_text( + "\n".join(lines), + parse_mode="HTML", + reply_markup=reply_markup + ) + + except ImportError: + keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]] + reply_markup = InlineKeyboardMarkup(keyboard) + await query.edit_message_text( + "❌ 模块未找到\n\n" + "auto_gpt_team 模块未安装或导入失败", + parse_mode="HTML", + reply_markup=reply_markup + ) + except Exception as e: + keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]] + reply_markup = InlineKeyboardMarkup(keyboard) + await query.edit_message_text( + f"❌ 获取配置失败: {e}", + reply_markup=reply_markup + ) + + async def _test_autogptplus_email(self, query): + """测试 AutoGPTPlus 邮件创建""" + await query.edit_message_text("⏳ 正在测试邮件创建...") + + try: + from auto_gpt_team import ( + MAIL_API_TOKEN, MAIL_API_BASE, get_email_domains + ) + import requests + import random + import string + + # 检查配置 + if not MAIL_API_TOKEN or not MAIL_API_BASE: + keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]] + reply_markup = InlineKeyboardMarkup(keyboard) + await query.edit_message_text( + "❌ 配置不完整\n\n" + "请先在 config.toml 中配置:\n" + "• mail_api_token\n" + "• mail_api_base", + parse_mode="HTML", + reply_markup=reply_markup + ) + return + + domains = get_email_domains() + if not domains: + keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]] + reply_markup = InlineKeyboardMarkup(keyboard) + await query.edit_message_text( + "❌ 邮箱域名未配置\n\n" + "请先在 config.toml 中配置 email_domains\n" + "或使用 /domain_add 添加域名", + parse_mode="HTML", + reply_markup=reply_markup + ) + return + + # 生成测试邮箱 + random_str = ''.join(random.choices(string.ascii_lowercase + string.digits, k=8)) + domain = random.choice(domains) + test_email = f"test-{random_str}@{domain.lstrip('@')}" + + # 测试创建邮箱 (通过查询邮件列表来验证 API 连接) + url = f"{MAIL_API_BASE}/api/public/emailList" + headers = { + "Authorization": MAIL_API_TOKEN, + "Content-Type": "application/json" + } + payload = { + "toEmail": test_email, + "timeSort": "desc", + "size": 1 + } + + response = requests.post(url, headers=headers, json=payload, timeout=10) + data = response.json() + + keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]] + reply_markup = InlineKeyboardMarkup(keyboard) + + if data.get("code") == 200: + await query.edit_message_text( + "✅ 邮件 API 测试成功\n\n" + f"测试邮箱: {test_email}\n" + f"API 响应: 正常\n\n" + f"邮件系统已就绪,可以接收验证码", + parse_mode="HTML", + reply_markup=reply_markup + ) + else: + error_msg = data.get("message", "未知错误") + await query.edit_message_text( + f"⚠️ API 响应异常\n\n" + f"状态码: {data.get('code')}\n" + f"错误: {error_msg}", + parse_mode="HTML", + reply_markup=reply_markup + ) + + except ImportError: + keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]] + reply_markup = InlineKeyboardMarkup(keyboard) + await query.edit_message_text( + "❌ 模块未找到\n\n" + "auto_gpt_team 模块未安装或导入失败", + parse_mode="HTML", + reply_markup=reply_markup + ) + except requests.exceptions.Timeout: + keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]] + reply_markup = InlineKeyboardMarkup(keyboard) + await query.edit_message_text( + "❌ 连接超时\n\n" + "无法连接到邮件 API 服务器\n" + "请检查 mail_api_base 配置", + parse_mode="HTML", + reply_markup=reply_markup + ) + except Exception as e: + keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]] + reply_markup = InlineKeyboardMarkup(keyboard) + await query.edit_message_text( + f"❌ 测试失败\n\n{e}", + parse_mode="HTML", + reply_markup=reply_markup + ) + + async def _test_autogptplus_api(self, query): + """测试 AutoGPTPlus API 连接""" + await query.edit_message_text("⏳ 正在测试 API 连接...") + + try: + from auto_gpt_team import MAIL_API_TOKEN, MAIL_API_BASE + import requests + + # 检查配置 + if not MAIL_API_TOKEN or not MAIL_API_BASE: + keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]] + reply_markup = InlineKeyboardMarkup(keyboard) + await query.edit_message_text( + "❌ 配置不完整\n\n" + "请先在 config.toml 中配置:\n" + "• mail_api_token\n" + "• mail_api_base", + parse_mode="HTML", + reply_markup=reply_markup + ) + return + + # 测试 API 连接 + url = f"{MAIL_API_BASE}/api/public/emailList" + headers = { + "Authorization": MAIL_API_TOKEN, + "Content-Type": "application/json" + } + payload = { + "toEmail": "test@test.com", + "timeSort": "desc", + "size": 1 + } + + start_time = time.time() + response = requests.post(url, headers=headers, json=payload, timeout=10) + elapsed = time.time() - start_time + + data = response.json() + + keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]] + reply_markup = InlineKeyboardMarkup(keyboard) + + if response.status_code == 200 and data.get("code") == 200: + await query.edit_message_text( + "✅ API 连接测试成功\n\n" + f"服务器: {MAIL_API_BASE}\n" + f"响应时间: {elapsed*1000:.0f}ms\n" + f"状态: 正常\n\n" + "Cloud Mail API 服务运行正常", + parse_mode="HTML", + reply_markup=reply_markup + ) + else: + error_msg = data.get("message", "未知错误") + await query.edit_message_text( + f"⚠️ API 响应异常\n\n" + f"HTTP 状态: {response.status_code}\n" + f"API 状态: {data.get('code')}\n" + f"错误: {error_msg}\n" + f"响应时间: {elapsed*1000:.0f}ms", + parse_mode="HTML", + reply_markup=reply_markup + ) + + except ImportError: + keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]] + reply_markup = InlineKeyboardMarkup(keyboard) + await query.edit_message_text( + "❌ 模块未找到\n\n" + "auto_gpt_team 模块未安装或导入失败", + parse_mode="HTML", + reply_markup=reply_markup + ) + except requests.exceptions.ConnectionError: + keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]] + reply_markup = InlineKeyboardMarkup(keyboard) + await query.edit_message_text( + "❌ 连接失败\n\n" + "无法连接到邮件 API 服务器\n" + "请检查:\n" + "• mail_api_base 地址是否正确\n" + "• 服务器是否在线\n" + "• 网络连接是否正常", + parse_mode="HTML", + reply_markup=reply_markup + ) + except requests.exceptions.Timeout: + keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]] + reply_markup = InlineKeyboardMarkup(keyboard) + await query.edit_message_text( + "❌ 连接超时\n\n" + "API 服务器响应超时 (>10s)\n" + "请检查服务器状态", + parse_mode="HTML", + reply_markup=reply_markup + ) + except Exception as e: + keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]] + reply_markup = InlineKeyboardMarkup(keyboard) + await query.edit_message_text( + f"❌ 测试失败\n\n{e}", + parse_mode="HTML", + reply_markup=reply_markup + ) + @admin_only async def handle_team_custom_count(self, update: Update, context: ContextTypes.DEFAULT_TYPE): - """处理 GPT Team 自定义数量输入""" - # 检查是否在等待输入状态 + """处理文本输入 (GPT Team 自定义数量 / AutoGPTPlus Token)""" + + # 处理 AutoGPTPlus Token 输入 + if context.user_data.get("autogptplus_waiting_token"): + context.user_data["autogptplus_waiting_token"] = False + await self._handle_autogptplus_token_input(update, context) + return + + # 处理 GPT Team 自定义数量输入 if not context.user_data.get("team_waiting_count"): return # 不在等待状态,忽略消息 @@ -3038,6 +3465,147 @@ class ProvisionerBot: reply_markup=reply_markup ) + async def _handle_autogptplus_token_input(self, update: Update, context: ContextTypes.DEFAULT_TYPE): + """处理 AutoGPTPlus Token 输入 - 保存后立即测试""" + import tomli_w + import requests + + token = update.message.text.strip() + + if not token: + await update.message.reply_text("❌ Token 不能为空") + return + + # 脱敏显示 + if len(token) > 10: + token_display = f"{token[:8]}...{token[-4:]}" + else: + token_display = token[:4] + "..." + + # 先发送保存中的消息 + status_msg = await update.message.reply_text( + f"⏳ 正在保存并验证 Token...\n\n" + f"Token: {token_display}", + parse_mode="HTML" + ) + + try: + # 读取当前配置获取 API 地址 + with open(CONFIG_FILE, "rb") as f: + import tomllib + config = tomllib.load(f) + + mail_api_base = config.get("autogptplus", {}).get("mail_api_base", "") + + if not mail_api_base: + keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]] + reply_markup = InlineKeyboardMarkup(keyboard) + await status_msg.edit_text( + "❌ 无法验证 Token\n\n" + "mail_api_base 未配置\n" + "请先在 config.toml 中配置 API 地址", + parse_mode="HTML", + reply_markup=reply_markup + ) + return + + # 测试 Token 是否有效 + url = f"{mail_api_base}/api/public/emailList" + headers = { + "Authorization": token, + "Content-Type": "application/json" + } + payload = { + "toEmail": "test@test.com", + "timeSort": "desc", + "size": 1 + } + + start_time = time.time() + response = requests.post(url, headers=headers, json=payload, timeout=10) + elapsed = time.time() - start_time + + data = response.json() + + # 检查 Token 是否有效 + if response.status_code == 200 and data.get("code") == 200: + # Token 有效,保存到配置文件 + if "autogptplus" not in config: + config["autogptplus"] = {} + config["autogptplus"]["mail_api_token"] = token + + with open(CONFIG_FILE, "wb") as f: + tomli_w.dump(config, f) + + keyboard = [ + [ + InlineKeyboardButton("📋 查看配置", callback_data="autogptplus:config"), + InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back"), + ], + ] + reply_markup = InlineKeyboardMarkup(keyboard) + + await status_msg.edit_text( + f"✅ Token 验证成功并已保存\n\n" + f"Token: {token_display}\n" + f"响应时间: {elapsed*1000:.0f}ms\n\n" + f"💡 使用 /reload 重载配置使其生效", + parse_mode="HTML", + reply_markup=reply_markup + ) + else: + # Token 无效 + error_msg = data.get("message", "未知错误") + keyboard = [ + [ + InlineKeyboardButton("🔑 重新设置", callback_data="autogptplus:set_token"), + InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back"), + ], + ] + reply_markup = InlineKeyboardMarkup(keyboard) + + await status_msg.edit_text( + f"❌ Token 验证失败\n\n" + f"Token: {token_display}\n" + f"错误: {error_msg}\n\n" + f"Token 未保存,请检查后重试", + parse_mode="HTML", + reply_markup=reply_markup + ) + + except requests.exceptions.ConnectionError: + keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]] + reply_markup = InlineKeyboardMarkup(keyboard) + await status_msg.edit_text( + "❌ 连接失败\n\n" + "无法连接到 API 服务器\n" + "请检查 mail_api_base 配置", + parse_mode="HTML", + reply_markup=reply_markup + ) + except requests.exceptions.Timeout: + keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]] + reply_markup = InlineKeyboardMarkup(keyboard) + await status_msg.edit_text( + "❌ 连接超时\n\n" + "API 服务器响应超时", + parse_mode="HTML", + reply_markup=reply_markup + ) + except ImportError: + await status_msg.edit_text( + "❌ 缺少 tomli_w 依赖\n" + "请运行: uv add tomli_w" + ) + except Exception as e: + keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]] + reply_markup = InlineKeyboardMarkup(keyboard) + await status_msg.edit_text( + f"❌ 验证失败\n\n{e}", + parse_mode="HTML", + reply_markup=reply_markup + ) + async def main(): """主函数"""