# ==================== Telegram Bot 主程序 ====================
# 通过 Telegram 远程控制 OpenAI Team 批量注册任务
import asyncio
import sys
import time
from concurrent.futures import ThreadPoolExecutor
from functools import wraps
from pathlib import Path
from typing import Optional
from telegram import Update, Bot, BotCommand, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import (
Application,
CommandHandler,
MessageHandler,
CallbackQueryHandler,
filters,
ContextTypes,
)
from config import (
TELEGRAM_BOT_TOKEN,
TELEGRAM_ADMIN_CHAT_IDS,
TEAMS,
AUTH_PROVIDER,
TEAM_JSON_FILE,
TEAM_TRACKER_FILE,
CSV_FILE,
TELEGRAM_CHECK_INTERVAL,
TELEGRAM_LOW_STOCK_THRESHOLD,
CONFIG_FILE,
EMAIL_PROVIDER,
ACCOUNTS_PER_TEAM,
PROXY_ENABLED,
PROXIES,
S2A_API_BASE,
CPA_API_BASE,
CRS_API_BASE,
get_gptmail_keys,
add_gptmail_key,
remove_gptmail_key,
GPTMAIL_API_KEYS,
INCLUDE_TEAM_OWNERS,
reload_config,
S2A_CONCURRENCY,
S2A_PRIORITY,
S2A_GROUP_NAMES,
S2A_GROUP_IDS,
S2A_ADMIN_KEY,
BROWSER_RANDOM_FINGERPRINT,
batch_remove_teams_by_names,
)
from utils import load_team_tracker, get_all_incomplete_accounts, save_team_tracker, get_completed_teams, batch_remove_completed_teams
from bot_notifier import BotNotifier, set_notifier, progress_finish
from s2a_service import (
s2a_get_dashboard_stats, format_dashboard_stats, s2a_get_keys_with_usage, format_keys_usage,
s2a_get_error_accounts, s2a_delete_account, s2a_batch_delete_error_accounts
)
from email_service import gptmail_service, unified_create_email
from logger import log
def admin_only(func):
"""装饰器: 仅允许管理员执行命令"""
@wraps(func)
async def wrapper(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
user_id = update.effective_user.id
if user_id not in TELEGRAM_ADMIN_CHAT_IDS:
await update.message.reply_text("⛔ 无权限,你的 ID 不在管理员列表中")
return
return await func(self, update, context)
return wrapper
class ProvisionerBot:
"""OpenAI Team Provisioner Telegram Bot"""
def __init__(self):
self.executor = ThreadPoolExecutor(max_workers=1)
self.current_task: Optional[asyncio.Task] = None
self.current_team: Optional[str] = None
self.app: Optional[Application] = None
self.notifier: Optional[BotNotifier] = None
self._shutdown_event = asyncio.Event()
# JSON 导入批量进度跟踪
self._import_progress_message = None # 进度消息对象
self._import_progress_lock = asyncio.Lock() # 并发锁
self._import_batch_stats = { # 批量统计
"total_files": 0,
"processed_files": 0,
"total_added": 0,
"total_skipped": 0,
"current_file": "",
"errors": []
}
self._import_batch_timeout_task = None # 超时任务
async def start(self):
"""启动 Bot"""
if not TELEGRAM_BOT_TOKEN:
log.error("Telegram Bot Token not configured")
return
# 创建 Application
self.app = Application.builder().token(TELEGRAM_BOT_TOKEN).build()
# 初始化通知器
self.notifier = BotNotifier(self.app.bot, TELEGRAM_ADMIN_CHAT_IDS)
set_notifier(self.notifier)
# 注册命令处理器
handlers = [
("start", self.cmd_help),
("help", self.cmd_help),
("status", self.cmd_status),
("team", self.cmd_team),
("list", self.cmd_list),
("config", self.cmd_config),
("fingerprint", self.cmd_fingerprint),
("run", self.cmd_run),
("run_all", self.cmd_run_all),
("resume", self.cmd_resume),
("stop", self.cmd_stop),
("logs", self.cmd_logs),
("logs_live", self.cmd_logs_live),
("logs_stop", self.cmd_logs_stop),
("dashboard", self.cmd_dashboard),
("import", self.cmd_import),
("stock", self.cmd_stock),
("gptmail_keys", self.cmd_gptmail_keys),
("gptmail_add", self.cmd_gptmail_add),
("gptmail_del", self.cmd_gptmail_del),
("iban_list", self.cmd_iban_list),
("iban_add", self.cmd_iban_add),
("iban_clear", self.cmd_iban_clear),
("domain_list", self.cmd_domain_list),
("domain_add", self.cmd_domain_add),
("domain_del", self.cmd_domain_del),
("domain_clear", self.cmd_domain_clear),
("team_fingerprint", self.cmd_team_fingerprint),
("team_register", self.cmd_team_register),
("test_email", self.cmd_test_email),
("include_owners", self.cmd_include_owners),
("reload", self.cmd_reload),
("s2a_config", self.cmd_s2a_config),
("clean", self.cmd_clean),
("clean_errors", self.cmd_clean_errors),
("clean_teams", self.cmd_clean_teams),
("keys_usage", self.cmd_keys_usage),
("autogptplus", self.cmd_autogptplus),
]
for cmd, handler in handlers:
self.app.add_handler(CommandHandler(cmd, handler))
# 注册文件上传处理器 (JSON 文件)
self.app.add_handler(MessageHandler(
filters.Document.MimeType("application/json"),
self.handle_json_file
))
# 注册回调查询处理器 (InlineKeyboard)
self.app.add_handler(CallbackQueryHandler(
self.callback_keys_usage,
pattern="^keys_usage:"
))
self.app.add_handler(CallbackQueryHandler(
self.callback_clean_errors,
pattern="^clean_errors:"
))
self.app.add_handler(CallbackQueryHandler(
self.callback_clean_teams,
pattern="^clean_teams:"
))
self.app.add_handler(CallbackQueryHandler(
self.callback_team_register,
pattern="^team_reg:"
))
self.app.add_handler(CallbackQueryHandler(
self.callback_autogptplus,
pattern="^autogptplus:"
))
# 注册自定义数量输入处理器 (GPT Team 注册)
self.app.add_handler(MessageHandler(
filters.TEXT & ~filters.COMMAND,
self.handle_team_custom_count
))
# 注册定时检查任务
if TELEGRAM_CHECK_INTERVAL > 0 and AUTH_PROVIDER == "s2a":
self.app.job_queue.run_repeating(
self.scheduled_stock_check,
interval=TELEGRAM_CHECK_INTERVAL,
first=60, # 启动后1分钟执行第一次
name="stock_check"
)
log.info(f"Stock check scheduled every {TELEGRAM_CHECK_INTERVAL}s")
# 启动通知器
await self.notifier.start()
log.success("Telegram Bot started")
log.info(f"Admin Chat IDs: {TELEGRAM_ADMIN_CHAT_IDS}")
# 发送启动通知
await self.notifier.notify("🤖 Bot 已启动\n准备就绪,发送 /help 查看帮助")
# 运行 Bot
await self.app.initialize()
await self.app.start()
# 设置命令菜单提示
await self._set_commands()
await self.app.updater.start_polling(drop_pending_updates=True)
# 等待关闭信号
await self._shutdown_event.wait()
# 清理
await self.app.updater.stop()
await self.app.stop()
await self.app.shutdown()
await self.notifier.stop()
def request_shutdown(self):
"""请求关闭 Bot"""
self._shutdown_event.set()
async def _set_commands(self):
"""设置 Bot 命令菜单提示"""
commands = [
# 基础信息
BotCommand("help", "查看帮助信息"),
BotCommand("list", "查看 team.json 账号列表"),
BotCommand("status", "查看任务处理状态"),
BotCommand("team", "查看指定 Team 详情"),
BotCommand("config", "查看系统配置"),
BotCommand("logs", "查看最近日志"),
BotCommand("logs_live", "启用实时日志推送"),
BotCommand("logs_stop", "停止实时日志推送"),
# 任务控制
BotCommand("run", "处理指定 Team"),
BotCommand("run_all", "处理所有 Team"),
BotCommand("resume", "继续处理未完成账号"),
BotCommand("stop", "停止当前任务"),
# 配置管理
BotCommand("fingerprint", "开启/关闭随机指纹"),
BotCommand("include_owners", "开启/关闭 Owner 入库"),
BotCommand("reload", "重载配置文件"),
# 清理管理
BotCommand("clean", "清理已完成账号"),
BotCommand("clean_errors", "清理错误状态账号"),
BotCommand("clean_teams", "清理已完成 Team"),
# S2A
BotCommand("dashboard", "查看 S2A 仪表盘"),
BotCommand("keys_usage", "查看 API 密钥用量"),
BotCommand("stock", "查看账号库存"),
BotCommand("s2a_config", "配置 S2A 参数"),
BotCommand("import", "导入账号到 team.json"),
# GPTMail
BotCommand("gptmail_keys", "查看 GPTMail API Keys"),
BotCommand("gptmail_add", "添加 GPTMail API Key"),
BotCommand("gptmail_del", "删除 GPTMail API Key"),
BotCommand("test_email", "测试邮箱创建"),
# IBAN 管理
BotCommand("iban_list", "查看 IBAN 列表"),
BotCommand("iban_add", "添加 IBAN"),
BotCommand("iban_clear", "清空 IBAN 列表"),
# 域名管理
BotCommand("domain_list", "查看邮箱域名列表"),
BotCommand("domain_add", "添加邮箱域名"),
BotCommand("domain_del", "删除指定域名"),
BotCommand("domain_clear", "清空域名列表"),
# GPT Team
BotCommand("team_fingerprint", "GPT Team 随机指纹"),
BotCommand("team_register", "GPT Team 自动注册"),
# AutoGPTPlus
BotCommand("autogptplus", "AutoGPTPlus 管理面板"),
]
try:
await self.app.bot.set_my_commands(commands)
log.info("Bot 命令菜单已设置")
except Exception as e:
log.warning(f"设置命令菜单失败: {e}")
@admin_only
async def cmd_help(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""显示帮助信息"""
help_text = """🤖 OpenAI Team 批量注册 Bot
📋 查看信息:
/list - 查看 team.json 账号列表
/status - 查看任务处理状态
/team <n> - 查看第 n 个 Team 处理详情
/config - 查看系统配置
/logs [n] - 查看最近 n 条日志
/logs_live - 启用实时日志推送
/logs_stop - 停止实时日志推送
🚀 任务控制:
/run <n> - 开始处理第 n 个 Team
/run_all - 开始处理所有 Team
/resume - 继续处理未完成账号
/stop - 停止当前任务
⚙️ 配置管理:
/fingerprint - 开启/关闭随机指纹
/include_owners - 开启/关闭 Owner 入库
/reload - 重载配置文件 (无需重启)
/clean - 清理 team.json 和 tracker 数据
📊 S2A 专属:
/dashboard - 查看 S2A 仪表盘
/keys_usage - 查看 API 密钥用量
/stock - 查看账号库存
/s2a_config - 配置 S2A 参数
/clean_errors - 清理错误状态账号
🧹 清理管理:
/clean - 清理已完成账号 (team.json)
/clean_teams - 清理已完成 Team (tracker)
📤 导入账号:
/import - 导入账号到 team.json
或直接发送 JSON 文件
📧 GPTMail 管理:
/gptmail_keys - 查看所有 API Keys
/gptmail_add <key> - 添加 API Key
/gptmail_del <key> - 删除 API Key
/test_email - 测试邮箱创建
💳 IBAN 管理 (GPT Team):
/iban_list - 查看 IBAN 列表
/iban_add <ibans> - 添加 IBAN (每行一个或逗号分隔)
/iban_clear - 清空 IBAN 列表
📧 域名管理 (GPT Team):
/domain_list - 查看邮箱域名列表
/domain_add <domains> - 添加域名 (每行一个或逗号分隔)
/domain_del <domain> - 删除指定域名
/domain_clear - 清空域名列表
🤖 GPT Team:
/team_fingerprint - 开启/关闭随机指纹
/team_register - 开始自动订阅注册
� AutoGPTPlus:
/autogptplus - ChatGPT 订阅自动化管理面板
�💡 示例:
/list - 查看所有待处理账号
/run 0 - 处理第一个 Team
/gptmail_add my-api-key - 添加 Key
/iban_add DE123...,DE456... - 添加 IBAN
/domain_add @example.com - 添加域名"""
await update.message.reply_text(help_text, parse_mode="HTML")
@admin_only
async def cmd_status(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""查看所有 Team 状态"""
tracker = load_team_tracker()
teams_data = tracker.get("teams", {})
if not teams_data:
await update.message.reply_text("📭 暂无数据,请先运行任务")
return
lines = ["📊 Team 状态总览\n"]
for team_name, accounts in teams_data.items():
total = len(accounts)
completed = sum(1 for a in accounts if a.get("status") == "completed")
failed = sum(1 for a in accounts if "fail" in a.get("status", "").lower())
pending = total - completed - failed
status_icon = "✅" if completed == total else ("❌" if failed > 0 else "⏳")
lines.append(
f"{status_icon} {team_name}: {completed}/{total} "
f"(失败:{failed} 待处理:{pending})"
)
# 当前任务状态
if self.current_task and not self.current_task.done():
lines.append(f"\n🔄 运行中: {self.current_team or '未知'}")
await update.message.reply_text("\n".join(lines), parse_mode="HTML")
@admin_only
async def cmd_team(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""查看指定 Team 详情"""
if not context.args:
await update.message.reply_text("用法: /team <序号>\n示例: /team 0")
return
try:
team_idx = int(context.args[0])
except ValueError:
await update.message.reply_text("❌ 无效的序号,必须是数字")
return
if team_idx < 0 or team_idx >= len(TEAMS):
await update.message.reply_text(f"❌ 序号超出范围,有效范围: 0-{len(TEAMS)-1}")
return
team = TEAMS[team_idx]
team_name = team.get("name", f"Team{team_idx}")
tracker = load_team_tracker()
accounts = tracker.get("teams", {}).get(team_name, [])
lines = [f"📁 Team {team_idx}: {team_name}\n"]
lines.append(f"👤 Owner: {team.get('owner_email', '无')}")
lines.append(f"📊 账号数: {len(accounts)}\n")
if accounts:
for acc in accounts:
email = acc.get("email", "")
status = acc.get("status", "unknown")
role = acc.get("role", "member")
icon = {"completed": "✅", "authorized": "🔐", "registered": "📝"}.get(
status, "❌" if "fail" in status.lower() else "⏳"
)
role_tag = " [Owner]" if role == "owner" else ""
lines.append(f"{icon} {email}{role_tag}")
else:
lines.append("📭 暂无已处理的账号")
await update.message.reply_text("\n".join(lines), parse_mode="HTML")
@admin_only
async def cmd_list(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""查看 team.json 中的账号列表"""
if not TEAMS:
await update.message.reply_text("📭 team.json 中没有账号")
return
# 加载 tracker 以获取成员账号完成状态
tracker = load_team_tracker()
tracker_teams = tracker.get("teams", {})
lines = [f"📋 team.json 账号列表 (共 {len(TEAMS)} 个)\n"]
total_completed = 0
total_accounts = 0
for i, team in enumerate(TEAMS):
email = team.get("owner_email", "")
team_name = team.get("name", "")
has_token = "🔑" if team.get("auth_token") else "🔒"
needs_login = " [需登录]" if team.get("needs_login") else ""
# 从 tracker 获取该 Team 的成员完成情况
team_accounts = tracker_teams.get(team_name, [])
completed_count = sum(1 for acc in team_accounts if acc.get("status") == "completed")
account_count = len(team_accounts)
total_completed += completed_count
total_accounts += account_count
# 显示完成进度
if account_count > 0:
progress = f" [{completed_count}/{account_count}]"
else:
progress = ""
lines.append(f"{i}. {has_token} {email}{progress}{needs_login}")
# 统计
with_token = sum(1 for t in TEAMS if t.get("auth_token"))
lines.append(f"\n📊 统计:")
lines.append(f"有 Token: {with_token}/{len(TEAMS)}")
lines.append(f"已完成: {total_completed}/{total_accounts}")
# 消息太长时分段发送
text = "\n".join(lines)
if len(text) > 4000:
# 分段
for i in range(0, len(lines), 30):
chunk = "\n".join(lines[i:i+30])
await update.message.reply_text(chunk, parse_mode="HTML")
else:
await update.message.reply_text(text, parse_mode="HTML")
@admin_only
async def cmd_config(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""查看当前系统配置"""
# 授权服务地址
if AUTH_PROVIDER == "s2a":
auth_url = S2A_API_BASE or "未配置"
elif AUTH_PROVIDER == "cpa":
auth_url = CPA_API_BASE or "未配置"
else:
auth_url = CRS_API_BASE or "未配置"
# 代理信息
if PROXY_ENABLED and PROXIES:
proxy_info = f"已启用 ({len(PROXIES)} 个)"
else:
proxy_info = "未启用"
# Owner 入库状态
include_owners_status = "✅ 已开启" if INCLUDE_TEAM_OWNERS else "❌ 未开启"
# 随机指纹状态
fingerprint_status = "✅ 已开启" if BROWSER_RANDOM_FINGERPRINT else "❌ 未开启"
lines = [
"⚙️ 系统配置",
"",
"📧 邮箱服务",
f" 提供商: {EMAIL_PROVIDER}",
"",
"🔐 授权服务",
f" 模式: {AUTH_PROVIDER.upper()}",
f" 地址: {auth_url}",
f" Owner 入库: {include_owners_status}",
"",
"🌐 浏览器",
f" 随机指纹: {fingerprint_status}",
"",
"👥 账号设置",
f" 每 Team 账号数: {ACCOUNTS_PER_TEAM}",
f" team.json 账号: {len(TEAMS)}",
"",
"🔗 代理",
f" 状态: {proxy_info}",
"",
"💡 提示:",
"/fingerprint - 切换随机指纹",
"/include_owners - 切换 Owner 入库",
"/s2a_config - 配置 S2A 参数",
]
await update.message.reply_text("\n".join(lines), parse_mode="HTML")
@admin_only
async def cmd_fingerprint(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""切换随机指纹"""
import tomli_w
try:
# 读取当前配置
with open(CONFIG_FILE, "rb") as f:
import tomllib
config = tomllib.load(f)
# 获取当前状态
current = config.get("browser", {}).get("random_fingerprint", True)
new_value = not current
# 更新配置
if "browser" not in config:
config["browser"] = {}
config["browser"]["random_fingerprint"] = new_value
# 写回文件
with open(CONFIG_FILE, "wb") as f:
tomli_w.dump(config, f)
status = "✅ 已开启" if new_value else "❌ 已关闭"
await update.message.reply_text(
f"🎭 随机指纹\n\n"
f"状态: {status}\n\n"
f"开启后每次启动浏览器将随机使用不同的:\n"
f"• User-Agent (Chrome 133-144)\n"
f"• WebGL 显卡指纹\n"
f"• 屏幕分辨率\n\n"
f"💡 使用 /reload 立即生效",
parse_mode="HTML"
)
except ImportError:
await update.message.reply_text(
"❌ 缺少 tomli_w 依赖\n"
"请运行: uv add tomli_w"
)
except Exception as e:
await update.message.reply_text(f"❌ 修改配置失败: {e}")
@admin_only
async def cmd_include_owners(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""切换 Owner 入库开关"""
import tomli_w
try:
# 读取当前配置
with open(CONFIG_FILE, "rb") as f:
import tomllib
config = tomllib.load(f)
# 获取当前状态 (顶层配置)
current = config.get("include_team_owners", False)
new_value = not current
# 更新配置 (写到顶层)
config["include_team_owners"] = new_value
# 写回文件
with open(CONFIG_FILE, "wb") as f:
tomli_w.dump(config, f)
status = "✅ 已开启" if new_value else "❌ 已关闭"
desc = "运行任务时会将 Team Owner 账号也入库到授权服务" if new_value else "运行任务时不会入库 Team Owner 账号"
await update.message.reply_text(
f"👤 Owner 入库开关\n\n"
f"状态: {status}\n"
f"{desc}\n\n"
f"💡 使用 /reload 立即生效",
parse_mode="HTML"
)
except ImportError:
await update.message.reply_text(
"❌ 缺少 tomli_w 依赖\n"
"请运行: uv add tomli_w"
)
except Exception as e:
await update.message.reply_text(f"❌ 修改配置失败: {e}")
@admin_only
async def cmd_reload(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""重载配置文件"""
# 检查是否有任务正在运行
if self.current_task and not self.current_task.done():
await update.message.reply_text(
f"⚠️ 有任务正在运行: {self.current_team}\n"
"请等待任务完成或使用 /stop 停止后再重载配置"
)
return
await update.message.reply_text("⏳ 正在重载配置...")
try:
# 调用重载函数
result = reload_config()
if result["success"]:
# 重新导入更新后的配置变量
from config import (
EMAIL_PROVIDER as new_email_provider,
AUTH_PROVIDER as new_auth_provider,
INCLUDE_TEAM_OWNERS as new_include_owners,
ACCOUNTS_PER_TEAM as new_accounts_per_team,
BROWSER_RANDOM_FINGERPRINT as new_random_fingerprint,
)
lines = [
"✅ 配置重载成功",
"",
f"📁 team.json: {result['teams_count']} 个账号",
f"📄 config.toml: {'已更新' if result['config_updated'] else '未变化'}",
"",
"当前配置:",
f" 邮箱服务: {new_email_provider}",
f" 授权服务: {new_auth_provider}",
f" Owner 入库: {'✅' if new_include_owners else '❌'}",
f" 随机指纹: {'✅' if new_random_fingerprint else '❌'}",
f" 每 Team 账号: {new_accounts_per_team}",
]
await update.message.reply_text("\n".join(lines), parse_mode="HTML")
else:
await update.message.reply_text(
f"❌ 配置重载失败\n\n{result['message']}",
parse_mode="HTML"
)
except Exception as e:
await update.message.reply_text(f"❌ 重载配置失败: {e}")
@admin_only
async def cmd_clean(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""清理 team.json 和 team_tracker.json 数据"""
# 检查是否有任务正在运行
if self.current_task and not self.current_task.done():
await update.message.reply_text(
f"⚠️ 有任务正在运行: {self.current_team}\n"
"请等待任务完成或使用 /stop 停止后再清理"
)
return
# 解析参数
args = [a.lower() for a in context.args] if context.args else []
include_csv = "all" in args
confirmed = "confirm" in args
# 需要确认参数
if not confirmed:
# 统计当前数据
team_count = len(TEAMS)
tracker = load_team_tracker()
tracker_accounts = sum(len(accs) for accs in tracker.get("teams", {}).values())
# 统计 CSV 行数
csv_count = 0
csv_path = Path(CSV_FILE)
if csv_path.exists():
try:
with open(csv_path, "r", encoding="utf-8") as f:
csv_count = max(0, sum(1 for _ in f) - 1) # 减去表头
except:
pass
msg = (
f"⚠️ 确认清理数据?\n\n"
f"将清空以下文件:\n"
f"• team.json ({team_count} 个 Team)\n"
f"• team_tracker.json ({tracker_accounts} 个账号记录)\n"
)
if include_csv:
msg += f"• accounts.csv ({csv_count} 条记录)\n"
msg += (
f"\n此操作不可恢复!\n\n"
f"确认清理请发送:\n"
f"/clean confirm\n\n"
f"如需同时清理 CSV:\n"
f"/clean all confirm"
)
await update.message.reply_text(msg, parse_mode="HTML")
return
# 执行清理
cleaned = []
errors = []
# 清理 team.json
try:
if TEAM_JSON_FILE.exists():
with open(TEAM_JSON_FILE, "w", encoding="utf-8") as f:
f.write("[]")
cleaned.append("team.json")
except Exception as e:
errors.append(f"team.json: {e}")
# 清理 team_tracker.json
try:
tracker_file = Path(TEAM_TRACKER_FILE)
if tracker_file.exists():
with open(tracker_file, "w", encoding="utf-8") as f:
f.write('{"teams": {}}')
cleaned.append("team_tracker.json")
except Exception as e:
errors.append(f"team_tracker.json: {e}")
# 清理 accounts.csv (可选)
if include_csv:
try:
csv_path = Path(CSV_FILE)
if csv_path.exists():
with open(csv_path, "w", encoding="utf-8") as f:
f.write("email,password,team,status,crs_id,timestamp\n")
cleaned.append("accounts.csv")
except Exception as e:
errors.append(f"accounts.csv: {e}")
# 重载配置以清空内存中的 TEAMS
reload_config()
# 返回结果
if errors:
await update.message.reply_text(
f"⚠️ 部分清理失败\n\n"
f"已清理: {', '.join(cleaned) or '无'}\n"
f"失败: {'; '.join(errors)}",
parse_mode="HTML"
)
else:
await update.message.reply_text(
f"✅ 清理完成\n\n"
f"已清空: {', '.join(cleaned)}\n\n"
f"现在可以导入新的 team.json 了",
parse_mode="HTML"
)
@admin_only
async def cmd_s2a_config(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""配置 S2A 服务参数"""
import tomli_w
# 无参数时显示当前配置
if not context.args:
# 脱敏显示 API Key
key_display = "未配置"
if S2A_ADMIN_KEY:
if len(S2A_ADMIN_KEY) > 10:
key_display = f"{S2A_ADMIN_KEY[:4]}...{S2A_ADMIN_KEY[-4:]}"
else:
key_display = S2A_ADMIN_KEY[:4] + "..."
groups_display = ", ".join(S2A_GROUP_NAMES) if S2A_GROUP_NAMES else "默认分组"
group_ids_display = ", ".join(str(x) for x in S2A_GROUP_IDS) if S2A_GROUP_IDS else "无"
lines = [
"📊 S2A 服务配置",
"",
f"API 地址: {S2A_API_BASE or '未配置'}",
f"Admin Key: {key_display}",
f"并发数: {S2A_CONCURRENCY}",
f"优先级: {S2A_PRIORITY}",
f"分组名称: {groups_display}",
f"分组 ID: {group_ids_display}",
"",
"💡 修改配置:",
"/s2a_config concurrency 10",
"/s2a_config priority 50",
"/s2a_config groups 分组1,分组2",
"/s2a_config api_base https://...",
"/s2a_config admin_key sk-xxx",
]
await update.message.reply_text("\n".join(lines), parse_mode="HTML")
return
# 解析参数
param = context.args[0].lower()
value = " ".join(context.args[1:]) if len(context.args) > 1 else None
if not value:
await update.message.reply_text(
f"❌ 缺少值\n用法: /s2a_config {param} <值>"
)
return
try:
# 读取当前配置
with open(CONFIG_FILE, "rb") as f:
import tomllib
config = tomllib.load(f)
# 确保 s2a section 存在
if "s2a" not in config:
config["s2a"] = {}
# 根据参数类型处理
updated_key = None
updated_value = None
if param == "concurrency":
try:
new_val = int(value)
if new_val < 1 or new_val > 100:
await update.message.reply_text("❌ 并发数范围: 1-100")
return
config["s2a"]["concurrency"] = new_val
updated_key = "并发数"
updated_value = str(new_val)
except ValueError:
await update.message.reply_text("❌ 并发数必须是数字")
return
elif param == "priority":
try:
new_val = int(value)
if new_val < 0 or new_val > 100:
await update.message.reply_text("❌ 优先级范围: 0-100")
return
config["s2a"]["priority"] = new_val
updated_key = "优先级"
updated_value = str(new_val)
except ValueError:
await update.message.reply_text("❌ 优先级必须是数字")
return
elif param in ("groups", "group_names"):
# 支持逗号分隔的分组名称
groups = [g.strip() for g in value.split(",") if g.strip()]
config["s2a"]["group_names"] = groups
updated_key = "分组名称"
updated_value = ", ".join(groups) if groups else "默认分组"
elif param == "group_ids":
# 支持逗号分隔的分组 ID
ids = [i.strip() for i in value.split(",") if i.strip()]
config["s2a"]["group_ids"] = ids
updated_key = "分组 ID"
updated_value = ", ".join(ids) if ids else "无"
elif param == "api_base":
config["s2a"]["api_base"] = value
updated_key = "API 地址"
updated_value = value
elif param == "admin_key":
config["s2a"]["admin_key"] = value
updated_key = "Admin Key"
# 脱敏显示
if len(value) > 10:
updated_value = f"{value[:4]}...{value[-4:]}"
else:
updated_value = value[:4] + "..."
else:
await update.message.reply_text(
f"❌ 未知参数: {param}\n\n"
"可用参数:\n"
"• concurrency - 并发数\n"
"• priority - 优先级\n"
"• groups - 分组名称\n"
"• group_ids - 分组 ID\n"
"• api_base - API 地址\n"
"• admin_key - Admin Key"
)
return
# 写回文件
with open(CONFIG_FILE, "wb") as f:
tomli_w.dump(config, f)
await update.message.reply_text(
f"✅ S2A 配置已更新\n\n"
f"{updated_key}: {updated_value}\n\n"
f"💡 使用 /reload 立即生效",
parse_mode="HTML"
)
except ImportError:
await update.message.reply_text(
"❌ 缺少 tomli_w 依赖\n"
"请运行: uv add tomli_w"
)
except Exception as e:
await update.message.reply_text(f"❌ 修改配置失败: {e}")
@admin_only
async def cmd_run(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""启动处理指定 Team"""
if self.current_task and not self.current_task.done():
await update.message.reply_text(
f"⚠️ 任务正在运行: {self.current_team}\n使用 /stop 停止"
)
return
if not context.args:
await update.message.reply_text("用法: /run <序号>\n示例: /run 0")
return
try:
team_idx = int(context.args[0])
except ValueError:
await update.message.reply_text("❌ 无效的序号,必须是数字")
return
if team_idx < 0 or team_idx >= len(TEAMS):
await update.message.reply_text(f"❌ 序号超出范围,有效范围: 0-{len(TEAMS)-1}")
return
team_name = TEAMS[team_idx].get("name", f"Team{team_idx}")
self.current_team = team_name
# 重置停止标志,确保新任务可以正常运行
try:
import run
run._shutdown_requested = False
except Exception:
pass
await update.message.reply_text(f"🚀 开始处理 Team {team_idx}: {team_name}...")
# 在后台线程执行任务
loop = asyncio.get_event_loop()
self.current_task = loop.run_in_executor(
self.executor,
self._run_team_task,
team_idx
)
# 添加完成回调
self.current_task = asyncio.ensure_future(self._wrap_task(self.current_task, team_name))
@admin_only
async def cmd_run_all(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""启动处理所有 Team"""
if self.current_task and not self.current_task.done():
await update.message.reply_text(
f"⚠️ 任务正在运行: {self.current_team}\n使用 /stop 停止"
)
return
self.current_team = "全部"
# 重置停止标志,确保新任务可以正常运行
try:
import run
run._shutdown_requested = False
except Exception:
pass
await update.message.reply_text(f"🚀 开始处理所有 Team (共 {len(TEAMS)} 个)...")
loop = asyncio.get_event_loop()
self.current_task = loop.run_in_executor(
self.executor,
self._run_all_teams_task
)
self.current_task = asyncio.ensure_future(self._wrap_task(self.current_task, "全部"))
@admin_only
async def cmd_resume(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""继续处理未完成的账号"""
if self.current_task and not self.current_task.done():
await update.message.reply_text(
f"⚠️ 任务正在运行: {self.current_team}\n使用 /stop 停止"
)
return
# 检查是否有未完成的账号
tracker = load_team_tracker()
all_incomplete = get_all_incomplete_accounts(tracker)
if not all_incomplete:
await update.message.reply_text("✅ 没有待处理的账号,所有任务已完成")
return
# 统计未完成账号
total_incomplete = sum(len(accs) for accs in all_incomplete.values())
teams_count = len(all_incomplete)
# 构建消息
lines = [
f"⏳ 发现 {total_incomplete} 个未完成账号",
f"涉及 {teams_count} 个 Team:",
""
]
for team_name, accounts in all_incomplete.items():
lines.append(f" • {team_name}: {len(accounts)} 个")
lines.append("")
lines.append("🚀 开始继续处理...")
await update.message.reply_text("\n".join(lines), parse_mode="HTML")
# 启动任务 (run_all_teams 会自动处理未完成的账号)
self.current_team = "继续处理"
# 重置停止标志,确保新任务可以正常运行
try:
import run
run._shutdown_requested = False
except Exception:
pass
loop = asyncio.get_event_loop()
self.current_task = loop.run_in_executor(
self.executor,
self._run_all_teams_task
)
self.current_task = asyncio.ensure_future(self._wrap_task(self.current_task, "继续处理"))
async def _wrap_task(self, task, team_name: str):
"""包装任务以处理完成通知"""
try:
result = await task
# 收集成功和失败的账号
success_accounts = [r.get("email") for r in (result or []) if r.get("status") == "success"]
failed_accounts = [r.get("email") for r in (result or []) if r.get("status") != "success"]
log.info(f"任务完成: {team_name}, 成功: {len(success_accounts)}, 失败: {len(failed_accounts)}")
await self.notifier.notify_task_completed(team_name, success_accounts, failed_accounts)
except Exception as e:
log.error(f"任务异常: {team_name}, 错误: {e}")
await self.notifier.notify_error(f"任务失败: {team_name}", str(e))
finally:
self.current_team = None
# 清理进度跟踪
progress_finish()
# 注意: 不在这里重置 _shutdown_requested
# 让标志保持 True,直到下次任务启动时再重置
# 这样可以确保线程池中的任务能够正确检测到停止信号
def _run_team_task(self, team_idx: int):
"""执行单个 Team 任务 (在线程池中运行)"""
# 延迟导入避免循环依赖
from run import run_single_team
from team_service import preload_all_account_ids
from utils import load_team_tracker, save_team_tracker, add_team_owners_to_tracker
from config import DEFAULT_PASSWORD
# 预加载 account_id
preload_all_account_ids()
_tracker = load_team_tracker()
add_team_owners_to_tracker(_tracker, DEFAULT_PASSWORD)
save_team_tracker(_tracker)
return run_single_team(team_idx)
def _run_all_teams_task(self):
"""执行所有 Team 任务 (在线程池中运行)"""
from run import run_all_teams
from team_service import preload_all_account_ids
from utils import load_team_tracker, save_team_tracker, add_team_owners_to_tracker
from config import DEFAULT_PASSWORD
# 预加载 account_id
preload_all_account_ids()
_tracker = load_team_tracker()
add_team_owners_to_tracker(_tracker, DEFAULT_PASSWORD)
save_team_tracker(_tracker)
return run_all_teams()
@admin_only
async def cmd_stop(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""强制停止当前任务"""
if not self.current_task or self.current_task.done():
await update.message.reply_text("📭 当前没有运行中的任务")
return
task_name = self.current_team or "未知任务"
await update.message.reply_text(f"🛑 正在强制停止: {task_name}...")
try:
# 1. 设置全局停止标志
try:
import run
run._shutdown_requested = True
# 获取当前运行结果
current_results = run._current_results.copy() if run._current_results else []
except Exception:
current_results = []
# 2. 取消 asyncio 任务
if self.current_task and not self.current_task.done():
self.current_task.cancel()
# 3. 强制关闭浏览器进程
try:
from browser_automation import cleanup_chrome_processes
cleanup_chrome_processes()
except Exception as e:
log.warning(f"清理浏览器进程失败: {e}")
# 4. 重置状态
self.current_team = None
# 注意:不在这里重置 _shutdown_requested,让任务完成后在 _wrap_task 中重置
# 这样可以确保线程池中的任务有足够时间检测到停止信号
# 清理进度跟踪
progress_finish()
# 6. 生成停止报告
report_lines = [
f"🛑 任务已停止",
f"任务: {task_name}",
"",
]
# 本次运行结果
if current_results:
success_count = sum(1 for r in current_results if r.get("status") == "success")
failed_count = len(current_results) - success_count
report_lines.append(f"📊 本次运行结果:")
report_lines.append(f" 成功: {success_count}")
report_lines.append(f" 失败: {failed_count}")
report_lines.append("")
# 获取未完成账号信息
tracker = load_team_tracker()
all_incomplete = get_all_incomplete_accounts(tracker)
if all_incomplete:
total_incomplete = sum(len(accs) for accs in all_incomplete.values())
report_lines.append(f"⏳ 待继续处理: {total_incomplete} 个账号")
# 显示每个 Team 的未完成账号 (最多显示 3 个 Team)
shown_teams = 0
for team_name, accounts in all_incomplete.items():
if shown_teams >= 3:
remaining = len(all_incomplete) - 3
report_lines.append(f" ... 还有 {remaining} 个 Team")
break
report_lines.append(f" {team_name}: {len(accounts)} 个")
# 显示第一个待处理账号
if accounts:
first_acc = accounts[0]
report_lines.append(f" 下一个: {first_acc['email']}")
shown_teams += 1
report_lines.append("")
report_lines.append("💡 使用 /resume 继续处理")
else:
report_lines.append("✅ 没有待处理的账号")
await update.message.reply_text("\n".join(report_lines), parse_mode="HTML")
except Exception as e:
await update.message.reply_text(f"❌ 停止任务时出错: {e}")
@admin_only
async def cmd_logs(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""查看最近日志"""
try:
n = int(context.args[0]) if context.args else 10
except ValueError:
n = 10
n = min(n, 50) # 限制最大条数
try:
from config import BASE_DIR
log_file = BASE_DIR / "logs" / "app.log"
if not log_file.exists():
await update.message.reply_text("📭 日志文件不存在")
return
with open(log_file, "r", encoding="utf-8", errors="ignore") as f:
lines = f.readlines()
recent = lines[-n:] if len(lines) >= n else lines
if not recent:
await update.message.reply_text("📭 日志文件为空")
return
# 格式化日志 (移除 ANSI 颜色码)
import re
ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
clean_lines = [ansi_escape.sub('', line.strip()) for line in recent]
log_text = "\n".join(clean_lines)
if len(log_text) > 4000:
log_text = log_text[-4000:]
await update.message.reply_text(f"{log_text}", parse_mode="HTML")
except Exception as e:
await update.message.reply_text(f"❌ 读取日志失败: {e}")
@admin_only
async def cmd_logs_live(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""启用实时日志推送"""
from logger import log
if log.is_telegram_logging_enabled():
await update.message.reply_text("📡 实时日志已经在运行中")
return
# 启用 Telegram 日志推送
def log_callback(message: str, level: str):
"""日志回调函数"""
if self.notifier:
self.notifier.queue_message(message, level)
log.enable_telegram_logging(log_callback)
await update.message.reply_text(
"✅ 实时日志已启用\n"
"所有日志将实时推送到此聊天\n"
"使用 /logs_stop 停止推送"
)
@admin_only
async def cmd_logs_stop(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""停止实时日志推送"""
from logger import log
if not log.is_telegram_logging_enabled():
await update.message.reply_text("📭 实时日志未启用")
return
log.disable_telegram_logging()
await update.message.reply_text("✅ 实时日志已停止")
@admin_only
async def cmd_dashboard(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""查看 S2A 仪表盘统计"""
if AUTH_PROVIDER != "s2a":
await update.message.reply_text(
f"⚠️ 仪表盘仅支持 S2A 模式\n"
f"当前模式: {AUTH_PROVIDER}"
)
return
await update.message.reply_text("⏳ 正在获取仪表盘数据...")
try:
stats = s2a_get_dashboard_stats()
if stats:
text = format_dashboard_stats(stats)
await update.message.reply_text(text, parse_mode="HTML")
else:
await update.message.reply_text(
"❌ 获取仪表盘数据失败\n"
"请检查 S2A 配置和 API 连接"
)
except Exception as e:
await update.message.reply_text(f"❌ 错误: {e}")
@admin_only
async def cmd_keys_usage(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""查看 API 密钥用量 - 显示时间选择菜单"""
if AUTH_PROVIDER != "s2a":
await update.message.reply_text(
f"⚠️ 密钥用量查询仅支持 S2A 模式\n"
f"当前模式: {AUTH_PROVIDER}"
)
return
# 创建时间选择按钮
keyboard = [
[
InlineKeyboardButton("📍 今天", callback_data="keys_usage:today"),
InlineKeyboardButton("◀ 昨天", callback_data="keys_usage:yesterday"),
],
[
InlineKeyboardButton("◀ 近 7 天", callback_data="keys_usage:7d"),
InlineKeyboardButton("◀ 近 14 天", callback_data="keys_usage:14d"),
],
[
InlineKeyboardButton("◀ 近 30 天", callback_data="keys_usage:30d"),
InlineKeyboardButton("📅 本月", callback_data="keys_usage:this_month"),
],
[
InlineKeyboardButton("📅 上月", callback_data="keys_usage:last_month"),
],
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"🔑 API 密钥用量查询\n\n请选择时间范围:",
reply_markup=reply_markup,
parse_mode="HTML"
)
async def callback_keys_usage(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""处理密钥用量查询的回调"""
query = update.callback_query
await query.answer()
# 验证权限
user_id = update.effective_user.id
if user_id not in TELEGRAM_ADMIN_CHAT_IDS:
await query.edit_message_text("⛔ 无权限")
return
from datetime import datetime, timedelta
# 解析回调数据
data = query.data.replace("keys_usage:", "")
today = datetime.now()
if data == "today":
start_date = today.strftime("%Y-%m-%d")
end_date = today.strftime("%Y-%m-%d")
period_text = "今天"
elif data == "yesterday":
yesterday = today - timedelta(days=1)
start_date = yesterday.strftime("%Y-%m-%d")
end_date = yesterday.strftime("%Y-%m-%d")
period_text = "昨天"
elif data == "7d":
start_date = (today - timedelta(days=6)).strftime("%Y-%m-%d")
end_date = today.strftime("%Y-%m-%d")
period_text = "近 7 天"
elif data == "14d":
start_date = (today - timedelta(days=13)).strftime("%Y-%m-%d")
end_date = today.strftime("%Y-%m-%d")
period_text = "近 14 天"
elif data == "30d":
start_date = (today - timedelta(days=29)).strftime("%Y-%m-%d")
end_date = today.strftime("%Y-%m-%d")
period_text = "近 30 天"
elif data == "this_month":
start_date = today.replace(day=1).strftime("%Y-%m-%d")
end_date = today.strftime("%Y-%m-%d")
period_text = "本月"
elif data == "last_month":
first_of_this_month = today.replace(day=1)
last_month_end = first_of_this_month - timedelta(days=1)
last_month_start = last_month_end.replace(day=1)
start_date = last_month_start.strftime("%Y-%m-%d")
end_date = last_month_end.strftime("%Y-%m-%d")
period_text = "上月"
else:
await query.edit_message_text("❌ 未知的时间选项")
return
await query.edit_message_text(f"⏳ 正在获取密钥用量 ({period_text})...")
try:
keys = s2a_get_keys_with_usage(start_date, end_date)
if keys is not None:
text = format_keys_usage(keys, period_text)
# 消息太长时分段发送
if len(text) > 4000:
# 先更新原消息
await query.edit_message_text(text[:4000], parse_mode="HTML")
# 剩余部分新消息发送
remaining = text[4000:]
while remaining:
chunk = remaining[:4000]
remaining = remaining[4000:]
await context.bot.send_message(
chat_id=update.effective_chat.id,
text=chunk,
parse_mode="HTML"
)
else:
await query.edit_message_text(text, parse_mode="HTML")
else:
await query.edit_message_text(
"❌ 获取密钥用量失败\n"
"请检查 S2A 配置和 API 连接"
)
except Exception as e:
await query.edit_message_text(f"❌ 错误: {e}")
@admin_only
async def cmd_stock(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""查看账号存货"""
if AUTH_PROVIDER != "s2a":
await update.message.reply_text(
f"⚠️ 库存查询仅支持 S2A 模式\n"
f"当前模式: {AUTH_PROVIDER}"
)
return
stats = s2a_get_dashboard_stats()
if not stats:
await update.message.reply_text("❌ 获取库存信息失败")
return
text = self._format_stock_message(stats)
await update.message.reply_text(text, parse_mode="HTML")
async def scheduled_stock_check(self, context: ContextTypes.DEFAULT_TYPE):
"""定时检查账号存货"""
try:
stats = s2a_get_dashboard_stats()
if not stats:
return
normal = stats.get("normal_accounts", 0)
total = stats.get("total_accounts", 0)
# 只在低库存时发送通知
if normal <= TELEGRAM_LOW_STOCK_THRESHOLD:
text = self._format_stock_message(stats, is_alert=True)
for chat_id in TELEGRAM_ADMIN_CHAT_IDS:
try:
await context.bot.send_message(
chat_id=chat_id,
text=text,
parse_mode="HTML"
)
except Exception:
pass
except Exception as e:
log.warning(f"Stock check failed: {e}")
def _format_stock_message(self, stats: dict, is_alert: bool = False) -> str:
"""格式化存货消息"""
total = stats.get("total_accounts", 0)
normal = stats.get("normal_accounts", 0)
error = stats.get("error_accounts", 0)
ratelimit = stats.get("ratelimit_accounts", 0)
overload = stats.get("overload_accounts", 0)
# 计算健康度
health_pct = (normal / total * 100) if total > 0 else 0
# 状态图标
if normal <= TELEGRAM_LOW_STOCK_THRESHOLD:
status_icon = "⚠️ 库存不足"
status_line = f"{status_icon}"
elif health_pct >= 80:
status_icon = "✅ 正常"
status_line = f"{status_icon}"
elif health_pct >= 50:
status_icon = "⚠️ 警告"
status_line = f"{status_icon}"
else:
status_icon = "🔴 严重"
status_line = f"{status_icon}"
title = "🚨 库存不足警报" if is_alert else "📦 账号库存"
lines = [
f"{title}",
"",
f"状态: {status_line}",
f"健康度: {health_pct:.1f}%",
"",
f"正常: {normal}",
f"异常: {error}",
f"限流: {ratelimit}",
f"总计: {total}",
]
if is_alert:
lines.append("")
lines.append(f"预警阈值: {TELEGRAM_LOW_STOCK_THRESHOLD}")
return "\n".join(lines)
@admin_only
async def cmd_clean_errors(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""清理错误状态的账号"""
if AUTH_PROVIDER != "s2a":
await update.message.reply_text(
f"⚠️ 清理错误账号仅支持 S2A 模式\n"
f"当前模式: {AUTH_PROVIDER}"
)
return
# 获取错误账号
error_accounts, total = s2a_get_error_accounts()
if total == 0:
await update.message.reply_text("✅ 没有错误状态的账号需要清理")
return
# 存储账号数据到 context.bot_data 供分页使用
context.bot_data["clean_errors_accounts"] = error_accounts
context.bot_data["clean_errors_total"] = total
# 显示第一页
text, keyboard = self._build_clean_errors_page(error_accounts, total, page=0)
await update.message.reply_text(text, reply_markup=keyboard, parse_mode="HTML")
def _build_clean_errors_page(self, accounts: list, total: int, page: int = 0, page_size: int = 10):
"""构建错误账号预览页面"""
total_pages = (total + page_size - 1) // page_size
start_idx = page * page_size
end_idx = min(start_idx + page_size, total)
page_accounts = accounts[start_idx:end_idx]
# 按错误类型分组统计(全部账号)
error_types = {}
for acc in accounts:
error_msg = acc.get("error_message", "Unknown")
error_key = error_msg[:50] if error_msg else "Unknown"
error_types[error_key] = error_types.get(error_key, 0) + 1
lines = [
"🗑️ 清理错误账号 (预览)",
"",
f"共发现 {total} 个错误状态账号",
"",
"错误类型统计:",
]
# 显示前5种错误类型
sorted_errors = sorted(error_types.items(), key=lambda x: x[1], reverse=True)[:5]
for error_msg, count in sorted_errors:
lines.append(f"• {count}x: {error_msg}...")
if len(error_types) > 5:
lines.append(f"• ... 还有 {len(error_types) - 5} 种其他错误")
lines.extend([
"",
f"账号列表 (第 {page + 1}/{total_pages} 页):",
])
# 显示当前页的账号
for i, acc in enumerate(page_accounts, start=start_idx + 1):
name = acc.get("name", "Unknown")[:25]
error_msg = acc.get("error_message", "")[:30]
lines.append(f"{i}. {name} - {error_msg}")
lines.extend([
"",
"⚠️ 此操作不可撤销!",
])
text = "\n".join(lines)
# 构建分页按钮
nav_buttons = []
if page > 0:
nav_buttons.append(InlineKeyboardButton("⬅️ 上一页", callback_data=f"clean_errors:page:{page - 1}"))
if page < total_pages - 1:
nav_buttons.append(InlineKeyboardButton("下一页 ➡️", callback_data=f"clean_errors:page:{page + 1}"))
keyboard = [
nav_buttons,
[InlineKeyboardButton(f"🗑️ 确认删除全部 ({total})", callback_data="clean_errors:confirm")],
[InlineKeyboardButton("❌ 取消", callback_data="clean_errors:cancel")],
]
# 过滤空行
keyboard = [row for row in keyboard if row]
return text, InlineKeyboardMarkup(keyboard)
async def callback_clean_errors(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""处理清理错误账号的回调"""
query = update.callback_query
await query.answer()
# 验证权限
user_id = update.effective_user.id
if user_id not in TELEGRAM_ADMIN_CHAT_IDS:
await query.edit_message_text("⛔ 无权限")
return
# 解析回调数据
data = query.data.replace("clean_errors:", "")
if data.startswith("page:"):
# 分页浏览
page = int(data.replace("page:", ""))
accounts = context.bot_data.get("clean_errors_accounts", [])
total = context.bot_data.get("clean_errors_total", 0)
if not accounts:
await query.edit_message_text("❌ 数据已过期,请重新使用 /clean_errors")
return
text, keyboard = self._build_clean_errors_page(accounts, total, page)
await query.edit_message_text(text, reply_markup=keyboard, parse_mode="HTML")
elif data == "cancel":
# 取消操作
context.bot_data.pop("clean_errors_accounts", None)
context.bot_data.pop("clean_errors_total", None)
await query.edit_message_text("✅ 已取消清理操作")
elif data == "confirm":
# 执行删除
total = context.bot_data.get("clean_errors_total", 0)
await query.edit_message_text(
f"🗑️ 正在删除 {total} 个错误账号...\n\n"
"进度: 0%",
parse_mode="HTML"
)
# 同步执行删除
results = s2a_batch_delete_error_accounts()
# 清理缓存数据
context.bot_data.pop("clean_errors_accounts", None)
context.bot_data.pop("clean_errors_total", None)
# 显示结果
lines = [
"✅ 清理完成",
"",
f"成功删除: {results['success']}",
f"删除失败: {results['failed']}",
f"总计: {results['total']}",
]
# 如果有失败的,显示部分失败详情
failed_details = [d for d in results.get("details", []) if d.get("status") == "failed"]
if failed_details:
lines.append("")
lines.append("失败详情:")
for detail in failed_details[:5]:
lines.append(f"• {detail.get('name', '')}: {detail.get('message', '')}")
if len(failed_details) > 5:
lines.append(f"• ... 还有 {len(failed_details) - 5} 个失败")
await query.edit_message_text("\n".join(lines), parse_mode="HTML")
@admin_only
async def cmd_clean_teams(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""清理处理成功的 Team"""
# 获取已完成的 teams
tracker = load_team_tracker()
completed_teams = get_completed_teams(tracker)
if not completed_teams:
await update.message.reply_text("✅ 没有处理成功的 Team 需要清理")
return
# 存储数据供分页使用
context.bot_data["clean_teams_data"] = completed_teams
context.bot_data["clean_teams_total"] = len(completed_teams)
# 显示第一页
text, keyboard = self._build_clean_teams_page(completed_teams, page=0)
await update.message.reply_text(text, reply_markup=keyboard, parse_mode="HTML")
def _build_clean_teams_page(self, teams: list, page: int = 0, page_size: int = 10):
"""构建已完成 Team 预览页面"""
total = len(teams)
total_pages = (total + page_size - 1) // page_size
start_idx = page * page_size
end_idx = min(start_idx + page_size, total)
page_teams = teams[start_idx:end_idx]
# 统计总账号数
total_accounts = sum(t["total"] for t in teams)
lines = [
"🧹 清理已完成 Team (预览)",
"",
f"共发现 {total} 个已完成的 Team",
f"涉及 {total_accounts} 个账号记录",
"",
f"Team 列表 (第 {page + 1}/{total_pages} 页):",
]
# 显示当前页的 Team
for i, team in enumerate(page_teams, start=start_idx + 1):
name = team["name"][:25]
count = team["total"]
lines.append(f"{i}. ✅ {name} ({count} 个账号)")
lines.extend([
"",
"⚠️ 此操作将同时清理:",
"• team_tracker.json (账号处理记录)",
"• team.json (Team 配置)",
])
text = "\n".join(lines)
# 构建分页按钮
nav_buttons = []
if page > 0:
nav_buttons.append(InlineKeyboardButton("⬅️ 上一页", callback_data=f"clean_teams:page:{page - 1}"))
if page < total_pages - 1:
nav_buttons.append(InlineKeyboardButton("下一页 ➡️", callback_data=f"clean_teams:page:{page + 1}"))
keyboard = [
nav_buttons,
[InlineKeyboardButton(f"🧹 确认清理全部 ({total})", callback_data="clean_teams:confirm")],
[InlineKeyboardButton("❌ 取消", callback_data="clean_teams:cancel")],
]
# 过滤空行
keyboard = [row for row in keyboard if row]
return text, InlineKeyboardMarkup(keyboard)
async def callback_clean_teams(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""处理清理已完成 Team 的回调"""
query = update.callback_query
await query.answer()
# 验证权限
user_id = update.effective_user.id
if user_id not in TELEGRAM_ADMIN_CHAT_IDS:
await query.edit_message_text("⛔ 无权限")
return
# 解析回调数据
data = query.data.replace("clean_teams:", "")
if data.startswith("page:"):
# 分页浏览
page = int(data.replace("page:", ""))
teams = context.bot_data.get("clean_teams_data", [])
if not teams:
await query.edit_message_text("❌ 数据已过期,请重新使用 /clean_teams")
return
text, keyboard = self._build_clean_teams_page(teams, page)
await query.edit_message_text(text, reply_markup=keyboard, parse_mode="HTML")
elif data == "cancel":
# 取消操作
context.bot_data.pop("clean_teams_data", None)
context.bot_data.pop("clean_teams_total", None)
await query.edit_message_text("✅ 已取消清理操作")
elif data == "confirm":
# 执行清理
teams_data = context.bot_data.get("clean_teams_data", [])
total = context.bot_data.get("clean_teams_total", 0)
await query.edit_message_text(
f"🧹 正在清理 {total} 个已完成 Team...",
parse_mode="HTML"
)
# 获取要删除的 team 名称列表
team_names = [t["name"] for t in teams_data]
# 1. 清理 tracker
tracker = load_team_tracker()
tracker_results = batch_remove_completed_teams(tracker)
save_team_tracker(tracker)
# 2. 清理 team.json
json_results = batch_remove_teams_by_names(team_names)
# 清理缓存数据
context.bot_data.pop("clean_teams_data", None)
context.bot_data.pop("clean_teams_total", None)
# 统计清理的账号数
total_accounts = sum(d.get("accounts", 0) for d in tracker_results.get("details", []) if d.get("status") == "success")
# 显示结果
lines = [
"✅ 清理完成",
"",
f"清理 Team: {tracker_results['success']}",
f"清理账号记录: {total_accounts}",
"",
"📁 文件清理:",
f"• tracker: {tracker_results['success']} 个 Team",
f"• team.json: {json_results['success']} 个 Team",
]
if tracker_results['failed'] > 0 or json_results['failed'] > 0:
lines.append("")
lines.append(f"失败: tracker={tracker_results['failed']}, json={json_results['failed']}")
await query.edit_message_text("\n".join(lines), parse_mode="HTML")
@admin_only
async def cmd_import(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""上传账号到 team.json"""
# 获取命令后的 JSON 数据
if not context.args:
await update.message.reply_text(
"📤 导入账号到 team.json\n\n"
"用法:\n"
"1. 直接发送 JSON 文件\n"
"2. /import 后跟 JSON 数据\n\n"
"JSON 格式:\n"
"[{\"account\":\"邮箱\",\"password\":\"密码\",\"token\":\"jwt\"},...]\n\n"
"导入后使用 /run 开始处理",
parse_mode="HTML"
)
return
# 尝试解析 JSON
json_text = " ".join(context.args)
await self._process_import_json(update, json_text)
def _reset_import_batch_stats(self):
"""重置批量导入统计"""
self._import_batch_stats = {
"total_files": 0,
"processed_files": 0,
"total_added": 0,
"total_skipped": 0,
"current_file": "",
"errors": [],
"team_json_total": 0
}
def _get_import_progress_text(self, is_processing: bool = True) -> str:
"""生成导入进度消息文本"""
stats = self._import_batch_stats
if is_processing:
lines = [
"⏳ 正在处理 JSON 文件...",
"",
f"📁 文件: {stats['processed_files']}/{stats['total_files']}",
]
if stats['current_file']:
lines.append(f"📄 当前: {stats['current_file']}")
lines.extend([
"",
f"新增: {stats['total_added']}",
f"跳过 (重复): {stats['total_skipped']}",
])
else:
# 完成状态
lines = [
"✅ 导入完成",
"",
f"📁 处理文件: {stats['processed_files']} 个",
f"📄 已更新 team.json",
f"新增: {stats['total_added']}",
f"跳过 (重复): {stats['total_skipped']}",
f"team.json 总数: {stats['team_json_total']}",
]
if stats['errors']:
lines.append("")
lines.append(f"⚠️ 错误 ({len(stats['errors'])} 个):")
for err in stats['errors'][:3]: # 最多显示3个错误
lines.append(f" • {err}")
if len(stats['errors']) > 3:
lines.append(f" ... 还有 {len(stats['errors']) - 3} 个错误")
lines.extend([
"",
"✅ 配置已自动刷新",
"使用 /run_all 或 /run <n> 开始处理"
])
return "\n".join(lines)
async def _update_import_progress(self, chat_id: int, is_final: bool = False):
"""更新导入进度消息"""
text = self._get_import_progress_text(is_processing=not is_final)
try:
if self._import_progress_message:
await self.app.bot.edit_message_text(
chat_id=chat_id,
message_id=self._import_progress_message.message_id,
text=text,
parse_mode="HTML"
)
except Exception:
pass # 忽略编辑失败
async def _finalize_import_batch(self, chat_id: int):
"""完成批量导入,发送最终结果"""
async with self._import_progress_lock:
if self._import_progress_message is None:
return
# 取消超时任务 (job_queue job)
if self._import_batch_timeout_task:
try:
self._import_batch_timeout_task.schedule_removal()
except Exception:
pass
self._import_batch_timeout_task = None
# 更新最终进度
await self._update_import_progress(chat_id, is_final=True)
# 重置状态
self._import_progress_message = None
self._reset_import_batch_stats()
async def _import_batch_timeout_callback(self, context: ContextTypes.DEFAULT_TYPE):
"""批量导入超时回调 - 由 job_queue 调用"""
chat_id = context.job.data.get("chat_id")
if chat_id:
await self._finalize_import_batch(chat_id)
def _schedule_import_finalize(self, context: ContextTypes.DEFAULT_TYPE, chat_id: int, delay: float = 1.5):
"""调度批量导入完成任务"""
# 取消之前的超时任务
if self._import_batch_timeout_task:
self._import_batch_timeout_task.schedule_removal()
self._import_batch_timeout_task = None
# 使用 job_queue 调度新的超时任务
self._import_batch_timeout_task = context.job_queue.run_once(
self._import_batch_timeout_callback,
when=delay,
data={"chat_id": chat_id},
name="import_batch_finalize"
)
@admin_only
async def handle_json_file(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""处理上传的 JSON 文件 - 支持批量导入进度更新"""
# 检查是否是管理员
user_id = update.effective_user.id
if user_id not in TELEGRAM_ADMIN_CHAT_IDS:
await update.message.reply_text("⛔ 无权限")
return
document = update.message.document
if not document:
return
chat_id = update.effective_chat.id
file_name = document.file_name or "unknown.json"
async with self._import_progress_lock:
# 取消之前的超时任务(如果有)
if self._import_batch_timeout_task:
self._import_batch_timeout_task.schedule_removal()
self._import_batch_timeout_task = None
# 更新统计
self._import_batch_stats["total_files"] += 1
self._import_batch_stats["current_file"] = file_name
# 如果是新批次,发送初始进度消息
if self._import_progress_message is None:
self._reset_import_batch_stats()
self._import_batch_stats["total_files"] = 1
self._import_batch_stats["current_file"] = file_name
self._import_progress_message = await update.message.reply_text(
self._get_import_progress_text(is_processing=True),
parse_mode="HTML"
)
else:
# 更新进度消息
await self._update_import_progress(chat_id)
try:
# 下载文件
file = await document.get_file()
file_bytes = await file.download_as_bytearray()
json_text = file_bytes.decode("utf-8")
# 处理导入并获取结果
result = await self._process_import_json_batch(json_text)
async with self._import_progress_lock:
self._import_batch_stats["processed_files"] += 1
self._import_batch_stats["total_added"] += result.get("added", 0)
self._import_batch_stats["total_skipped"] += result.get("skipped", 0)
self._import_batch_stats["team_json_total"] = result.get("total", 0)
self._import_batch_stats["current_file"] = ""
if result.get("error"):
self._import_batch_stats["errors"].append(f"{file_name}: {result['error']}")
# 更新进度
await self._update_import_progress(chat_id)
# 检查是否所有文件都已处理,如果是则调度完成任务
stats = self._import_batch_stats
if stats["processed_files"] >= stats["total_files"]:
# 所有文件处理完成,短延迟后完成批次(防止更多文件到来)
self._schedule_import_finalize(context, chat_id, delay=1.0)
else:
# 还有文件在处理,设置较长的超时
self._schedule_import_finalize(context, chat_id, delay=3.0)
except Exception as e:
async with self._import_progress_lock:
self._import_batch_stats["processed_files"] += 1
self._import_batch_stats["errors"].append(f"{file_name}: {str(e)}")
self._import_batch_stats["current_file"] = ""
await self._update_import_progress(chat_id)
# 调度完成任务
stats = self._import_batch_stats
if stats["processed_files"] >= stats["total_files"]:
self._schedule_import_finalize(context, chat_id, delay=1.0)
else:
self._schedule_import_finalize(context, chat_id, delay=3.0)
async def _process_import_json_batch(self, json_text: str) -> dict:
"""处理导入的 JSON 数据,保存到 team.json (批量版本,返回结果)
Returns:
dict: {"added": int, "skipped": int, "total": int, "error": str|None}
"""
import json
from pathlib import Path
result = {"added": 0, "skipped": 0, "total": 0, "error": None}
try:
new_accounts = json.loads(json_text)
except json.JSONDecodeError as e:
result["error"] = f"JSON 格式错误: {e}"
return result
if not isinstance(new_accounts, list):
new_accounts = [new_accounts]
if not new_accounts:
result["error"] = "JSON 数据中没有账号"
return result
# 验证格式
valid_accounts = []
for acc in new_accounts:
if not isinstance(acc, dict):
continue
email = acc.get("account") or acc.get("email", "")
token = acc.get("token", "")
password = acc.get("password", "")
if email and token:
valid_accounts.append({
"account": email,
"password": password,
"token": token
})
if not valid_accounts:
result["error"] = "未找到有效账号"
return result
# 读取现有 team.json
team_json_path = Path(TEAM_JSON_FILE)
existing_accounts = []
if team_json_path.exists():
try:
with open(team_json_path, "r", encoding="utf-8") as f:
existing_accounts = json.load(f)
if not isinstance(existing_accounts, list):
existing_accounts = [existing_accounts]
except Exception:
existing_accounts = []
# 检查重复
existing_emails = set()
for acc in existing_accounts:
email = acc.get("account") or acc.get("user", {}).get("email", "")
if email:
existing_emails.add(email.lower())
added = 0
skipped = 0
for acc in valid_accounts:
email = acc.get("account", "").lower()
if email in existing_emails:
skipped += 1
else:
existing_accounts.append(acc)
existing_emails.add(email)
added += 1
# 保存到 team.json
try:
team_json_path.parent.mkdir(parents=True, exist_ok=True)
with open(team_json_path, "w", encoding="utf-8") as f:
json.dump(existing_accounts, f, ensure_ascii=False, indent=2)
# 重载配置
reload_config()
result["added"] = added
result["skipped"] = skipped
result["total"] = len(existing_accounts)
except Exception as e:
result["error"] = f"保存失败: {e}"
return result
async def _process_import_json(self, update: Update, json_text: str):
"""处理导入的 JSON 数据,保存到 team.json"""
import json
from pathlib import Path
try:
new_accounts = json.loads(json_text)
except json.JSONDecodeError as e:
await update.message.reply_text(f"❌ JSON 格式错误: {e}")
return
if not isinstance(new_accounts, list):
# 如果是单个对象,转成列表
new_accounts = [new_accounts]
if not new_accounts:
await update.message.reply_text("📭 JSON 数据中没有账号")
return
# 验证格式
valid_accounts = []
for acc in new_accounts:
if not isinstance(acc, dict):
continue
# 支持 account 或 email 字段
email = acc.get("account") or acc.get("email", "")
token = acc.get("token", "")
password = acc.get("password", "")
if email and token:
valid_accounts.append({
"account": email,
"password": password,
"token": token
})
if not valid_accounts:
await update.message.reply_text("❌ 未找到有效账号 (需要 account/email 和 token 字段)")
return
# 读取现有 team.json (不存在则自动创建)
team_json_path = Path(TEAM_JSON_FILE)
existing_accounts = []
is_new_file = not team_json_path.exists()
if not is_new_file:
try:
with open(team_json_path, "r", encoding="utf-8") as f:
existing_accounts = json.load(f)
if not isinstance(existing_accounts, list):
existing_accounts = [existing_accounts]
except Exception:
existing_accounts = []
# 检查重复
existing_emails = set()
for acc in existing_accounts:
email = acc.get("account") or acc.get("user", {}).get("email", "")
if email:
existing_emails.add(email.lower())
added = 0
skipped = 0
for acc in valid_accounts:
email = acc.get("account", "").lower()
if email in existing_emails:
skipped += 1
else:
existing_accounts.append(acc)
existing_emails.add(email)
added += 1
# 保存到 team.json (自动创建文件)
try:
# 确保父目录存在
team_json_path.parent.mkdir(parents=True, exist_ok=True)
with open(team_json_path, "w", encoding="utf-8") as f:
json.dump(existing_accounts, f, ensure_ascii=False, indent=2)
file_status = "📄 已创建 team.json" if is_new_file else "📄 已更新 team.json"
# 自动重载配置,同步内存状态,避免 save_team_json() 覆盖新导入的数据
reload_result = reload_config()
await update.message.reply_text(
f"✅ 导入完成\n\n"
f"{file_status}\n"
f"新增: {added}\n"
f"跳过 (重复): {skipped}\n"
f"team.json 总数: {len(existing_accounts)}\n\n"
f"✅ 配置已自动刷新\n"
f"使用 /run_all 或 /run <n> 开始处理",
parse_mode="HTML"
)
except Exception as e:
await update.message.reply_text(f"❌ 保存到 team.json 失败: {e}")
# ==================== GPTMail Key 管理命令 ====================
@admin_only
async def cmd_gptmail_keys(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""查看所有 GPTMail API Keys"""
keys = get_gptmail_keys()
config_keys = set(GPTMAIL_API_KEYS)
if not keys:
await update.message.reply_text(
"📧 GPTMail API Keys\n\n"
"📭 暂无配置 Key\n\n"
"使用 /gptmail_add <key> 添加",
parse_mode="HTML"
)
return
lines = [f"📧 GPTMail API Keys (共 {len(keys)} 个)\n"]
for i, key in enumerate(keys):
# 脱敏显示
if len(key) > 10:
masked = f"{key[:4]}...{key[-4:]}"
else:
masked = key[:4] + "..." if len(key) > 4 else key
# 标记来源
source = "📁 配置" if key in config_keys else "🔧 动态"
lines.append(f"{i+1}. {masked} {source}")
lines.append(f"\n💡 管理:")
lines.append(f"/gptmail_add <key> - 添加 Key")
lines.append(f"/gptmail_del <key> - 删除动态 Key")
lines.append(f"/test_email - 测试邮箱创建")
await update.message.reply_text("\n".join(lines), parse_mode="HTML")
@admin_only
async def cmd_gptmail_add(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""添加 GPTMail API Key (支持批量导入)"""
if not context.args:
await update.message.reply_text(
"📧 添加 GPTMail API Key\n\n"
"单个添加:\n"
"/gptmail_add gpt-xxx\n\n"
"批量添加 (空格分隔):\n"
"/gptmail_add key1 key2 key3\n\n"
"批量添加 (换行分隔):\n"
"/gptmail_add key1\nkey2\nkey3",
parse_mode="HTML"
)
return
# 合并所有参数,支持空格和换行分隔
raw_input = " ".join(context.args)
# 按空格和换行分割
keys = []
for part in raw_input.replace("\n", " ").split():
key = part.strip()
if key:
keys.append(key)
if not keys:
await update.message.reply_text("❌ Key 不能为空")
return
# 获取现有 keys
existing_keys = set(get_gptmail_keys())
# 统计结果
added = []
skipped = []
invalid = []
await update.message.reply_text(f"⏳ 正在验证 {len(keys)} 个 Key...")
for key in keys:
# 检查是否已存在
if key in existing_keys:
skipped.append(key)
continue
# 测试 Key 是否有效
success, message = gptmail_service.test_api_key(key)
if not success:
invalid.append(key)
continue
# 添加 Key
if add_gptmail_key(key):
added.append(key)
existing_keys.add(key)
# 生成结果报告
lines = ["📧 GPTMail Key 导入结果\n"]
if added:
lines.append(f"✅ 成功添加: {len(added)}")
for k in added[:5]: # 最多显示5个
masked = f"{k[:4]}...{k[-4:]}" if len(k) > 10 else k
lines.append(f" • {masked}")
if len(added) > 5:
lines.append(f" • ... 等 {len(added)} 个")
if skipped:
lines.append(f"\n⏭️ 已跳过 (已存在): {len(skipped)}")
if invalid:
lines.append(f"\n❌ 无效 Key: {len(invalid)}")
for k in invalid[:3]: # 最多显示3个
masked = f"{k[:4]}...{k[-4:]}" if len(k) > 10 else k
lines.append(f" • {masked}")
lines.append(f"\n当前 Key 总数: {len(get_gptmail_keys())}")
await update.message.reply_text("\n".join(lines), parse_mode="HTML")
@admin_only
async def cmd_gptmail_del(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""删除 GPTMail API Key"""
if not context.args:
await update.message.reply_text(
"📧 删除 GPTMail API Key\n\n"
"用法: /gptmail_del <key>\n\n"
"注意: 只能删除动态添加的 Key,配置文件中的 Key 请直接修改 config.toml",
parse_mode="HTML"
)
return
key = context.args[0].strip()
# 检查是否是配置文件中的 Key
if key in GPTMAIL_API_KEYS:
await update.message.reply_text(
"⚠️ 该 Key 在配置文件中,无法通过 Bot 删除\n"
"请直接修改 config.toml"
)
return
# 删除 Key
if remove_gptmail_key(key):
await update.message.reply_text(
f"✅ Key 已删除\n\n"
f"当前 Key 总数: {len(get_gptmail_keys())}",
parse_mode="HTML"
)
else:
await update.message.reply_text("❌ Key 不存在或删除失败")
@admin_only
async def cmd_test_email(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""测试邮箱创建功能"""
if EMAIL_PROVIDER != "gptmail":
await update.message.reply_text(
f"⚠️ 当前邮箱提供商: {EMAIL_PROVIDER}\n"
f"测试功能仅支持 GPTMail 模式"
)
return
await update.message.reply_text("⏳ 正在测试邮箱创建...")
try:
# 测试创建邮箱
email, password = unified_create_email()
if email:
await update.message.reply_text(
f"✅ 邮箱创建成功\n\n"
f"邮箱: {email}\n"
f"密码: {password}\n\n"
f"当前 Key 数量: {len(get_gptmail_keys())}",
parse_mode="HTML"
)
else:
await update.message.reply_text(
"❌ 邮箱创建失败\n\n"
"请检查 GPTMail API Key 配置",
parse_mode="HTML"
)
except Exception as e:
await update.message.reply_text(f"❌ 测试失败: {e}")
@admin_only
async def cmd_iban_list(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""查看 IBAN 列表"""
try:
from auto_gpt_team import get_sepa_ibans
ibans = get_sepa_ibans()
if not ibans:
await update.message.reply_text(
"💳 SEPA IBAN 列表\n\n"
"📭 暂无 IBAN\n\n"
"使用 /iban_add 添加 IBAN",
parse_mode="HTML"
)
return
# 显示 IBAN 列表
lines = [f"💳 SEPA IBAN 列表 ({len(ibans)} 个)\n"]
for i, iban in enumerate(ibans[:50], 1): # 最多显示 50 个
lines.append(f"{i}. {iban}")
if len(ibans) > 50:
lines.append(f"\n... 还有 {len(ibans) - 50} 个未显示")
await update.message.reply_text("\n".join(lines), parse_mode="HTML")
except ImportError:
await update.message.reply_text("❌ auto_gpt_team 模块未找到")
except Exception as e:
await update.message.reply_text(f"❌ 获取 IBAN 列表失败: {e}")
@admin_only
async def cmd_iban_add(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""添加 IBAN"""
if not context.args:
await update.message.reply_text(
"💳 添加 IBAN\n\n"
"用法:\n"
"/iban_add DE123... DE456...\n"
"/iban_add DE123...,DE456...\n\n"
"支持空格或逗号分隔,每行一个也可以",
parse_mode="HTML"
)
return
try:
from auto_gpt_team import add_sepa_ibans
# 解析输入 (支持空格、逗号、换行分隔)
raw_input = " ".join(context.args)
# 替换逗号和换行为空格,然后按空格分割
ibans = [s.strip() for s in raw_input.replace(",", " ").replace("\n", " ").split() if s.strip()]
if not ibans:
await update.message.reply_text("❌ 未提供有效的 IBAN")
return
added, skipped, total = add_sepa_ibans(ibans)
await update.message.reply_text(
f"✅ IBAN 导入完成\n\n"
f"新增: {added}\n"
f"跳过 (重复): {skipped}\n"
f"当前总数: {total}",
parse_mode="HTML"
)
except ImportError:
await update.message.reply_text("❌ auto_gpt_team 模块未找到")
except Exception as e:
await update.message.reply_text(f"❌ 添加 IBAN 失败: {e}")
@admin_only
async def cmd_iban_clear(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""清空 IBAN 列表"""
# 需要确认
if not context.args or context.args[0].lower() != "confirm":
try:
from auto_gpt_team import get_sepa_ibans
count = len(get_sepa_ibans())
except:
count = 0
await update.message.reply_text(
f"⚠️ 确认清空 IBAN 列表?\n\n"
f"当前共有 {count} 个 IBAN\n\n"
f"确认请发送:\n"
f"/iban_clear confirm",
parse_mode="HTML"
)
return
try:
from auto_gpt_team import clear_sepa_ibans
clear_sepa_ibans()
await update.message.reply_text("✅ IBAN 列表已清空", parse_mode="HTML")
except ImportError:
await update.message.reply_text("❌ auto_gpt_team 模块未找到")
except Exception as e:
await update.message.reply_text(f"❌ 清空 IBAN 失败: {e}")
@admin_only
async def cmd_domain_list(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""查看邮箱域名列表"""
try:
from auto_gpt_team import get_email_domains
domains = get_email_domains()
if not domains:
await update.message.reply_text(
"📧 邮箱域名列表\n\n"
"📭 暂无域名\n\n"
"使用 /domain_add 添加域名",
parse_mode="HTML"
)
return
# 显示域名列表
lines = [f"📧 邮箱域名列表 ({len(domains)} 个)\n"]
for i, domain in enumerate(domains[:50], 1): # 最多显示 50 个
lines.append(f"{i}. {domain}")
if len(domains) > 50:
lines.append(f"\n... 还有 {len(domains) - 50} 个未显示")
await update.message.reply_text("\n".join(lines), parse_mode="HTML")
except ImportError:
await update.message.reply_text("❌ auto_gpt_team 模块未找到")
except Exception as e:
await update.message.reply_text(f"❌ 获取域名列表失败: {e}")
@admin_only
async def cmd_domain_add(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""添加邮箱域名"""
if not context.args:
await update.message.reply_text(
"📧 添加邮箱域名\n\n"
"用法:\n"
"/domain_add @example.com\n"
"/domain_add @a.com,@b.com\n\n"
"支持空格或逗号分隔\n"
"@ 符号可省略,会自动添加\n\n"
"格式要求:\n"
"• 域名需包含至少一个点号\n"
"• 只能包含字母、数字、连字符\n"
"• 顶级域名至少2个字符",
parse_mode="HTML"
)
return
try:
from auto_gpt_team import add_email_domains
# 解析输入 (支持空格、逗号、换行分隔)
raw_input = " ".join(context.args)
# 替换逗号和换行为空格,然后按空格分割
domains = [s.strip() for s in raw_input.replace(",", " ").replace("\n", " ").split() if s.strip()]
if not domains:
await update.message.reply_text("❌ 未提供有效的域名")
return
added, skipped, invalid, total = add_email_domains(domains)
# 构建响应消息
lines = ["✅ 域名导入完成\n"]
lines.append(f"新增: {added}")
lines.append(f"跳过 (重复): {skipped}")
if invalid > 0:
lines.append(f"无效 (格式错误): {invalid}")
lines.append(f"当前总数: {total}")
await update.message.reply_text("\n".join(lines), parse_mode="HTML")
except ImportError:
await update.message.reply_text("❌ auto_gpt_team 模块未找到")
except Exception as e:
await update.message.reply_text(f"❌ 添加域名失败: {e}")
@admin_only
async def cmd_domain_del(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""删除指定域名"""
if not context.args:
await update.message.reply_text(
"📧 删除域名\n\n"
"用法:\n"
"/domain_del @example.com\n\n"
"@ 符号可省略",
parse_mode="HTML"
)
return
try:
from auto_gpt_team import remove_email_domain, get_email_domains
domain = context.args[0].strip()
if remove_email_domain(domain):
total = len(get_email_domains())
await update.message.reply_text(
f"✅ 域名已删除\n\n"
f"已删除: {domain}\n"
f"剩余: {total} 个",
parse_mode="HTML"
)
else:
await update.message.reply_text(
f"❌ 域名不存在: {domain}",
parse_mode="HTML"
)
except ImportError:
await update.message.reply_text("❌ auto_gpt_team 模块未找到")
except Exception as e:
await update.message.reply_text(f"❌ 删除域名失败: {e}")
@admin_only
async def cmd_domain_clear(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""清空域名列表 (只清空txt文件中的域名,保留config配置)"""
# 需要确认
if not context.args or context.args[0].lower() != "confirm":
try:
from auto_gpt_team import get_file_domains_count, EMAIL_DOMAINS
file_count = get_file_domains_count()
config_count = len(EMAIL_DOMAINS) if EMAIL_DOMAINS else 0
except:
file_count = 0
config_count = 0
if file_count == 0:
await update.message.reply_text(
"📧 域名列表\n\n"
"txt文件中没有可清空的域名\n"
f"config配置中的域名: {config_count} 个 (不会被清空)",
parse_mode="HTML"
)
return
await update.message.reply_text(
f"⚠️ 确认清空域名列表?\n\n"
f"将清空txt文件中的域名: {file_count} 个\n"
f"config配置中的域名: {config_count} 个 (不会被清空)\n\n"
f"确认请发送:\n"
f"/domain_clear confirm",
parse_mode="HTML"
)
return
try:
from auto_gpt_team import clear_email_domains, EMAIL_DOMAINS
cleared_count = clear_email_domains()
config_count = len(EMAIL_DOMAINS) if EMAIL_DOMAINS else 0
await update.message.reply_text(
f"✅ 域名列表已清空\n\n"
f"已清空: {cleared_count} 个域名\n"
f"保留 (config配置): {config_count} 个",
parse_mode="HTML"
)
except ImportError:
await update.message.reply_text("❌ auto_gpt_team 模块未找到")
except Exception as e:
await update.message.reply_text(f"❌ 清空域名失败: {e}")
@admin_only
async def cmd_team_fingerprint(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""切换 GPT Team 随机指纹"""
import tomli_w
try:
# 读取当前配置
with open(CONFIG_FILE, "rb") as f:
import tomllib
config = tomllib.load(f)
# 确保 GPT Team section 存在
if "GPT Team" not in config:
config["GPT Team"] = {}
# 获取当前状态
current = config.get("GPT Team", {}).get("random_fingerprint", True)
new_value = not current
# 更新配置
config["GPT Team"]["random_fingerprint"] = new_value
# 写回文件
with open(CONFIG_FILE, "wb") as f:
tomli_w.dump(config, f)
status = "✅ 已开启" if new_value else "❌ 已关闭"
await update.message.reply_text(
f"🎭 GPT Team 随机指纹\n\n"
f"状态: {status}\n\n"
f"开启后每次运行将随机使用不同的:\n"
f"• User-Agent (Chrome 139-144)\n"
f"• WebGL 显卡指纹 (NVIDIA/AMD/Intel)\n"
f"• 屏幕分辨率 (1080p/1440p/4K)\n\n"
f"💡 下次运行 auto_gpt_team.py 时生效",
parse_mode="HTML"
)
except ImportError:
await update.message.reply_text(
"❌ 缺少 tomli_w 依赖\n"
"请运行: uv add tomli_w"
)
except Exception as e:
await update.message.reply_text(f"❌ 修改配置失败: {e}")
@admin_only
async def cmd_team_register(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""开始 GPT Team 自动订阅注册"""
# 检查是否有任务正在运行
if self.current_task and not self.current_task.done():
await update.message.reply_text(
f"⚠️ 有任务正在运行: {self.current_team}\n"
"请等待任务完成或使用 /stop 停止后再开始注册"
)
return
# 检查配置
try:
from auto_gpt_team import MAIL_API_TOKEN, MAIL_API_BASE, get_email_domains, get_sepa_ibans
if not MAIL_API_TOKEN or not MAIL_API_BASE:
await update.message.reply_text(
"❌ 配置错误\n\n"
"请在 config.toml 中配置 [GPT Team] 段:\n"
"• mail_api_token\n"
"• mail_api_base",
parse_mode="HTML"
)
return
domains = get_email_domains()
if not domains:
await update.message.reply_text(
"❌ 没有可用的邮箱域名\n\n"
"请先使用 /domain_add 导入域名",
parse_mode="HTML"
)
return
ibans = get_sepa_ibans()
if not ibans:
await update.message.reply_text(
"❌ 没有可用的 IBAN\n\n"
"请先使用 /iban_add 导入 IBAN",
parse_mode="HTML"
)
return
except ImportError:
await update.message.reply_text("❌ auto_gpt_team 模块未找到")
return
# 显示数量选择
keyboard = [
[
InlineKeyboardButton("1 个", callback_data="team_reg:count:1"),
InlineKeyboardButton("3 个", callback_data="team_reg:count:3"),
InlineKeyboardButton("5 个", callback_data="team_reg:count:5"),
],
[
InlineKeyboardButton("10 个", callback_data="team_reg:count:10"),
InlineKeyboardButton("20 个", callback_data="team_reg:count:20"),
InlineKeyboardButton("自定义", callback_data="team_reg:count:custom"),
],
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"🚀 GPT Team 自动订阅\n\n"
f"📧 邮箱域名: {len(domains)} 个\n"
f"💳 可用 IBAN: {len(ibans)} 个\n\n"
"请选择注册数量:",
parse_mode="HTML",
reply_markup=reply_markup
)
async def callback_team_register(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""处理 GPT Team 注册回调"""
query = update.callback_query
# 权限检查
user_id = update.effective_user.id
if user_id not in TELEGRAM_ADMIN_CHAT_IDS:
await query.answer("⛔ 无权限", show_alert=True)
return
await query.answer()
data = query.data.split(":")
action = data[1] if len(data) > 1 else ""
value = data[2] if len(data) > 2 else ""
if action == "count":
if value == "custom":
await query.edit_message_text(
"📝 自定义数量\n\n"
"请发送数量 (1-50):\n"
"直接回复一个数字即可\n\n"
"例如: 20",
parse_mode="HTML"
)
# 设置等待输入状态
context.user_data["team_waiting_count"] = True
return
count = int(value)
# 显示输出方式选择
keyboard = [
[
InlineKeyboardButton("📄 JSON 文件", callback_data=f"team_reg:output:json:{count}"),
],
[
InlineKeyboardButton("📥 添加到 team.json", callback_data=f"team_reg:output:team:{count}"),
],
]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(
f"⚙️ 配置完成\n\n"
f"注册数量: {count} 个\n\n"
f"请选择输出方式:",
parse_mode="HTML",
reply_markup=reply_markup
)
elif action == "output":
output_type = value # json 或 team
count = int(data[3]) if len(data) > 3 else 1
await query.edit_message_text(
f"⚙️ 配置完成\n\n"
f"注册数量: {count} 个\n"
f"输出方式: {'📄 JSON 文件' if output_type == 'json' else '📥 team.json'}\n\n"
f"即将开始完整注册流程...",
parse_mode="HTML"
)
# 开始注册任务
self.current_team = f"GPT Team 注册 ({count}个)"
# 重置停止标志,确保新任务可以正常运行
try:
import run
run._shutdown_requested = False
except Exception:
pass
self.current_task = asyncio.create_task(
self._run_team_registration(query.message.chat_id, count, output_type)
)
async def _run_team_registration(self, chat_id: int, count: int, output_type: str):
"""执行 GPT Team 注册任务"""
from auto_gpt_team import run_single_registration, cleanup_chrome_processes
import json
import threading
results = []
success_count = 0
fail_count = 0
# 当前步骤 (用于显示)
current_step = ["初始化..."]
current_account = [""]
step_lock = threading.Lock()
def step_callback(step: str):
"""步骤回调 - 更新当前步骤"""
with step_lock:
current_step[0] = step
# 发送开始消息
progress_msg = await self.app.bot.send_message(
chat_id,
f"🚀 开始注册\n\n"
f"进度: 0/{count}\n"
f"{'▱' * 20}",
parse_mode="HTML"
)
# 进度更新任务
async def update_progress_loop():
"""定期更新进度消息"""
last_step = ""
while True:
await asyncio.sleep(1.5) # 每 1.5 秒更新一次
try:
with step_lock:
step = current_step[0]
account = current_account[0]
# 只有步骤变化时才更新
if step != last_step:
last_step = step
progress = int((success_count + fail_count) / count * 20) if count > 0 else 0
progress_bar = '▰' * progress + '▱' * (20 - progress)
text = (
f"🚀 注册中...\n\n"
f"进度: {success_count + fail_count}/{count}\n"
f"{progress_bar}\n\n"
f"✅ 成功: {success_count}\n"
f"❌ 失败: {fail_count}\n"
)
if account:
text += f"\n⏳ 账号: {account[:20]}..."
if step:
text += f"\n ▸ {step}"
try:
await progress_msg.edit_text(text, parse_mode="HTML")
except:
pass
except asyncio.CancelledError:
break
except:
pass
# 启动进度更新任务
progress_task = asyncio.create_task(update_progress_loop())
for i in range(count):
# 检查停止请求
try:
import run
if run._shutdown_requested:
break
except:
pass
# 执行注册
try:
# 使用 functools.partial 传递回调
import functools
def run_with_callback():
return run_single_registration(
progress_callback=None,
step_callback=step_callback
)
# 更新当前账号
with step_lock:
current_step[0] = "生成账号信息..."
current_account[0] = f"第 {i+1} 个"
result = await asyncio.get_event_loop().run_in_executor(
self.executor,
run_with_callback
)
if result.get("stopped"):
# 被 /stop 命令中断,不计入失败
log.info("注册被用户停止")
break
elif result.get("success"):
success_count += 1
results.append({
"account": result["account"],
"password": result["password"],
"token": result["token"],
"account_id": result.get("account_id", "")
})
with step_lock:
current_account[0] = result["account"]
else:
fail_count += 1
log.warning(f"注册失败: {result.get('error', '未知错误')}")
except Exception as e:
fail_count += 1
log.error(f"注册异常: {e}")
# 清理浏览器进程
cleanup_chrome_processes()
# 停止进度更新任务
progress_task.cancel()
try:
await progress_task
except asyncio.CancelledError:
pass
# 完成进度
progress_bar = '▰' * 20
await progress_msg.edit_text(
f"🎉 注册完成! {success_count}/{count}\n"
f"{progress_bar}\n\n"
f"✅ 成功: {success_count}\n"
f"❌ 失败: {fail_count}",
parse_mode="HTML"
)
# 处理结果
if results:
if output_type == "json":
# 生成 JSON 文件
timestamp = time.strftime("%Y%m%d_%H%M%S")
filename = f"team_accounts_{timestamp}.json"
filepath = Path(filename)
with open(filepath, "w", encoding="utf-8") as f:
json.dump(results, f, ensure_ascii=False, indent=2)
# 发送文件
await self.app.bot.send_document(
chat_id,
document=open(filepath, "rb"),
filename=filename,
caption=f"📄 注册结果 ({success_count} 个账号)"
)
# 删除临时文件
filepath.unlink()
elif output_type == "team":
# 添加到 team.json
try:
team_file = TEAM_JSON_FILE
existing = []
if team_file.exists():
with open(team_file, "r", encoding="utf-8") as f:
existing = json.load(f)
existing.extend(results)
with open(team_file, "w", encoding="utf-8") as f:
json.dump(existing, f, ensure_ascii=False, indent=2)
# 重载配置
reload_config()
await self.app.bot.send_message(
chat_id,
f"✅ 已添加到 team.json\n\n"
f"新增: {success_count} 个账号\n"
f"当前总数: {len(existing)} 个",
parse_mode="HTML"
)
except Exception as e:
await self.app.bot.send_message(
chat_id,
f"❌ 保存到 team.json 失败: {e}"
)
self.current_task = None
self.current_team = None
@admin_only
async def cmd_autogptplus(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""AutoGPTPlus 配置管理 - 交互式菜单"""
keyboard = [
[
InlineKeyboardButton("📋 查看配置", callback_data="autogptplus:config"),
InlineKeyboardButton("🔑 设置 Token", callback_data="autogptplus:set_token"),
],
[
InlineKeyboardButton("📧 域名管理", callback_data="autogptplus:domains"),
InlineKeyboardButton("💳 IBAN 管理", callback_data="autogptplus:ibans"),
],
[
InlineKeyboardButton("🎭 随机指纹", callback_data="autogptplus:fingerprint"),
InlineKeyboardButton("📊 统计信息", callback_data="autogptplus:stats"),
],
[
InlineKeyboardButton("📧 测试邮件", callback_data="autogptplus:test_email"),
InlineKeyboardButton("🔄 测试 API", callback_data="autogptplus:test_api"),
],
[
InlineKeyboardButton("🚀 开始注册", callback_data="autogptplus:register"),
],
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"🤖 AutoGPTPlus 管理面板\n\n"
"ChatGPT 订阅自动化配置管理\n\n"
"请选择功能:",
parse_mode="HTML",
reply_markup=reply_markup
)
def _get_autogptplus_main_keyboard(self):
"""获取 AutoGPTPlus 主菜单键盘"""
return InlineKeyboardMarkup([
[
InlineKeyboardButton("📋 查看配置", callback_data="autogptplus:config"),
InlineKeyboardButton("🔑 设置 Token", callback_data="autogptplus:set_token"),
],
[
InlineKeyboardButton("📧 域名管理", callback_data="autogptplus:domains"),
InlineKeyboardButton("💳 IBAN 管理", callback_data="autogptplus:ibans"),
],
[
InlineKeyboardButton("🎭 随机指纹", callback_data="autogptplus:fingerprint"),
InlineKeyboardButton("📊 统计信息", callback_data="autogptplus:stats"),
],
[
InlineKeyboardButton("📧 测试邮件", callback_data="autogptplus:test_email"),
InlineKeyboardButton("🔄 测试 API", callback_data="autogptplus:test_api"),
],
[
InlineKeyboardButton("🚀 开始注册", callback_data="autogptplus:register"),
],
])
async def callback_autogptplus(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""处理 AutoGPTPlus 回调"""
query = update.callback_query
# 权限检查
user_id = update.effective_user.id
if user_id not in TELEGRAM_ADMIN_CHAT_IDS:
await query.answer("⛔ 无权限", show_alert=True)
return
await query.answer()
data = query.data.split(":")
action = data[1] if len(data) > 1 else ""
sub_action = data[2] if len(data) > 2 else ""
if action == "config":
await self._show_autogptplus_config(query)
elif action == "set_token":
await self._prompt_autogptplus_token(query, context)
elif action == "test_email":
await self._test_autogptplus_email(query)
elif action == "test_api":
await self._test_autogptplus_api(query)
elif action == "domains":
await self._show_autogptplus_domains(query, sub_action)
elif action == "ibans":
await self._show_autogptplus_ibans(query, sub_action)
elif action == "fingerprint":
await self._toggle_autogptplus_fingerprint(query)
elif action == "stats":
await self._show_autogptplus_stats(query)
elif action == "register":
await self._start_autogptplus_register(query, context)
elif action == "back":
# 返回主菜单
await query.edit_message_text(
"🤖 AutoGPTPlus 管理面板\n\n"
"ChatGPT 订阅自动化配置管理\n\n"
"请选择功能:",
parse_mode="HTML",
reply_markup=self._get_autogptplus_main_keyboard()
)
async def _prompt_autogptplus_token(self, query, context: ContextTypes.DEFAULT_TYPE):
"""提示用户输入 Token"""
try:
from auto_gpt_team import MAIL_API_TOKEN
# 脱敏显示当前 Token
current_display = "未配置"
if MAIL_API_TOKEN:
if len(MAIL_API_TOKEN) > 10:
current_display = f"{MAIL_API_TOKEN[:8]}...{MAIL_API_TOKEN[-4:]}"
else:
current_display = MAIL_API_TOKEN[:4] + "..."
keyboard = [[InlineKeyboardButton("❌ 取消", callback_data="autogptplus:back")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(
"🔑 设置 Cloud Mail API Token\n\n"
f"当前 Token: {current_display}\n\n"
"请直接发送新的 Token:\n"
"(发送后将自动保存到 config.toml)",
parse_mode="HTML",
reply_markup=reply_markup
)
# 设置等待输入状态
context.user_data["autogptplus_waiting_token"] = True
except ImportError:
keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(
"❌ 模块未找到\n\n"
"auto_gpt_team 模块未安装或导入失败",
parse_mode="HTML",
reply_markup=reply_markup
)
async def _show_autogptplus_config(self, query):
"""显示 AutoGPTPlus 配置"""
try:
from auto_gpt_team import (
MAIL_API_TOKEN, MAIL_API_BASE, EMAIL_DOMAINS,
SEPA_IBANS, RANDOM_FINGERPRINT, get_email_domains, get_sepa_ibans
)
# 脱敏显示 Token
token_display = "未配置"
if MAIL_API_TOKEN:
if len(MAIL_API_TOKEN) > 10:
token_display = f"{MAIL_API_TOKEN[:8]}...{MAIL_API_TOKEN[-4:]}"
else:
token_display = MAIL_API_TOKEN[:4] + "..."
# 获取域名列表
domains = get_email_domains()
domains_display = ", ".join(domains[:3]) if domains else "未配置"
if len(domains) > 3:
domains_display += f" (+{len(domains) - 3})"
# 获取 IBAN 列表
ibans = get_sepa_ibans()
# 随机指纹状态
fingerprint_status = "✅ 已开启" if RANDOM_FINGERPRINT else "❌ 已关闭"
lines = [
"📋 AutoGPTPlus 配置",
"",
"🔑 Cloud Mail API",
f" Token: {token_display}",
f" 地址: {MAIL_API_BASE or '未配置'}",
"",
"📧 邮箱域名",
f" 数量: {len(domains)} 个",
f" 域名: {domains_display}",
"",
"💳 SEPA IBAN",
f" 数量: {len(ibans)} 个",
"",
"🎭 随机指纹",
f" 状态: {fingerprint_status}",
]
# 配置状态检查
config_ok = bool(MAIL_API_TOKEN and MAIL_API_BASE and domains)
if config_ok:
lines.append("\n✅ 配置完整,可以使用")
else:
lines.append("\n⚠️ 配置不完整:")
if not MAIL_API_TOKEN:
lines.append(" • 缺少 mail_api_token")
if not MAIL_API_BASE:
lines.append(" • 缺少 mail_api_base")
if not domains:
lines.append(" • 缺少 email_domains")
keyboard = [
[
InlineKeyboardButton("🔑 设置 Token", callback_data="autogptplus:set_token"),
InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back"),
]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(
"\n".join(lines),
parse_mode="HTML",
reply_markup=reply_markup
)
except ImportError:
keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(
"❌ 模块未找到\n\n"
"auto_gpt_team 模块未安装或导入失败",
parse_mode="HTML",
reply_markup=reply_markup
)
except Exception as e:
keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(
f"❌ 获取配置失败: {e}",
reply_markup=reply_markup
)
async def _test_autogptplus_email(self, query):
"""测试 AutoGPTPlus 邮件创建"""
await query.edit_message_text("⏳ 正在测试邮件创建...")
try:
from auto_gpt_team import (
MAIL_API_TOKEN, MAIL_API_BASE, get_email_domains
)
import requests
import random
import string
# 检查配置
if not MAIL_API_TOKEN or not MAIL_API_BASE:
keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(
"❌ 配置不完整\n\n"
"请先在 config.toml 中配置:\n"
"• mail_api_token\n"
"• mail_api_base",
parse_mode="HTML",
reply_markup=reply_markup
)
return
domains = get_email_domains()
if not domains:
keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(
"❌ 邮箱域名未配置\n\n"
"请先在 config.toml 中配置 email_domains\n"
"或使用 /domain_add 添加域名",
parse_mode="HTML",
reply_markup=reply_markup
)
return
# 生成测试邮箱
random_str = ''.join(random.choices(string.ascii_lowercase + string.digits, k=8))
domain = random.choice(domains)
test_email = f"test-{random_str}@{domain.lstrip('@')}"
# 测试创建邮箱 (通过查询邮件列表来验证 API 连接)
url = f"{MAIL_API_BASE}/api/public/emailList"
headers = {
"Authorization": MAIL_API_TOKEN,
"Content-Type": "application/json"
}
payload = {
"toEmail": test_email,
"timeSort": "desc",
"size": 1
}
response = requests.post(url, headers=headers, json=payload, timeout=10)
data = response.json()
keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]]
reply_markup = InlineKeyboardMarkup(keyboard)
if data.get("code") == 200:
await query.edit_message_text(
"✅ 邮件 API 测试成功\n\n"
f"测试邮箱: {test_email}\n"
f"API 响应: 正常\n\n"
f"邮件系统已就绪,可以接收验证码",
parse_mode="HTML",
reply_markup=reply_markup
)
else:
error_msg = data.get("message", "未知错误")
await query.edit_message_text(
f"⚠️ API 响应异常\n\n"
f"状态码: {data.get('code')}\n"
f"错误: {error_msg}",
parse_mode="HTML",
reply_markup=reply_markup
)
except ImportError:
keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(
"❌ 模块未找到\n\n"
"auto_gpt_team 模块未安装或导入失败",
parse_mode="HTML",
reply_markup=reply_markup
)
except requests.exceptions.Timeout:
keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(
"❌ 连接超时\n\n"
"无法连接到邮件 API 服务器\n"
"请检查 mail_api_base 配置",
parse_mode="HTML",
reply_markup=reply_markup
)
except Exception as e:
keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(
f"❌ 测试失败\n\n{e}",
parse_mode="HTML",
reply_markup=reply_markup
)
async def _test_autogptplus_api(self, query):
"""测试 AutoGPTPlus API 连接"""
await query.edit_message_text("⏳ 正在测试 API 连接...")
try:
from auto_gpt_team import MAIL_API_TOKEN, MAIL_API_BASE
import requests
# 检查配置
if not MAIL_API_TOKEN or not MAIL_API_BASE:
keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(
"❌ 配置不完整\n\n"
"请先在 config.toml 中配置:\n"
"• mail_api_token\n"
"• mail_api_base",
parse_mode="HTML",
reply_markup=reply_markup
)
return
# 测试 API 连接
url = f"{MAIL_API_BASE}/api/public/emailList"
headers = {
"Authorization": MAIL_API_TOKEN,
"Content-Type": "application/json"
}
payload = {
"toEmail": "test@test.com",
"timeSort": "desc",
"size": 1
}
start_time = time.time()
response = requests.post(url, headers=headers, json=payload, timeout=10)
elapsed = time.time() - start_time
data = response.json()
keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]]
reply_markup = InlineKeyboardMarkup(keyboard)
if response.status_code == 200 and data.get("code") == 200:
await query.edit_message_text(
"✅ API 连接测试成功\n\n"
f"服务器: {MAIL_API_BASE}\n"
f"响应时间: {elapsed*1000:.0f}ms\n"
f"状态: 正常\n\n"
"Cloud Mail API 服务运行正常",
parse_mode="HTML",
reply_markup=reply_markup
)
else:
error_msg = data.get("message", "未知错误")
await query.edit_message_text(
f"⚠️ API 响应异常\n\n"
f"HTTP 状态: {response.status_code}\n"
f"API 状态: {data.get('code')}\n"
f"错误: {error_msg}\n"
f"响应时间: {elapsed*1000:.0f}ms",
parse_mode="HTML",
reply_markup=reply_markup
)
except ImportError:
keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(
"❌ 模块未找到\n\n"
"auto_gpt_team 模块未安装或导入失败",
parse_mode="HTML",
reply_markup=reply_markup
)
except requests.exceptions.ConnectionError:
keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(
"❌ 连接失败\n\n"
"无法连接到邮件 API 服务器\n"
"请检查:\n"
"• mail_api_base 地址是否正确\n"
"• 服务器是否在线\n"
"• 网络连接是否正常",
parse_mode="HTML",
reply_markup=reply_markup
)
except requests.exceptions.Timeout:
keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(
"❌ 连接超时\n\n"
"API 服务器响应超时 (>10s)\n"
"请检查服务器状态",
parse_mode="HTML",
reply_markup=reply_markup
)
except Exception as e:
keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(
f"❌ 测试失败\n\n{e}",
parse_mode="HTML",
reply_markup=reply_markup
)
async def _show_autogptplus_domains(self, query, sub_action: str = ""):
"""显示/管理域名"""
try:
from auto_gpt_team import (
get_email_domains, get_file_domains_count, EMAIL_DOMAINS,
clear_email_domains
)
if sub_action == "clear":
# 清空域名
cleared = clear_email_domains()
config_count = len(EMAIL_DOMAINS) if EMAIL_DOMAINS else 0
keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:domains")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(
f"✅ 域名已清空\n\n"
f"已清空: {cleared} 个\n"
f"保留 (config): {config_count} 个",
parse_mode="HTML",
reply_markup=reply_markup
)
return
# 显示域名列表
domains = get_email_domains()
file_count = get_file_domains_count()
config_count = len(EMAIL_DOMAINS) if EMAIL_DOMAINS else 0
lines = ["📧 邮箱域名管理\n"]
lines.append(f"总计: {len(domains)} 个")
lines.append(f" • txt文件: {file_count} 个")
lines.append(f" • config配置: {config_count} 个\n")
if domains:
lines.append("域名列表:")
for i, domain in enumerate(domains[:15], 1):
lines.append(f" {i}. {domain}")
if len(domains) > 15:
lines.append(f" ... 还有 {len(domains) - 15} 个")
keyboard = [
[
InlineKeyboardButton("🗑️ 清空txt域名", callback_data="autogptplus:domains:clear"),
],
[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")],
]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(
"\n".join(lines),
parse_mode="HTML",
reply_markup=reply_markup
)
except ImportError:
keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(
"❌ 模块未找到",
parse_mode="HTML",
reply_markup=reply_markup
)
except Exception as e:
keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(
f"❌ 操作失败\n\n{e}",
parse_mode="HTML",
reply_markup=reply_markup
)
async def _show_autogptplus_ibans(self, query, sub_action: str = ""):
"""显示/管理 IBAN"""
try:
from auto_gpt_team import get_sepa_ibans, load_ibans_from_file, SEPA_IBANS, clear_sepa_ibans
if sub_action == "clear":
# 清空 IBAN
clear_sepa_ibans()
keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:ibans")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(
"✅ IBAN 列表已清空",
parse_mode="HTML",
reply_markup=reply_markup
)
return
# 显示 IBAN 列表
ibans = get_sepa_ibans()
file_ibans = load_ibans_from_file()
config_count = len(SEPA_IBANS) if SEPA_IBANS else 0
lines = ["💳 IBAN 管理\n"]
lines.append(f"总计: {len(ibans)} 个")
lines.append(f" • txt文件: {len(file_ibans)} 个")
lines.append(f" • config配置: {config_count} 个\n")
if ibans:
lines.append("IBAN 列表:")
for i, iban in enumerate(ibans[:10], 1):
# 脱敏显示
masked = f"{iban[:8]}...{iban[-4:]}" if len(iban) > 12 else iban
lines.append(f" {i}. {masked}")
if len(ibans) > 10:
lines.append(f" ... 还有 {len(ibans) - 10} 个")
keyboard = [
[
InlineKeyboardButton("🗑️ 清空IBAN", callback_data="autogptplus:ibans:clear"),
],
[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")],
]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(
"\n".join(lines),
parse_mode="HTML",
reply_markup=reply_markup
)
except ImportError:
keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(
"❌ 模块未找到",
parse_mode="HTML",
reply_markup=reply_markup
)
except Exception as e:
keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(
f"❌ 操作失败\n\n{e}",
parse_mode="HTML",
reply_markup=reply_markup
)
async def _toggle_autogptplus_fingerprint(self, query):
"""切换随机指纹"""
import tomli_w
try:
# 读取当前配置
with open(CONFIG_FILE, "rb") as f:
import tomllib
config = tomllib.load(f)
# 确保 autogptplus section 存在
if "autogptplus" not in config:
config["autogptplus"] = {}
# 获取当前状态并切换
current = config.get("autogptplus", {}).get("random_fingerprint", True)
new_value = not current
# 更新配置
config["autogptplus"]["random_fingerprint"] = new_value
# 写回文件
with open(CONFIG_FILE, "wb") as f:
tomli_w.dump(config, f)
# 重新加载模块
import importlib
import auto_gpt_team
importlib.reload(auto_gpt_team)
status = "✅ 已开启" if new_value else "❌ 已关闭"
keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(
f"🎭 随机指纹设置\n\n"
f"状态: {status}\n\n"
f"{'每次注册将使用随机浏览器指纹' if new_value else '将使用固定浏览器指纹'}",
parse_mode="HTML",
reply_markup=reply_markup
)
except Exception as e:
keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(
f"❌ 设置失败\n\n{e}",
parse_mode="HTML",
reply_markup=reply_markup
)
async def _show_autogptplus_stats(self, query):
"""显示统计信息"""
try:
from auto_gpt_team import get_email_domains, get_sepa_ibans, RANDOM_FINGERPRINT
import json
from pathlib import Path
# 读取账号文件统计
accounts_file = Path("accounts.json")
accounts_count = 0
if accounts_file.exists():
try:
with open(accounts_file, "r", encoding="utf-8") as f:
accounts = json.load(f)
accounts_count = len(accounts)
except:
pass
domains = get_email_domains()
ibans = get_sepa_ibans()
lines = ["📊 AutoGPTPlus 统计信息\n"]
# 资源统计
lines.append("📦 可用资源:")
lines.append(f" • 邮箱域名: {len(domains)} 个")
lines.append(f" • IBAN: {len(ibans)} 个")
lines.append(f" • 随机指纹: {'开启' if RANDOM_FINGERPRINT else '关闭'}")
lines.append("")
# 账号统计
lines.append("👥 已注册账号:")
lines.append(f" • 总计: {accounts_count} 个")
# 配置状态
lines.append("")
lines.append("⚙️ 配置状态:")
if len(domains) > 0 and len(ibans) > 0:
lines.append(" ✅ 已就绪,可以开始注册")
else:
if len(domains) == 0:
lines.append(" ⚠️ 缺少邮箱域名")
if len(ibans) == 0:
lines.append(" ⚠️ 缺少 IBAN")
keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(
"\n".join(lines),
parse_mode="HTML",
reply_markup=reply_markup
)
except ImportError:
keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(
"❌ 模块未找到",
parse_mode="HTML",
reply_markup=reply_markup
)
except Exception as e:
keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(
f"❌ 获取统计失败\n\n{e}",
parse_mode="HTML",
reply_markup=reply_markup
)
async def _start_autogptplus_register(self, query, context):
"""快速开始注册 (跳转到 team_register)"""
try:
from auto_gpt_team import get_email_domains, get_sepa_ibans, MAIL_API_TOKEN, MAIL_API_BASE
# 检查配置
domains = get_email_domains()
ibans = get_sepa_ibans()
missing = []
if not MAIL_API_TOKEN:
missing.append("mail_api_token")
if not MAIL_API_BASE:
missing.append("mail_api_base")
if not domains:
missing.append("邮箱域名")
if not ibans:
missing.append("IBAN")
if missing:
keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(
"⚠️ 配置不完整\n\n"
"缺少以下配置:\n" +
"\n".join(f" • {m}" for m in missing) +
"\n\n请先完成配置后再开始注册",
parse_mode="HTML",
reply_markup=reply_markup
)
return
# 显示注册选项
keyboard = [
[
InlineKeyboardButton("1️⃣ 注册1个", callback_data="team_reg:1"),
InlineKeyboardButton("3️⃣ 注册3个", callback_data="team_reg:3"),
],
[
InlineKeyboardButton("5️⃣ 注册5个", callback_data="team_reg:5"),
InlineKeyboardButton("🔢 自定义", callback_data="team_reg:custom"),
],
[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")],
]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(
"🚀 开始 ChatGPT Team 注册\n\n"
f"可用资源:\n"
f" • 邮箱域名: {len(domains)} 个\n"
f" • IBAN: {len(ibans)} 个\n\n"
"选择注册数量:",
parse_mode="HTML",
reply_markup=reply_markup
)
except ImportError:
keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(
"❌ 模块未找到",
parse_mode="HTML",
reply_markup=reply_markup
)
except Exception as e:
keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(
f"❌ 操作失败\n\n{e}",
parse_mode="HTML",
reply_markup=reply_markup
)
@admin_only
async def handle_team_custom_count(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""处理文本输入 (GPT Team 自定义数量 / AutoGPTPlus Token)"""
# 处理 AutoGPTPlus Token 输入
if context.user_data.get("autogptplus_waiting_token"):
context.user_data["autogptplus_waiting_token"] = False
await self._handle_autogptplus_token_input(update, context)
return
# 处理 GPT Team 自定义数量输入
if not context.user_data.get("team_waiting_count"):
return # 不在等待状态,忽略消息
# 清除等待状态
context.user_data["team_waiting_count"] = False
text = update.message.text.strip()
# 验证输入
try:
count = int(text)
if count < 1 or count > 50:
await update.message.reply_text(
"❌ 数量必须在 1-50 之间\n\n"
"请重新使用 /team_register 开始"
)
return
except ValueError:
await update.message.reply_text(
"❌ 请输入有效的数字\n\n"
"请重新使用 /team_register 开始"
)
return
# 显示输出方式选择
keyboard = [
[
InlineKeyboardButton("📄 JSON 文件", callback_data=f"team_reg:output:json:{count}"),
],
[
InlineKeyboardButton("📥 添加到 team.json", callback_data=f"team_reg:output:team:{count}"),
],
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
f"⚙️ 配置完成\n\n"
f"注册数量: {count} 个\n\n"
f"请选择输出方式:",
parse_mode="HTML",
reply_markup=reply_markup
)
async def _handle_autogptplus_token_input(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""处理 AutoGPTPlus Token 输入 - 保存后立即测试"""
import tomli_w
import requests
token = update.message.text.strip()
if not token:
await update.message.reply_text("❌ Token 不能为空")
return
# 脱敏显示
if len(token) > 10:
token_display = f"{token[:8]}...{token[-4:]}"
else:
token_display = token[:4] + "..."
# 先发送保存中的消息
status_msg = await update.message.reply_text(
f"⏳ 正在保存并验证 Token...\n\n"
f"Token: {token_display}",
parse_mode="HTML"
)
try:
# 读取当前配置获取 API 地址
with open(CONFIG_FILE, "rb") as f:
import tomllib
config = tomllib.load(f)
mail_api_base = config.get("autogptplus", {}).get("mail_api_base", "")
if not mail_api_base:
keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await status_msg.edit_text(
"❌ 无法验证 Token\n\n"
"mail_api_base 未配置\n"
"请先在 config.toml 中配置 API 地址",
parse_mode="HTML",
reply_markup=reply_markup
)
return
# 测试 Token 是否有效
url = f"{mail_api_base}/api/public/emailList"
headers = {
"Authorization": token,
"Content-Type": "application/json"
}
payload = {
"toEmail": "test@test.com",
"timeSort": "desc",
"size": 1
}
start_time = time.time()
response = requests.post(url, headers=headers, json=payload, timeout=10)
elapsed = time.time() - start_time
data = response.json()
# 检查 Token 是否有效
if response.status_code == 200 and data.get("code") == 200:
# Token 有效,保存到配置文件
if "autogptplus" not in config:
config["autogptplus"] = {}
config["autogptplus"]["mail_api_token"] = token
with open(CONFIG_FILE, "wb") as f:
tomli_w.dump(config, f)
keyboard = [
[
InlineKeyboardButton("📋 查看配置", callback_data="autogptplus:config"),
InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back"),
],
]
reply_markup = InlineKeyboardMarkup(keyboard)
await status_msg.edit_text(
f"✅ Token 验证成功并已保存\n\n"
f"Token: {token_display}\n"
f"响应时间: {elapsed*1000:.0f}ms\n\n"
f"💡 使用 /reload 重载配置使其生效",
parse_mode="HTML",
reply_markup=reply_markup
)
else:
# Token 无效
error_msg = data.get("message", "未知错误")
keyboard = [
[
InlineKeyboardButton("🔑 重新设置", callback_data="autogptplus:set_token"),
InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back"),
],
]
reply_markup = InlineKeyboardMarkup(keyboard)
await status_msg.edit_text(
f"❌ Token 验证失败\n\n"
f"Token: {token_display}\n"
f"错误: {error_msg}\n\n"
f"Token 未保存,请检查后重试",
parse_mode="HTML",
reply_markup=reply_markup
)
except requests.exceptions.ConnectionError:
keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await status_msg.edit_text(
"❌ 连接失败\n\n"
"无法连接到 API 服务器\n"
"请检查 mail_api_base 配置",
parse_mode="HTML",
reply_markup=reply_markup
)
except requests.exceptions.Timeout:
keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await status_msg.edit_text(
"❌ 连接超时\n\n"
"API 服务器响应超时",
parse_mode="HTML",
reply_markup=reply_markup
)
except ImportError:
await status_msg.edit_text(
"❌ 缺少 tomli_w 依赖\n"
"请运行: uv add tomli_w"
)
except Exception as e:
keyboard = [[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await status_msg.edit_text(
f"❌ 验证失败\n\n{e}",
parse_mode="HTML",
reply_markup=reply_markup
)
async def main():
"""主函数"""
if not TELEGRAM_BOT_TOKEN:
print("Telegram Bot Token 未配置,请在 config.toml 中设置 telegram.bot_token")
sys.exit(1)
if not TELEGRAM_ADMIN_CHAT_IDS:
print("管理员 Chat ID 未配置,请在 config.toml 中设置 telegram.admin_chat_ids")
sys.exit(1)
bot = ProvisionerBot()
# 处理 Ctrl+C
import signal
def signal_handler(sig, frame):
log.info("正在关闭...")
bot.request_shutdown()
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
await bot.start()
if __name__ == "__main__":
asyncio.run(main())