diff --git a/telegram_bot.py b/telegram_bot.py
index 24bd4b7..3719fa3 100644
--- a/telegram_bot.py
+++ b/telegram_bot.py
@@ -148,6 +148,7 @@ class ProvisionerBot:
("clean_errors", self.cmd_clean_errors),
("clean_teams", self.cmd_clean_teams),
("keys_usage", self.cmd_keys_usage),
+ ("autogptplus", self.cmd_autogptplus),
]
for cmd, handler in handlers:
self.app.add_handler(CommandHandler(cmd, handler))
@@ -175,6 +176,10 @@ class ProvisionerBot:
self.callback_team_register,
pattern="^team_reg:"
))
+ self.app.add_handler(CallbackQueryHandler(
+ self.callback_autogptplus,
+ pattern="^autogptplus:"
+ ))
# 注册自定义数量输入处理器 (GPT Team 注册)
self.app.add_handler(MessageHandler(
@@ -251,6 +256,7 @@ class ProvisionerBot:
BotCommand("gptmail_add", "添加 GPTMail API Key"),
BotCommand("gptmail_del", "删除 GPTMail API Key"),
BotCommand("test_email", "测试邮箱创建"),
+ BotCommand("autogptplus", "AutoGPTPlus 管理面板"),
]
try:
await self.app.bot.set_my_commands(commands)
@@ -320,7 +326,10 @@ class ProvisionerBot:
/team_fingerprint - 开启/关闭随机指纹
/team_register - 开始自动订阅注册
-💡 示例:
+� AutoGPTPlus:
+/autogptplus - ChatGPT 订阅自动化管理面板
+
+�💡 示例:
/list - 查看所有待处理账号
/run 0 - 处理第一个 Team
/gptmail_add my-api-key - 添加 Key
@@ -2991,10 +3000,428 @@ class ProvisionerBot:
self.current_task = None
self.current_team = None
+ @admin_only
+ async def cmd_autogptplus(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
+ """AutoGPTPlus 配置管理 - 交互式菜单"""
+ keyboard = [
+ [
+ InlineKeyboardButton("📋 查看配置", callback_data="autogptplus:config"),
+ InlineKeyboardButton("� 设置 Token", callback_data="autogptplus:set_token"),
+ ],
+ [
+ InlineKeyboardButton("� 测试邮件", callback_data="autogptplus:test_email"),
+ InlineKeyboardButton("🔄 测试 API", callback_data="autogptplus:test_api"),
+ ],
+ ]
+ reply_markup = InlineKeyboardMarkup(keyboard)
+
+ await update.message.reply_text(
+ "🤖 AutoGPTPlus 管理面板\n\n"
+ "ChatGPT 订阅自动化配置管理\n\n"
+ "请选择功能:",
+ parse_mode="HTML",
+ reply_markup=reply_markup
+ )
+
+ async def callback_autogptplus(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
+ """处理 AutoGPTPlus 回调"""
+ query = update.callback_query
+
+ # 权限检查
+ user_id = update.effective_user.id
+ if user_id not in TELEGRAM_ADMIN_CHAT_IDS:
+ await query.answer("⛔ 无权限", show_alert=True)
+ return
+
+ await query.answer()
+
+ data = query.data.split(":")
+ action = data[1] if len(data) > 1 else ""
+
+ if action == "config":
+ await self._show_autogptplus_config(query)
+ elif action == "set_token":
+ await self._prompt_autogptplus_token(query, context)
+ elif action == "test_email":
+ await self._test_autogptplus_email(query)
+ elif action == "test_api":
+ await self._test_autogptplus_api(query)
+ elif action == "back":
+ # 返回主菜单
+ keyboard = [
+ [
+ InlineKeyboardButton("📋 查看配置", callback_data="autogptplus:config"),
+ InlineKeyboardButton("� 设置 Token", callback_data="autogptplus:set_token"),
+ ],
+ [
+ InlineKeyboardButton("📧 测试邮件", callback_data="autogptplus:test_email"),
+ InlineKeyboardButton("🔄 测试 API", callback_data="autogptplus:test_api"),
+ ],
+ ]
+ reply_markup = InlineKeyboardMarkup(keyboard)
+
+ await query.edit_message_text(
+ "🤖 AutoGPTPlus 管理面板\n\n"
+ "ChatGPT 订阅自动化配置管理\n\n"
+ "请选择功能:",
+ parse_mode="HTML",
+ reply_markup=reply_markup
+ )
+
+ async def _prompt_autogptplus_token(self, query, context: ContextTypes.DEFAULT_TYPE):
+ """提示用户输入 Token"""
+ try:
+ from auto_gpt_team import MAIL_API_TOKEN
+
+ # 脱敏显示当前 Token
+ current_display = "未配置"
+ if MAIL_API_TOKEN:
+ if len(MAIL_API_TOKEN) > 10:
+ current_display = f"{MAIL_API_TOKEN[:8]}...{MAIL_API_TOKEN[-4:]}"
+ else:
+ current_display = MAIL_API_TOKEN[:4] + "..."
+
+ keyboard = [[InlineKeyboardButton("❌ 取消", callback_data="autogptplus:back")]]
+ reply_markup = InlineKeyboardMarkup(keyboard)
+
+ await query.edit_message_text(
+ "🔑 设置 Cloud Mail API Token\n\n"
+ f"当前 Token: {current_display}\n\n"
+ "请直接发送新的 Token:\n"
+ "(发送后将自动保存到 config.toml)",
+ parse_mode="HTML",
+ reply_markup=reply_markup
+ )
+
+ # 设置等待输入状态
+ context.user_data["autogptplus_waiting_token"] = True
+
+ except ImportError:
+ keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]]
+ reply_markup = InlineKeyboardMarkup(keyboard)
+ await query.edit_message_text(
+ "❌ 模块未找到\n\n"
+ "auto_gpt_team 模块未安装或导入失败",
+ parse_mode="HTML",
+ reply_markup=reply_markup
+ )
+
+ async def _show_autogptplus_config(self, query):
+ """显示 AutoGPTPlus 配置"""
+ try:
+ from auto_gpt_team import (
+ MAIL_API_TOKEN, MAIL_API_BASE, EMAIL_DOMAINS,
+ SEPA_IBANS, RANDOM_FINGERPRINT, get_email_domains, get_sepa_ibans
+ )
+
+ # 脱敏显示 Token
+ token_display = "未配置"
+ if MAIL_API_TOKEN:
+ if len(MAIL_API_TOKEN) > 10:
+ token_display = f"{MAIL_API_TOKEN[:8]}...{MAIL_API_TOKEN[-4:]}"
+ else:
+ token_display = MAIL_API_TOKEN[:4] + "..."
+
+ # 获取域名列表
+ domains = get_email_domains()
+ domains_display = ", ".join(domains[:3]) if domains else "未配置"
+ if len(domains) > 3:
+ domains_display += f" (+{len(domains) - 3})"
+
+ # 获取 IBAN 列表
+ ibans = get_sepa_ibans()
+
+ # 随机指纹状态
+ fingerprint_status = "✅ 已开启" if RANDOM_FINGERPRINT else "❌ 已关闭"
+
+ lines = [
+ "📋 AutoGPTPlus 配置",
+ "",
+ "🔑 Cloud Mail API",
+ f" Token: {token_display}",
+ f" 地址: {MAIL_API_BASE or '未配置'}",
+ "",
+ "📧 邮箱域名",
+ f" 数量: {len(domains)} 个",
+ f" 域名: {domains_display}",
+ "",
+ "💳 SEPA IBAN",
+ f" 数量: {len(ibans)} 个",
+ "",
+ "🎭 随机指纹",
+ f" 状态: {fingerprint_status}",
+ ]
+
+ # 配置状态检查
+ config_ok = bool(MAIL_API_TOKEN and MAIL_API_BASE and domains)
+ if config_ok:
+ lines.append("\n✅ 配置完整,可以使用")
+ else:
+ lines.append("\n⚠️ 配置不完整:")
+ if not MAIL_API_TOKEN:
+ lines.append(" • 缺少 mail_api_token")
+ if not MAIL_API_BASE:
+ lines.append(" • 缺少 mail_api_base")
+ if not domains:
+ lines.append(" • 缺少 email_domains")
+
+ keyboard = [
+ [
+ InlineKeyboardButton("🔑 设置 Token", callback_data="autogptplus:set_token"),
+ InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back"),
+ ]
+ ]
+ reply_markup = InlineKeyboardMarkup(keyboard)
+
+ await query.edit_message_text(
+ "\n".join(lines),
+ parse_mode="HTML",
+ reply_markup=reply_markup
+ )
+
+ except ImportError:
+ keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]]
+ reply_markup = InlineKeyboardMarkup(keyboard)
+ await query.edit_message_text(
+ "❌ 模块未找到\n\n"
+ "auto_gpt_team 模块未安装或导入失败",
+ parse_mode="HTML",
+ reply_markup=reply_markup
+ )
+ except Exception as e:
+ keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]]
+ reply_markup = InlineKeyboardMarkup(keyboard)
+ await query.edit_message_text(
+ f"❌ 获取配置失败: {e}",
+ reply_markup=reply_markup
+ )
+
+ async def _test_autogptplus_email(self, query):
+ """测试 AutoGPTPlus 邮件创建"""
+ await query.edit_message_text("⏳ 正在测试邮件创建...")
+
+ try:
+ from auto_gpt_team import (
+ MAIL_API_TOKEN, MAIL_API_BASE, get_email_domains
+ )
+ import requests
+ import random
+ import string
+
+ # 检查配置
+ if not MAIL_API_TOKEN or not MAIL_API_BASE:
+ keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]]
+ reply_markup = InlineKeyboardMarkup(keyboard)
+ await query.edit_message_text(
+ "❌ 配置不完整\n\n"
+ "请先在 config.toml 中配置:\n"
+ "• mail_api_token\n"
+ "• mail_api_base",
+ parse_mode="HTML",
+ reply_markup=reply_markup
+ )
+ return
+
+ domains = get_email_domains()
+ if not domains:
+ keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]]
+ reply_markup = InlineKeyboardMarkup(keyboard)
+ await query.edit_message_text(
+ "❌ 邮箱域名未配置\n\n"
+ "请先在 config.toml 中配置 email_domains\n"
+ "或使用 /domain_add 添加域名",
+ parse_mode="HTML",
+ reply_markup=reply_markup
+ )
+ return
+
+ # 生成测试邮箱
+ random_str = ''.join(random.choices(string.ascii_lowercase + string.digits, k=8))
+ domain = random.choice(domains)
+ test_email = f"test-{random_str}@{domain.lstrip('@')}"
+
+ # 测试创建邮箱 (通过查询邮件列表来验证 API 连接)
+ url = f"{MAIL_API_BASE}/api/public/emailList"
+ headers = {
+ "Authorization": MAIL_API_TOKEN,
+ "Content-Type": "application/json"
+ }
+ payload = {
+ "toEmail": test_email,
+ "timeSort": "desc",
+ "size": 1
+ }
+
+ response = requests.post(url, headers=headers, json=payload, timeout=10)
+ data = response.json()
+
+ keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]]
+ reply_markup = InlineKeyboardMarkup(keyboard)
+
+ if data.get("code") == 200:
+ await query.edit_message_text(
+ "✅ 邮件 API 测试成功\n\n"
+ f"测试邮箱: {test_email}\n"
+ f"API 响应: 正常\n\n"
+ f"邮件系统已就绪,可以接收验证码",
+ parse_mode="HTML",
+ reply_markup=reply_markup
+ )
+ else:
+ error_msg = data.get("message", "未知错误")
+ await query.edit_message_text(
+ f"⚠️ API 响应异常\n\n"
+ f"状态码: {data.get('code')}\n"
+ f"错误: {error_msg}",
+ parse_mode="HTML",
+ reply_markup=reply_markup
+ )
+
+ except ImportError:
+ keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]]
+ reply_markup = InlineKeyboardMarkup(keyboard)
+ await query.edit_message_text(
+ "❌ 模块未找到\n\n"
+ "auto_gpt_team 模块未安装或导入失败",
+ parse_mode="HTML",
+ reply_markup=reply_markup
+ )
+ except requests.exceptions.Timeout:
+ keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]]
+ reply_markup = InlineKeyboardMarkup(keyboard)
+ await query.edit_message_text(
+ "❌ 连接超时\n\n"
+ "无法连接到邮件 API 服务器\n"
+ "请检查 mail_api_base 配置",
+ parse_mode="HTML",
+ reply_markup=reply_markup
+ )
+ except Exception as e:
+ keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]]
+ reply_markup = InlineKeyboardMarkup(keyboard)
+ await query.edit_message_text(
+ f"❌ 测试失败\n\n{e}",
+ parse_mode="HTML",
+ reply_markup=reply_markup
+ )
+
+ async def _test_autogptplus_api(self, query):
+ """测试 AutoGPTPlus API 连接"""
+ await query.edit_message_text("⏳ 正在测试 API 连接...")
+
+ try:
+ from auto_gpt_team import MAIL_API_TOKEN, MAIL_API_BASE
+ import requests
+
+ # 检查配置
+ if not MAIL_API_TOKEN or not MAIL_API_BASE:
+ keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]]
+ reply_markup = InlineKeyboardMarkup(keyboard)
+ await query.edit_message_text(
+ "❌ 配置不完整\n\n"
+ "请先在 config.toml 中配置:\n"
+ "• mail_api_token\n"
+ "• mail_api_base",
+ parse_mode="HTML",
+ reply_markup=reply_markup
+ )
+ return
+
+ # 测试 API 连接
+ url = f"{MAIL_API_BASE}/api/public/emailList"
+ headers = {
+ "Authorization": MAIL_API_TOKEN,
+ "Content-Type": "application/json"
+ }
+ payload = {
+ "toEmail": "test@test.com",
+ "timeSort": "desc",
+ "size": 1
+ }
+
+ start_time = time.time()
+ response = requests.post(url, headers=headers, json=payload, timeout=10)
+ elapsed = time.time() - start_time
+
+ data = response.json()
+
+ keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]]
+ reply_markup = InlineKeyboardMarkup(keyboard)
+
+ if response.status_code == 200 and data.get("code") == 200:
+ await query.edit_message_text(
+ "✅ API 连接测试成功\n\n"
+ f"服务器: {MAIL_API_BASE}\n"
+ f"响应时间: {elapsed*1000:.0f}ms\n"
+ f"状态: 正常\n\n"
+ "Cloud Mail API 服务运行正常",
+ parse_mode="HTML",
+ reply_markup=reply_markup
+ )
+ else:
+ error_msg = data.get("message", "未知错误")
+ await query.edit_message_text(
+ f"⚠️ API 响应异常\n\n"
+ f"HTTP 状态: {response.status_code}\n"
+ f"API 状态: {data.get('code')}\n"
+ f"错误: {error_msg}\n"
+ f"响应时间: {elapsed*1000:.0f}ms",
+ parse_mode="HTML",
+ reply_markup=reply_markup
+ )
+
+ except ImportError:
+ keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]]
+ reply_markup = InlineKeyboardMarkup(keyboard)
+ await query.edit_message_text(
+ "❌ 模块未找到\n\n"
+ "auto_gpt_team 模块未安装或导入失败",
+ parse_mode="HTML",
+ reply_markup=reply_markup
+ )
+ except requests.exceptions.ConnectionError:
+ keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]]
+ reply_markup = InlineKeyboardMarkup(keyboard)
+ await query.edit_message_text(
+ "❌ 连接失败\n\n"
+ "无法连接到邮件 API 服务器\n"
+ "请检查:\n"
+ "• mail_api_base 地址是否正确\n"
+ "• 服务器是否在线\n"
+ "• 网络连接是否正常",
+ parse_mode="HTML",
+ reply_markup=reply_markup
+ )
+ except requests.exceptions.Timeout:
+ keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]]
+ reply_markup = InlineKeyboardMarkup(keyboard)
+ await query.edit_message_text(
+ "❌ 连接超时\n\n"
+ "API 服务器响应超时 (>10s)\n"
+ "请检查服务器状态",
+ parse_mode="HTML",
+ reply_markup=reply_markup
+ )
+ except Exception as e:
+ keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]]
+ reply_markup = InlineKeyboardMarkup(keyboard)
+ await query.edit_message_text(
+ f"❌ 测试失败\n\n{e}",
+ parse_mode="HTML",
+ reply_markup=reply_markup
+ )
+
@admin_only
async def handle_team_custom_count(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
- """处理 GPT Team 自定义数量输入"""
- # 检查是否在等待输入状态
+ """处理文本输入 (GPT Team 自定义数量 / AutoGPTPlus Token)"""
+
+ # 处理 AutoGPTPlus Token 输入
+ if context.user_data.get("autogptplus_waiting_token"):
+ context.user_data["autogptplus_waiting_token"] = False
+ await self._handle_autogptplus_token_input(update, context)
+ return
+
+ # 处理 GPT Team 自定义数量输入
if not context.user_data.get("team_waiting_count"):
return # 不在等待状态,忽略消息
@@ -3038,6 +3465,147 @@ class ProvisionerBot:
reply_markup=reply_markup
)
+ async def _handle_autogptplus_token_input(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
+ """处理 AutoGPTPlus Token 输入 - 保存后立即测试"""
+ import tomli_w
+ import requests
+
+ token = update.message.text.strip()
+
+ if not token:
+ await update.message.reply_text("❌ Token 不能为空")
+ return
+
+ # 脱敏显示
+ if len(token) > 10:
+ token_display = f"{token[:8]}...{token[-4:]}"
+ else:
+ token_display = token[:4] + "..."
+
+ # 先发送保存中的消息
+ status_msg = await update.message.reply_text(
+ f"⏳ 正在保存并验证 Token...\n\n"
+ f"Token: {token_display}",
+ parse_mode="HTML"
+ )
+
+ try:
+ # 读取当前配置获取 API 地址
+ with open(CONFIG_FILE, "rb") as f:
+ import tomllib
+ config = tomllib.load(f)
+
+ mail_api_base = config.get("autogptplus", {}).get("mail_api_base", "")
+
+ if not mail_api_base:
+ keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]]
+ reply_markup = InlineKeyboardMarkup(keyboard)
+ await status_msg.edit_text(
+ "❌ 无法验证 Token\n\n"
+ "mail_api_base 未配置\n"
+ "请先在 config.toml 中配置 API 地址",
+ parse_mode="HTML",
+ reply_markup=reply_markup
+ )
+ return
+
+ # 测试 Token 是否有效
+ url = f"{mail_api_base}/api/public/emailList"
+ headers = {
+ "Authorization": token,
+ "Content-Type": "application/json"
+ }
+ payload = {
+ "toEmail": "test@test.com",
+ "timeSort": "desc",
+ "size": 1
+ }
+
+ start_time = time.time()
+ response = requests.post(url, headers=headers, json=payload, timeout=10)
+ elapsed = time.time() - start_time
+
+ data = response.json()
+
+ # 检查 Token 是否有效
+ if response.status_code == 200 and data.get("code") == 200:
+ # Token 有效,保存到配置文件
+ if "autogptplus" not in config:
+ config["autogptplus"] = {}
+ config["autogptplus"]["mail_api_token"] = token
+
+ with open(CONFIG_FILE, "wb") as f:
+ tomli_w.dump(config, f)
+
+ keyboard = [
+ [
+ InlineKeyboardButton("📋 查看配置", callback_data="autogptplus:config"),
+ InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back"),
+ ],
+ ]
+ reply_markup = InlineKeyboardMarkup(keyboard)
+
+ await status_msg.edit_text(
+ f"✅ Token 验证成功并已保存\n\n"
+ f"Token: {token_display}\n"
+ f"响应时间: {elapsed*1000:.0f}ms\n\n"
+ f"💡 使用 /reload 重载配置使其生效",
+ parse_mode="HTML",
+ reply_markup=reply_markup
+ )
+ else:
+ # Token 无效
+ error_msg = data.get("message", "未知错误")
+ keyboard = [
+ [
+ InlineKeyboardButton("🔑 重新设置", callback_data="autogptplus:set_token"),
+ InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back"),
+ ],
+ ]
+ reply_markup = InlineKeyboardMarkup(keyboard)
+
+ await status_msg.edit_text(
+ f"❌ Token 验证失败\n\n"
+ f"Token: {token_display}\n"
+ f"错误: {error_msg}\n\n"
+ f"Token 未保存,请检查后重试",
+ parse_mode="HTML",
+ reply_markup=reply_markup
+ )
+
+ except requests.exceptions.ConnectionError:
+ keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]]
+ reply_markup = InlineKeyboardMarkup(keyboard)
+ await status_msg.edit_text(
+ "❌ 连接失败\n\n"
+ "无法连接到 API 服务器\n"
+ "请检查 mail_api_base 配置",
+ parse_mode="HTML",
+ reply_markup=reply_markup
+ )
+ except requests.exceptions.Timeout:
+ keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]]
+ reply_markup = InlineKeyboardMarkup(keyboard)
+ await status_msg.edit_text(
+ "❌ 连接超时\n\n"
+ "API 服务器响应超时",
+ parse_mode="HTML",
+ reply_markup=reply_markup
+ )
+ except ImportError:
+ await status_msg.edit_text(
+ "❌ 缺少 tomli_w 依赖\n"
+ "请运行: uv add tomli_w"
+ )
+ except Exception as e:
+ keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]]
+ reply_markup = InlineKeyboardMarkup(keyboard)
+ await status_msg.edit_text(
+ f"❌ 验证失败\n\n{e}",
+ parse_mode="HTML",
+ reply_markup=reply_markup
+ )
+
async def main():
"""主函数"""