Files
codexTool/telegram_bot.py
kyx236 b902922d22 feat(config, email_service, telegram_bot): Add dynamic GPTMail API key management and config reload capability
- Add reload_config() function to dynamically reload config.toml and team.json without restart
- Implement GPTMail API key management with support for multiple keys (api_keys list)
- Add functions to manage GPTMail keys: get_gptmail_keys(), add_gptmail_key(), remove_gptmail_key()
- Add key rotation strategies: get_next_gptmail_key() (round-robin) and get_random_gptmail_key()
- Add gptmail_keys.json file support for dynamic key management
- Fix config parsing to handle include_team_owners and proxy settings in multiple locations
- Update email_service.py to use key rotation for GPTMail API calls
- Update telegram_bot.py to support dynamic key management
- Add install_service.sh script for service installation
- Update config.toml.example with new api_keys configuration option
- Improve configuration flexibility by supporting both old (api_key) and new (api_keys) formats
2026-01-17 05:52:05 +08:00

1324 lines
47 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# ==================== Telegram Bot 主程序 ====================
# 通过 Telegram 远程控制 OpenAI Team 批量注册任务
import asyncio
import sys
from concurrent.futures import ThreadPoolExecutor
from functools import wraps
from typing import Optional
from telegram import Update, Bot, BotCommand
from telegram.ext import (
Application,
CommandHandler,
MessageHandler,
filters,
ContextTypes,
)
from config import (
TELEGRAM_BOT_TOKEN,
TELEGRAM_ADMIN_CHAT_IDS,
TELEGRAM_ENABLED,
TEAMS,
AUTH_PROVIDER,
TEAM_JSON_FILE,
TELEGRAM_CHECK_INTERVAL,
TELEGRAM_LOW_STOCK_THRESHOLD,
CONFIG_FILE,
EMAIL_PROVIDER,
BROWSER_HEADLESS,
ACCOUNTS_PER_TEAM,
PROXY_ENABLED,
PROXIES,
S2A_API_BASE,
CPA_API_BASE,
CRS_API_BASE,
get_gptmail_keys,
add_gptmail_key,
remove_gptmail_key,
GPTMAIL_API_KEYS,
INCLUDE_TEAM_OWNERS,
reload_config,
S2A_CONCURRENCY,
S2A_PRIORITY,
S2A_GROUP_NAMES,
S2A_GROUP_IDS,
S2A_ADMIN_KEY,
)
from utils import load_team_tracker
from bot_notifier import BotNotifier, set_notifier, progress_finish
from s2a_service import s2a_get_dashboard_stats, format_dashboard_stats
from email_service import gptmail_service, unified_create_email
from logger import log
def admin_only(func):
    """Decorator: only let configured administrators run a command handler.

    The wrapped handler must be a bound-method coroutine with the signature
    ``(self, update, context)``. The sender's Telegram user id is checked
    against ``TELEGRAM_ADMIN_CHAT_IDS``; anyone else receives a denial reply
    and the handler is not invoked.

    Updates that carry no user (e.g. channel posts) are treated as
    unauthorized instead of raising ``AttributeError``.
    """
    @wraps(func)
    async def wrapper(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        user = update.effective_user
        if user is None or user.id not in TELEGRAM_ADMIN_CHAT_IDS:
            # effective_message also covers edited messages, where
            # update.message is None.
            message = update.effective_message
            if message is not None:
                await message.reply_text("⛔ 无权限,你的 ID 不在管理员列表中")
            return
        return await func(self, update, context)
    return wrapper
class ProvisionerBot:
    """Telegram bot that remotely controls the OpenAI Team provisioning tasks.

    Command handlers are registered in ``start()``. Long-running provisioning
    jobs are executed in a single-worker thread pool so that at most one job
    runs at a time; ``current_task`` / ``current_team`` track that job.
    """

    def __init__(self):
        """Initialise bot state; the Telegram application is built in ``start()``."""
        # One worker only: provisioning jobs are serialized.
        self.executor = ThreadPoolExecutor(max_workers=1)
        self.current_task: Optional[asyncio.Task] = None
        self.current_team: Optional[str] = None
        self.app: Optional[Application] = None
        self.notifier: Optional[BotNotifier] = None
        self._shutdown_event = asyncio.Event()

    async def start(self):
        """Build the application, register handlers and poll until shutdown.

        Returns immediately (after logging an error) when no bot token is
        configured. Otherwise blocks until ``request_shutdown()`` is called,
        then stops the updater, the application and the notifier.
        """
        if not TELEGRAM_BOT_TOKEN:
            log.error("Telegram Bot Token not configured")
            return

        # Create the Application
        self.app = Application.builder().token(TELEGRAM_BOT_TOKEN).build()

        # Initialise the notifier
        self.notifier = BotNotifier(self.app.bot, TELEGRAM_ADMIN_CHAT_IDS)
        set_notifier(self.notifier)

        # Register command handlers
        handlers = [
            ("start", self.cmd_help),
            ("help", self.cmd_help),
            ("status", self.cmd_status),
            ("team", self.cmd_team),
            ("list", self.cmd_list),
            ("config", self.cmd_config),
            ("headless", self.cmd_headless),
            ("run", self.cmd_run),
            ("run_all", self.cmd_run_all),
            ("stop", self.cmd_stop),
            ("logs", self.cmd_logs),
            ("dashboard", self.cmd_dashboard),
            ("import", self.cmd_import),
            ("stock", self.cmd_stock),
            ("gptmail_keys", self.cmd_gptmail_keys),
            ("gptmail_add", self.cmd_gptmail_add),
            ("gptmail_del", self.cmd_gptmail_del),
            ("test_email", self.cmd_test_email),
            ("include_owners", self.cmd_include_owners),
            ("reload", self.cmd_reload),
            ("s2a_config", self.cmd_s2a_config),
        ]
        for cmd, handler in handlers:
            self.app.add_handler(CommandHandler(cmd, handler))

        # Register the file-upload handler (JSON documents)
        self.app.add_handler(MessageHandler(
            filters.Document.MimeType("application/json"),
            self.handle_json_file
        ))

        # Register the periodic stock check
        if TELEGRAM_CHECK_INTERVAL > 0 and AUTH_PROVIDER == "s2a":
            job_queue = self.app.job_queue
            if job_queue is None:
                # python-telegram-bot exposes job_queue as None when the
                # optional job-queue dependency is not installed.
                log.warning("JobQueue not available, scheduled stock check disabled")
            else:
                job_queue.run_repeating(
                    self.scheduled_stock_check,
                    interval=TELEGRAM_CHECK_INTERVAL,
                    first=60,  # first run one minute after startup
                    name="stock_check"
                )
                log.info(f"Stock check scheduled every {TELEGRAM_CHECK_INTERVAL}s")

        # Start the notifier
        await self.notifier.start()
        log.success("Telegram Bot started")
        log.info(f"Admin Chat IDs: {TELEGRAM_ADMIN_CHAT_IDS}")

        # Send the startup notification
        await self.notifier.notify("<b>🤖 Bot 已启动</b>\n准备就绪,发送 /help 查看帮助")

        # Run the bot
        await self.app.initialize()
        await self.app.start()
        # Publish the command menu hints
        await self._set_commands()
        await self.app.updater.start_polling(drop_pending_updates=True)

        # Wait for the shutdown signal
        await self._shutdown_event.wait()

        # Clean up
        await self.app.updater.stop()
        await self.app.stop()
        await self.app.shutdown()
        await self.notifier.stop()

    def request_shutdown(self):
        """Ask ``start()`` to stop polling and shut the bot down."""
        self._shutdown_event.set()

    async def _set_commands(self):
        """Publish the bot's command menu; failures are logged, not raised."""
        commands = [
            BotCommand("help", "查看帮助信息"),
            BotCommand("list", "查看 team.json 账号列表"),
            BotCommand("status", "查看任务处理状态"),
            BotCommand("team", "查看指定 Team 详情"),
            BotCommand("config", "查看系统配置"),
            BotCommand("run", "处理指定 Team"),
            BotCommand("run_all", "处理所有 Team"),
            BotCommand("stop", "停止当前任务"),
            BotCommand("logs", "查看最近日志"),
            BotCommand("headless", "切换无头模式"),
            BotCommand("dashboard", "查看 S2A 仪表盘"),
            BotCommand("stock", "查看账号库存"),
            BotCommand("import", "导入账号到 team.json"),
            BotCommand("gptmail_keys", "查看 GPTMail API Keys"),
            BotCommand("gptmail_add", "添加 GPTMail API Key"),
            BotCommand("gptmail_del", "删除 GPTMail API Key"),
            BotCommand("test_email", "测试邮箱创建功能"),
            BotCommand("include_owners", "切换 Owner 入库开关"),
            BotCommand("reload", "重载配置文件"),
            BotCommand("s2a_config", "配置 S2A 服务参数"),
        ]
        try:
            await self.app.bot.set_my_commands(commands)
            log.info("Bot 命令菜单已设置")
        except Exception as e:
            log.warning(f"设置命令菜单失败: {e}")

    @admin_only
    async def cmd_help(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Reply with the command overview."""
        help_text = """<b>🤖 OpenAI Team 批量注册 Bot</b>
<b>📋 查看信息:</b>
/list - 查看 team.json 账号列表
/status - 查看任务处理状态
/team &lt;n&gt; - 查看第 n 个 Team 处理详情
/config - 查看系统配置
/logs [n] - 查看最近 n 条日志
<b>🚀 任务控制:</b>
/run &lt;n&gt; - 开始处理第 n 个 Team
/run_all - 开始处理所有 Team
/stop - 停止当前任务
<b>⚙️ 配置管理:</b>
/headless - 开启/关闭无头模式
/include_owners - 开启/关闭 Owner 入库
/reload - 重载配置文件 (无需重启)
<b>📊 S2A 专属:</b>
/dashboard - 查看 S2A 仪表盘
/stock - 查看账号库存
/s2a_config - 配置 S2A 参数
<b>📤 导入账号:</b>
/import - 导入账号到 team.json
或直接发送 JSON 文件
<b>📧 GPTMail 管理:</b>
/gptmail_keys - 查看所有 API Keys
/gptmail_add &lt;key&gt; - 添加 API Key
/gptmail_del &lt;key&gt; - 删除 API Key
/test_email - 测试邮箱创建
<b>💡 示例:</b>
<code>/list</code> - 查看所有待处理账号
<code>/run 0</code> - 处理第一个 Team
<code>/gptmail_add my-api-key</code> - 添加 Key"""
        await update.message.reply_text(help_text, parse_mode="HTML")

    @admin_only
    async def cmd_status(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Reply with per-team completed/failed/pending counts from the tracker."""
        tracker = load_team_tracker()
        teams_data = tracker.get("teams", {})
        if not teams_data:
            await update.message.reply_text("📭 暂无数据,请先运行任务")
            return

        lines = ["<b>📊 Team 状态总览</b>\n"]
        for team_name, accounts in teams_data.items():
            total = len(accounts)
            completed = sum(1 for a in accounts if a.get("status") == "completed")
            failed = sum(1 for a in accounts if "fail" in a.get("status", "").lower())
            pending = total - completed - failed
            status_icon = "" if completed == total else ("" if failed > 0 else "")
            lines.append(
                f"{status_icon} <b>{team_name}</b>: {completed}/{total} "
                f"(失败:{failed} 待处理:{pending})"
            )

        # Currently running task, if any
        if self.current_task and not self.current_task.done():
            lines.append(f"\n<b>🔄 运行中:</b> {self.current_team or '未知'}")

        await update.message.reply_text("\n".join(lines), parse_mode="HTML")

    @admin_only
    async def cmd_team(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Reply with the processed accounts of the team at index ``context.args[0]``."""
        if not context.args:
            await update.message.reply_text("用法: /team <序号>\n示例: /team 0")
            return
        try:
            team_idx = int(context.args[0])
        except ValueError:
            await update.message.reply_text("❌ 无效的序号,必须是数字")
            return
        if team_idx < 0 or team_idx >= len(TEAMS):
            await update.message.reply_text(f"❌ 序号超出范围,有效范围: 0-{len(TEAMS)-1}")
            return

        team = TEAMS[team_idx]
        team_name = team.get("name", f"Team{team_idx}")
        tracker = load_team_tracker()
        accounts = tracker.get("teams", {}).get(team_name, [])

        lines = [f"<b>📁 Team {team_idx}: {team_name}</b>\n"]
        lines.append(f"👤 Owner: {team.get('owner_email', '')}")
        lines.append(f"📊 账号数: {len(accounts)}\n")

        if accounts:
            known_icons = {"completed": "", "authorized": "🔐", "registered": "📝"}
            for acc in accounts:
                email = acc.get("email", "")
                status = acc.get("status", "unknown")
                role = acc.get("role", "member")
                fallback_icon = "" if "fail" in status.lower() else ""
                icon = known_icons.get(status, fallback_icon)
                role_tag = " [Owner]" if role == "owner" else ""
                lines.append(f"{icon} {email}{role_tag}")
        else:
            lines.append("📭 暂无已处理的账号")

        await update.message.reply_text("\n".join(lines), parse_mode="HTML")

    @admin_only
    async def cmd_list(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Reply with the accounts loaded from team.json plus token/authorization totals."""
        if not TEAMS:
            await update.message.reply_text("📭 team.json 中没有账号")
            return

        lines = [f"<b>📋 team.json 账号列表 (共 {len(TEAMS)} 个)</b>\n"]
        for i, team in enumerate(TEAMS):
            email = team.get("owner_email", "")
            has_token = "🔑" if team.get("auth_token") else "🔒"
            auth_mark = "" if team.get("authorized") else ""
            needs_login = " [需登录]" if team.get("needs_login") else ""
            lines.append(f"{i}. {has_token} {email}{auth_mark}{needs_login}")

        # Totals
        with_token = sum(1 for t in TEAMS if t.get("auth_token"))
        authorized_count = sum(1 for t in TEAMS if t.get("authorized"))
        lines.append(f"\n<b>📊 统计:</b>")
        lines.append(f"有 Token: {with_token}/{len(TEAMS)}")
        lines.append(f"已授权: {authorized_count}/{len(TEAMS)}")

        # Send in chunks of 30 lines when the message would be too long
        text = "\n".join(lines)
        if len(text) > 4000:
            for start in range(0, len(lines), 30):
                chunk = "\n".join(lines[start:start + 30])
                await update.message.reply_text(chunk, parse_mode="HTML")
        else:
            await update.message.reply_text(text, parse_mode="HTML")

    @admin_only
    async def cmd_config(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Reply with a summary of the configuration values imported at module load."""
        # Address of the active authorization service
        if AUTH_PROVIDER == "s2a":
            auth_url = S2A_API_BASE or "未配置"
        elif AUTH_PROVIDER == "cpa":
            auth_url = CPA_API_BASE or "未配置"
        else:
            auth_url = CRS_API_BASE or "未配置"

        # Proxy summary
        if PROXY_ENABLED and PROXIES:
            proxy_info = f"已启用 ({len(PROXIES)} 个)"
        else:
            proxy_info = "未启用"

        headless_status = "✅ 已开启" if BROWSER_HEADLESS else "❌ 未开启"
        include_owners_status = "✅ 已开启" if INCLUDE_TEAM_OWNERS else "❌ 未开启"

        lines = [
            "<b>⚙️ 系统配置</b>",
            "",
            "<b>📧 邮箱服务</b>",
            f" 提供商: {EMAIL_PROVIDER}",
            "",
            "<b>🔐 授权服务</b>",
            f" 模式: {AUTH_PROVIDER.upper()}",
            f" 地址: {auth_url}",
            f" Owner 入库: {include_owners_status}",
            "",
            "<b>🌐 浏览器</b>",
            f" 无头模式: {headless_status}",
            "",
            "<b>👥 账号设置</b>",
            f" 每 Team 账号数: {ACCOUNTS_PER_TEAM}",
            f" team.json 账号: {len(TEAMS)}",
            "",
            "<b>🔗 代理</b>",
            f" 状态: {proxy_info}",
            "",
            "<b>💡 提示:</b>",
            "/headless - 切换无头模式",
            "/include_owners - 切换 Owner 入库",
        ]
        await update.message.reply_text("\n".join(lines), parse_mode="HTML")

    @admin_only
    async def cmd_headless(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Flip ``browser.headless`` in the config file and report the new state."""
        try:
            # Imported inside the try so a missing dependency reaches the
            # ImportError handler below instead of escaping the handler.
            import tomllib
            import tomli_w

            # Read the current configuration
            with open(CONFIG_FILE, "rb") as f:
                config = tomllib.load(f)

            # Toggle the current value
            new_value = not config.get("browser", {}).get("headless", False)
            config.setdefault("browser", {})["headless"] = new_value

            # Write the file back
            with open(CONFIG_FILE, "wb") as f:
                tomli_w.dump(config, f)

            status = "✅ 已开启" if new_value else "❌ 已关闭"
            await update.message.reply_text(
                f"<b>🌐 无头模式</b>\n\n"
                f"状态: {status}\n\n"
                f"💡 使用 /reload 立即生效",
                parse_mode="HTML"
            )
        except ImportError:
            await update.message.reply_text(
                "❌ 缺少 tomli_w 依赖\n"
                "请运行: uv add tomli_w"
            )
        except Exception as e:
            await update.message.reply_text(f"❌ 修改配置失败: {e}")

    @admin_only
    async def cmd_include_owners(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Flip the top-level ``include_team_owners`` flag in the config file."""
        try:
            # Imported inside the try so a missing dependency reaches the
            # ImportError handler below instead of escaping the handler.
            import tomllib
            import tomli_w

            # Read the current configuration
            with open(CONFIG_FILE, "rb") as f:
                config = tomllib.load(f)

            # Toggle the top-level value
            new_value = not config.get("include_team_owners", False)
            config["include_team_owners"] = new_value

            # Write the file back
            with open(CONFIG_FILE, "wb") as f:
                tomli_w.dump(config, f)

            status = "✅ 已开启" if new_value else "❌ 已关闭"
            desc = "运行任务时会将 Team Owner 账号也入库到授权服务" if new_value else "运行任务时不会入库 Team Owner 账号"
            await update.message.reply_text(
                f"<b>👤 Owner 入库开关</b>\n\n"
                f"状态: {status}\n"
                f"{desc}\n\n"
                f"💡 使用 /reload 立即生效",
                parse_mode="HTML"
            )
        except ImportError:
            await update.message.reply_text(
                "❌ 缺少 tomli_w 依赖\n"
                "请运行: uv add tomli_w"
            )
        except Exception as e:
            await update.message.reply_text(f"❌ 修改配置失败: {e}")

    @admin_only
    async def cmd_reload(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Call ``reload_config()`` and report the result; refused while a task runs."""
        # Refuse while a task is running
        if self.current_task and not self.current_task.done():
            await update.message.reply_text(
                f"⚠️ 有任务正在运行: {self.current_team}\n"
                "请等待任务完成或使用 /stop 停止后再重载配置"
            )
            return

        await update.message.reply_text("⏳ 正在重载配置...")
        try:
            result = reload_config()
            if not result["success"]:
                await update.message.reply_text(
                    f"<b>❌ 配置重载失败</b>\n\n{result['message']}",
                    parse_mode="HTML"
                )
                return

            # Re-import to read the values after the reload
            from config import (
                EMAIL_PROVIDER as new_email_provider,
                AUTH_PROVIDER as new_auth_provider,
                INCLUDE_TEAM_OWNERS as new_include_owners,
                BROWSER_HEADLESS as new_headless,
                ACCOUNTS_PER_TEAM as new_accounts_per_team,
            )
            lines = [
                "<b>✅ 配置重载成功</b>",
                "",
                f"📁 team.json: {result['teams_count']} 个账号",
                f"📄 config.toml: {'已更新' if result['config_updated'] else '未变化'}",
                "",
                "<b>当前配置:</b>",
                f" 邮箱服务: {new_email_provider}",
                f" 授权服务: {new_auth_provider}",
                f" Owner 入库: {'' if new_include_owners else ''}",
                f" 无头模式: {'' if new_headless else ''}",
                f" 每 Team 账号: {new_accounts_per_team}",
            ]
            await update.message.reply_text("\n".join(lines), parse_mode="HTML")
        except Exception as e:
            await update.message.reply_text(f"❌ 重载配置失败: {e}")

    @admin_only
    async def cmd_s2a_config(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Show the S2A settings, or update one ``[s2a]`` key in the config file.

        Without arguments the current values are displayed (admin key masked).
        With ``<param> <value>`` the matching key is validated and written
        back to the config file.
        """
        # No arguments: display the current configuration
        if not context.args:
            # Mask the API key
            key_display = "未配置"
            if S2A_ADMIN_KEY:
                if len(S2A_ADMIN_KEY) > 10:
                    key_display = f"{S2A_ADMIN_KEY[:4]}...{S2A_ADMIN_KEY[-4:]}"
                else:
                    key_display = S2A_ADMIN_KEY[:4] + "..."
            groups_display = ", ".join(S2A_GROUP_NAMES) if S2A_GROUP_NAMES else "默认分组"
            group_ids_display = ", ".join(S2A_GROUP_IDS) if S2A_GROUP_IDS else ""
            lines = [
                "<b>📊 S2A 服务配置</b>",
                "",
                f"<b>API 地址:</b> {S2A_API_BASE or '未配置'}",
                f"<b>Admin Key:</b> <code>{key_display}</code>",
                f"<b>并发数:</b> {S2A_CONCURRENCY}",
                f"<b>优先级:</b> {S2A_PRIORITY}",
                f"<b>分组名称:</b> {groups_display}",
                f"<b>分组 ID:</b> {group_ids_display}",
                "",
                "<b>💡 修改配置:</b>",
                "<code>/s2a_config concurrency 10</code>",
                "<code>/s2a_config priority 50</code>",
                "<code>/s2a_config groups 分组1,分组2</code>",
                "<code>/s2a_config api_base https://...</code>",
                "<code>/s2a_config admin_key sk-xxx</code>",
            ]
            await update.message.reply_text("\n".join(lines), parse_mode="HTML")
            return

        # Parse arguments
        param = context.args[0].lower()
        value = " ".join(context.args[1:]) if len(context.args) > 1 else None
        if not value:
            await update.message.reply_text(
                f"❌ 缺少值\n用法: /s2a_config {param} <值>"
            )
            return

        # Integer parameters: config key -> (display label, minimum); maximum is 100
        int_params = {"concurrency": ("并发数", 1), "priority": ("优先级", 0)}

        try:
            # Imported inside the try so a missing dependency reaches the
            # ImportError handler below instead of escaping the handler.
            import tomllib
            import tomli_w

            # Read the current configuration
            with open(CONFIG_FILE, "rb") as f:
                config = tomllib.load(f)
            # Make sure the s2a section exists
            s2a_section = config.setdefault("s2a", {})

            if param in int_params:
                label, minimum = int_params[param]
                try:
                    new_val = int(value)
                except ValueError:
                    await update.message.reply_text(f"❌ {label}必须是数字")
                    return
                if new_val < minimum or new_val > 100:
                    await update.message.reply_text(f"❌ {label}范围: {minimum}-100")
                    return
                s2a_section[param] = new_val
                updated_key = label
                updated_value = str(new_val)
            elif param in ("groups", "group_names"):
                # Comma-separated group names
                groups = [g.strip() for g in value.split(",") if g.strip()]
                s2a_section["group_names"] = groups
                updated_key = "分组名称"
                updated_value = ", ".join(groups) if groups else "默认分组"
            elif param == "group_ids":
                # Comma-separated group ids
                ids = [i.strip() for i in value.split(",") if i.strip()]
                s2a_section["group_ids"] = ids
                updated_key = "分组 ID"
                updated_value = ", ".join(ids) if ids else ""
            elif param == "api_base":
                s2a_section["api_base"] = value
                updated_key = "API 地址"
                updated_value = value
            elif param == "admin_key":
                s2a_section["admin_key"] = value
                updated_key = "Admin Key"
                # Mask the key in the confirmation
                if len(value) > 10:
                    updated_value = f"{value[:4]}...{value[-4:]}"
                else:
                    updated_value = value[:4] + "..."
            else:
                await update.message.reply_text(
                    f"❌ 未知参数: {param}\n\n"
                    "可用参数:\n"
                    "• concurrency - 并发数\n"
                    "• priority - 优先级\n"
                    "• groups - 分组名称\n"
                    "• group_ids - 分组 ID\n"
                    "• api_base - API 地址\n"
                    "• admin_key - Admin Key"
                )
                return

            # Write the file back
            with open(CONFIG_FILE, "wb") as f:
                tomli_w.dump(config, f)

            await update.message.reply_text(
                f"<b>✅ S2A 配置已更新</b>\n\n"
                f"{updated_key}: <code>{updated_value}</code>\n\n"
                f"💡 使用 /reload 立即生效",
                parse_mode="HTML"
            )
        except ImportError:
            await update.message.reply_text(
                "❌ 缺少 tomli_w 依赖\n"
                "请运行: uv add tomli_w"
            )
        except Exception as e:
            await update.message.reply_text(f"❌ 修改配置失败: {e}")

    def _start_background_task(self, label: str, func, *args):
        """Run ``func(*args)`` in the worker thread and track it as the current task.

        The executor future is wrapped by ``_wrap_task`` so completion or
        failure is reported through the notifier.
        """
        loop = asyncio.get_running_loop()
        worker_future = loop.run_in_executor(self.executor, func, *args)
        self.current_task = asyncio.ensure_future(self._wrap_task(worker_future, label))

    @admin_only
    async def cmd_run(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Start processing the team at index ``context.args[0]`` in the background."""
        if self.current_task and not self.current_task.done():
            await update.message.reply_text(
                f"⚠️ 任务正在运行: {self.current_team}\n使用 /stop 停止"
            )
            return
        if not context.args:
            await update.message.reply_text("用法: /run <序号>\n示例: /run 0")
            return
        try:
            team_idx = int(context.args[0])
        except ValueError:
            await update.message.reply_text("❌ 无效的序号,必须是数字")
            return
        if team_idx < 0 or team_idx >= len(TEAMS):
            await update.message.reply_text(f"❌ 序号超出范围,有效范围: 0-{len(TEAMS)-1}")
            return

        team_name = TEAMS[team_idx].get("name", f"Team{team_idx}")
        self.current_team = team_name
        await update.message.reply_text(f"🚀 开始处理 Team {team_idx}: {team_name}...")

        # Run the job in the background thread with a completion callback
        self._start_background_task(team_name, self._run_team_task, team_idx)

    @admin_only
    async def cmd_run_all(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Start processing every team in the background."""
        if self.current_task and not self.current_task.done():
            await update.message.reply_text(
                f"⚠️ 任务正在运行: {self.current_team}\n使用 /stop 停止"
            )
            return

        self.current_team = "全部"
        await update.message.reply_text(f"🚀 开始处理所有 Team (共 {len(TEAMS)} 个)...")
        self._start_background_task("全部", self._run_all_teams_task)

    async def _wrap_task(self, task, team_name: str):
        """Await ``task`` and send a completion or error notification.

        The result is expected to be an iterable of dicts with a ``status``
        key (or a falsy value); entries with status ``"completed"`` count as
        successes. Task state and progress tracking are always reset.
        """
        try:
            results = (await task) or []
            success = sum(1 for r in results if r.get("status") == "completed")
            failed = len(results) - success
            await self.notifier.notify_task_completed(team_name, success, failed)
        except Exception as e:
            await self.notifier.notify_error(f"任务失败: {team_name}", str(e))
        finally:
            self.current_team = None
            # Clear progress tracking
            progress_finish()

    def _run_team_task(self, team_idx: int):
        """Run a single team job (executed in the thread pool)."""
        # Imported lazily to avoid a circular import
        from run import run_single_team
        return run_single_team(team_idx)

    def _run_all_teams_task(self):
        """Run the job for all teams (executed in the thread pool)."""
        from run import run_all_teams
        return run_all_teams()

    @admin_only
    async def cmd_stop(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Force-stop the current task and clean up browser processes."""
        if not self.current_task or self.current_task.done():
            await update.message.reply_text("📭 当前没有运行中的任务")
            return

        task_name = self.current_team or "未知任务"
        await update.message.reply_text(f"🛑 正在强制停止: {task_name}...")
        try:
            # 1. Set the global stop flag
            try:
                import run
                run._shutdown_requested = True
            except Exception as e:
                log.warning(f"Failed to set shutdown flag: {e}")

            # 2. Cancel the asyncio task
            if self.current_task and not self.current_task.done():
                self.current_task.cancel()

            # 3. Force-close browser processes
            try:
                from browser_automation import cleanup_chrome_processes
                cleanup_chrome_processes()
            except Exception as e:
                log.warning(f"清理浏览器进程失败: {e}")

            # 4. Reset state
            self.current_team = None

            # 5. Reset the stop flag so the next task can run normally.
            # NOTE(review): the flag is cleared in the same call that set it,
            # without waiting for the worker thread; cancelling the asyncio
            # wrapper does not interrupt the thread itself. Confirm that
            # run.py observes the flag in time.
            try:
                import run
                run._shutdown_requested = False
            except Exception as e:
                log.warning(f"Failed to reset shutdown flag: {e}")

            # Clear progress tracking
            progress_finish()

            await update.message.reply_text(
                f"<b>✅ 任务已强制停止</b>\n\n"
                f"已停止: {task_name}\n"
                f"已清理浏览器进程\n\n"
                f"使用 /status 查看状态",
                parse_mode="HTML"
            )
        except Exception as e:
            await update.message.reply_text(f"❌ 停止任务时出错: {e}")

    @admin_only
    async def cmd_logs(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Reply with the last ``n`` lines of logs/app.log (default 10, clamped to 1..50)."""
        import html
        import re
        from collections import deque

        try:
            n = int(context.args[0]) if context.args else 10
        except ValueError:
            n = 10
        # Clamp both ends: a zero or negative count would otherwise select
        # (almost) the whole file.
        n = max(1, min(n, 50))

        try:
            from config import BASE_DIR
            log_file = BASE_DIR / "logs" / "app.log"
            if not log_file.exists():
                await update.message.reply_text("📭 日志文件不存在")
                return

            # Keep only the tail while streaming instead of loading every line
            with open(log_file, "r", encoding="utf-8", errors="ignore") as f:
                recent = deque(f, maxlen=n)
            if not recent:
                await update.message.reply_text("📭 日志文件为空")
                return

            # Strip ANSI colour codes
            ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
            clean_lines = [ansi_escape.sub('', line.strip()) for line in recent]
            # Escape so that '<', '>' and '&' in log lines cannot break the
            # HTML parse mode used for the reply.
            log_text = html.escape("\n".join(clean_lines), quote=False)
            if len(log_text) > 4000:
                log_text = log_text[-4000:]
            await update.message.reply_text(f"<code>{log_text}</code>", parse_mode="HTML")
        except Exception as e:
            await update.message.reply_text(f"❌ 读取日志失败: {e}")

    @admin_only
    async def cmd_dashboard(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Reply with the S2A dashboard statistics (S2A mode only)."""
        if AUTH_PROVIDER != "s2a":
            await update.message.reply_text(
                f"⚠️ 仪表盘仅支持 S2A 模式\n"
                f"当前模式: {AUTH_PROVIDER}"
            )
            return

        await update.message.reply_text("⏳ 正在获取仪表盘数据...")
        try:
            stats = s2a_get_dashboard_stats()
            if not stats:
                await update.message.reply_text(
                    "❌ 获取仪表盘数据失败\n"
                    "请检查 S2A 配置和 API 连接"
                )
                return
            await update.message.reply_text(format_dashboard_stats(stats), parse_mode="HTML")
        except Exception as e:
            await update.message.reply_text(f"❌ 错误: {e}")

    @admin_only
    async def cmd_stock(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Reply with the current account stock (S2A mode only)."""
        if AUTH_PROVIDER != "s2a":
            await update.message.reply_text(
                f"⚠️ 库存查询仅支持 S2A 模式\n"
                f"当前模式: {AUTH_PROVIDER}"
            )
            return

        stats = s2a_get_dashboard_stats()
        if not stats:
            await update.message.reply_text("❌ 获取库存信息失败")
            return
        await update.message.reply_text(self._format_stock_message(stats), parse_mode="HTML")

    async def scheduled_stock_check(self, context: ContextTypes.DEFAULT_TYPE):
        """Periodic job: alert every admin chat when normal stock is at or below the threshold."""
        try:
            stats = s2a_get_dashboard_stats()
            if not stats:
                return
            # Only notify on low stock
            if stats.get("normal_accounts", 0) > TELEGRAM_LOW_STOCK_THRESHOLD:
                return

            text = self._format_stock_message(stats, is_alert=True)
            for chat_id in TELEGRAM_ADMIN_CHAT_IDS:
                try:
                    await context.bot.send_message(
                        chat_id=chat_id,
                        text=text,
                        parse_mode="HTML"
                    )
                except Exception as e:
                    # Best effort: one unreachable chat must not block the others
                    log.warning(f"Failed to send stock alert to {chat_id}: {e}")
        except Exception as e:
            log.warning(f"Stock check failed: {e}")

    def _format_stock_message(self, stats: dict, is_alert: bool = False) -> str:
        """Format the stock statistics as an HTML message.

        Args:
            stats: mapping read with the keys ``total_accounts``,
                ``normal_accounts``, ``error_accounts`` and
                ``ratelimit_accounts`` (each defaulting to 0).
            is_alert: use the alert title and append the warning threshold.

        Returns:
            The message text, lines joined with newlines.
        """
        total = stats.get("total_accounts", 0)
        normal = stats.get("normal_accounts", 0)
        error = stats.get("error_accounts", 0)
        ratelimit = stats.get("ratelimit_accounts", 0)

        # Health percentage; 0 when there are no accounts at all
        health_pct = (normal / total * 100) if total > 0 else 0

        # Status label: low stock takes precedence over the health bands
        if normal <= TELEGRAM_LOW_STOCK_THRESHOLD:
            status_icon = "⚠️ 库存不足"
        elif health_pct >= 80:
            status_icon = "✅ 正常"
        elif health_pct >= 50:
            status_icon = "⚠️ 警告"
        else:
            status_icon = "🔴 严重"
        status_line = f"<b>{status_icon}</b>"

        title = "🚨 库存不足警报" if is_alert else "📦 账号库存"
        lines = [
            f"<b>{title}</b>",
            "",
            f"状态: {status_line}",
            f"健康度: {health_pct:.1f}%",
            "",
            f"正常: <b>{normal}</b>",
            f"异常: {error}",
            f"限流: {ratelimit}",
            f"总计: {total}",
        ]
        if is_alert:
            lines.append("")
            lines.append(f"预警阈值: {TELEGRAM_LOW_STOCK_THRESHOLD}")
        return "\n".join(lines)

    @admin_only
    async def cmd_import(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Import accounts into team.json from JSON given after the command."""
        # Without arguments, show usage
        if not context.args:
            await update.message.reply_text(
                "<b>📤 导入账号到 team.json</b>\n\n"
                "用法:\n"
                "1. 直接发送 JSON 文件\n"
                "2. /import 后跟 JSON 数据\n\n"
                "JSON 格式:\n"
                "<code>[{\"account\":\"邮箱\",\"password\":\"密码\",\"token\":\"jwt\"},...]</code>\n\n"
                "导入后使用 /run 开始处理",
                parse_mode="HTML"
            )
            return

        # Re-join the arguments and try to parse them as JSON
        json_text = " ".join(context.args)
        await self._process_import_json(update, json_text)

    @admin_only
    async def handle_json_file(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Import accounts from an uploaded JSON document.

        Authorization is enforced by the ``admin_only`` decorator.
        """
        document = update.message.document
        if not document:
            return

        await update.message.reply_text("⏳ 正在处理 JSON 文件...")
        try:
            # Download the file
            file = await document.get_file()
            file_bytes = await file.download_as_bytearray()
            json_text = file_bytes.decode("utf-8")
            await self._process_import_json(update, json_text)
        except Exception as e:
            await update.message.reply_text(f"❌ 读取文件失败: {e}")

    async def _process_import_json(self, update: Update, json_text: str):
        """Validate imported JSON accounts and merge them into team.json.

        Accepts a JSON list of objects or a single object. An entry is valid
        when it has a non-empty ``account`` (or ``email``) string and a
        ``token``. Entries whose email (case-insensitive) already exists are
        skipped. If the existing team.json cannot be read, the import is
        aborted so the file is not overwritten.
        """
        import json
        from pathlib import Path

        try:
            new_accounts = json.loads(json_text)
        except json.JSONDecodeError as e:
            await update.message.reply_text(f"❌ JSON 格式错误: {e}")
            return

        if not isinstance(new_accounts, list):
            # A single object is treated as a one-element list
            new_accounts = [new_accounts]
        if not new_accounts:
            await update.message.reply_text("📭 JSON 数据中没有账号")
            return

        # Validate the entries
        valid_accounts = []
        for acc in new_accounts:
            if not isinstance(acc, dict):
                continue
            # Either "account" or "email" may carry the address
            email = acc.get("account") or acc.get("email", "")
            token = acc.get("token", "")
            password = acc.get("password", "")
            # The email must be a string because it is lower-cased below
            if isinstance(email, str) and email and token:
                valid_accounts.append({
                    "account": email,
                    "password": password,
                    "token": token
                })
        if not valid_accounts:
            await update.message.reply_text("❌ 未找到有效账号 (需要 account/email 和 token 字段)")
            return

        # Read the existing team.json (created automatically when missing)
        team_json_path = Path(TEAM_JSON_FILE)
        existing_accounts = []
        is_new_file = not team_json_path.exists()
        if not is_new_file:
            try:
                raw_text = team_json_path.read_text(encoding="utf-8")
                # An empty file is treated as an empty account list
                if raw_text.strip():
                    existing_accounts = json.loads(raw_text)
            except Exception as e:
                # Abort instead of continuing with an empty list: saving
                # would replace the unreadable file and lose its accounts.
                await update.message.reply_text(f"❌ 读取 team.json 失败,已取消导入: {e}")
                return
            if not isinstance(existing_accounts, list):
                existing_accounts = [existing_accounts]

        # Collect existing emails for duplicate detection
        existing_emails = set()
        for acc in existing_accounts:
            if not isinstance(acc, dict):
                continue
            user = acc.get("user")
            nested_email = user.get("email", "") if isinstance(user, dict) else ""
            email = acc.get("account") or nested_email
            if isinstance(email, str) and email:
                existing_emails.add(email.lower())

        added = 0
        skipped = 0
        for acc in valid_accounts:
            email = acc["account"].lower()
            if email in existing_emails:
                skipped += 1
            else:
                existing_accounts.append(acc)
                existing_emails.add(email)
                added += 1

        # Save to team.json (creating the file if needed)
        try:
            # Make sure the parent directory exists
            team_json_path.parent.mkdir(parents=True, exist_ok=True)
            with open(team_json_path, "w", encoding="utf-8") as f:
                json.dump(existing_accounts, f, ensure_ascii=False, indent=2)
            file_status = "📄 已创建 team.json" if is_new_file else "📄 已更新 team.json"
            await update.message.reply_text(
                f"<b>✅ 导入完成</b>\n\n"
                f"{file_status}\n"
                f"新增: {added}\n"
                f"跳过 (重复): {skipped}\n"
                f"team.json 总数: {len(existing_accounts)}\n\n"
                f"💡 使用 /reload 刷新配置\n"
                f"使用 /run_all 或 /run &lt;n&gt; 开始处理",
                parse_mode="HTML"
            )
        except Exception as e:
            await update.message.reply_text(f"❌ 保存到 team.json 失败: {e}")

    # ==================== GPTMail key management commands ====================

    @admin_only
    async def cmd_gptmail_keys(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """List all GPTMail API keys (masked) and whether each comes from the config."""
        keys = get_gptmail_keys()
        config_keys = set(GPTMAIL_API_KEYS)
        if not keys:
            await update.message.reply_text(
                "<b>📧 GPTMail API Keys</b>\n\n"
                "📭 暂无配置 Key\n\n"
                "使用 /gptmail_add &lt;key&gt; 添加",
                parse_mode="HTML"
            )
            return

        lines = [f"<b>📧 GPTMail API Keys (共 {len(keys)} 个)</b>\n"]
        for position, key in enumerate(keys, start=1):
            # Mask the key
            if len(key) > 10:
                masked = f"{key[:4]}...{key[-4:]}"
            elif len(key) > 4:
                masked = key[:4] + "..."
            else:
                masked = key
            # Mark the origin
            source = "📁 配置" if key in config_keys else "🔧 动态"
            lines.append(f"{position}. <code>{masked}</code> {source}")

        lines.append(f"\n<b>💡 管理:</b>")
        lines.append(f"/gptmail_add &lt;key&gt; - 添加 Key")
        lines.append(f"/gptmail_del &lt;key&gt; - 删除动态 Key")
        lines.append(f"/test_email - 测试邮箱创建")
        await update.message.reply_text("\n".join(lines), parse_mode="HTML")

    @admin_only
    async def cmd_gptmail_add(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Validate and add one or more GPTMail API keys (whitespace separated)."""
        if not context.args:
            await update.message.reply_text(
                "<b>📧 添加 GPTMail API Key</b>\n\n"
                "<b>单个添加:</b>\n"
                "<code>/gptmail_add gpt-xxx</code>\n\n"
                "<b>批量添加 (空格分隔):</b>\n"
                "<code>/gptmail_add key1 key2 key3</code>\n\n"
                "<b>批量添加 (换行分隔):</b>\n"
                "<code>/gptmail_add key1\nkey2\nkey3</code>",
                parse_mode="HTML"
            )
            return

        # str.split() with no argument splits on spaces and newlines alike
        keys = " ".join(context.args).split()
        if not keys:
            await update.message.reply_text("❌ Key 不能为空")
            return

        existing_keys = set(get_gptmail_keys())
        added = []
        skipped = []
        invalid = []

        await update.message.reply_text(f"⏳ 正在验证 {len(keys)} 个 Key...")
        for key in keys:
            # Already known
            if key in existing_keys:
                skipped.append(key)
                continue
            # Check that the key works before storing it
            success, _ = gptmail_service.test_api_key(key)
            if not success:
                invalid.append(key)
                continue
            if add_gptmail_key(key):
                added.append(key)
                existing_keys.add(key)

        # Build the result report
        lines = ["<b>📧 GPTMail Key 导入结果</b>\n"]
        if added:
            lines.append(f"<b>✅ 成功添加:</b> {len(added)}")
            for k in added[:5]:  # show at most five
                masked = f"{k[:4]}...{k[-4:]}" if len(k) > 10 else k
                lines.append(f" • <code>{masked}</code>")
            if len(added) > 5:
                lines.append(f" • ... 等 {len(added)}")
        if skipped:
            lines.append(f"\n<b>⏭️ 已跳过 (已存在):</b> {len(skipped)}")
        if invalid:
            lines.append(f"\n<b>❌ 无效 Key:</b> {len(invalid)}")
            for k in invalid[:3]:  # show at most three
                masked = f"{k[:4]}...{k[-4:]}" if len(k) > 10 else k
                lines.append(f" • <code>{masked}</code>")
        lines.append(f"\n<b>当前 Key 总数:</b> {len(get_gptmail_keys())}")
        await update.message.reply_text("\n".join(lines), parse_mode="HTML")

    @admin_only
    async def cmd_gptmail_del(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Remove a dynamically added GPTMail API key; config-file keys are refused."""
        if not context.args:
            await update.message.reply_text(
                "<b>📧 删除 GPTMail API Key</b>\n\n"
                "用法: /gptmail_del &lt;key&gt;\n\n"
                "注意: 只能删除动态添加的 Key配置文件中的 Key 请直接修改 config.toml",
                parse_mode="HTML"
            )
            return

        key = context.args[0].strip()
        # Keys that come from the config file cannot be removed here
        if key in GPTMAIL_API_KEYS:
            await update.message.reply_text(
                "⚠️ 该 Key 在配置文件中,无法通过 Bot 删除\n"
                "请直接修改 config.toml"
            )
            return

        if not remove_gptmail_key(key):
            await update.message.reply_text("❌ Key 不存在或删除失败")
            return
        await update.message.reply_text(
            f"<b>✅ Key 已删除</b>\n\n"
            f"当前 Key 总数: {len(get_gptmail_keys())}",
            parse_mode="HTML"
        )

    @admin_only
    async def cmd_test_email(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Create a test mailbox through ``unified_create_email`` (GPTMail mode only)."""
        if EMAIL_PROVIDER != "gptmail":
            await update.message.reply_text(
                f"⚠️ 当前邮箱提供商: {EMAIL_PROVIDER}\n"
                f"测试功能仅支持 GPTMail 模式"
            )
            return

        await update.message.reply_text("⏳ 正在测试邮箱创建...")
        try:
            email, password = unified_create_email()
            if not email:
                await update.message.reply_text(
                    "<b>❌ 邮箱创建失败</b>\n\n"
                    "请检查 GPTMail API Key 配置",
                    parse_mode="HTML"
                )
                return
            await update.message.reply_text(
                f"<b>✅ 邮箱创建成功</b>\n\n"
                f"邮箱: <code>{email}</code>\n"
                f"密码: <code>{password}</code>\n\n"
                f"当前 Key 数量: {len(get_gptmail_keys())}",
                parse_mode="HTML"
            )
        except Exception as e:
            await update.message.reply_text(f"❌ 测试失败: {e}")
async def main():
    """Validate the Telegram settings, install signal handlers and run the bot.

    Exits the process with status 1 when the bot is disabled, or when the
    token or the admin chat ids are not configured.
    """
    if not TELEGRAM_ENABLED:
        print("Telegram Bot is disabled. Set telegram.enabled = true in config.toml")
        sys.exit(1)
    if not TELEGRAM_BOT_TOKEN:
        print("Telegram Bot Token not configured. Set telegram.bot_token in config.toml")
        sys.exit(1)
    if not TELEGRAM_ADMIN_CHAT_IDS:
        print("No admin chat IDs configured. Set telegram.admin_chat_ids in config.toml")
        sys.exit(1)

    bot = ProvisionerBot()

    # Handle Ctrl+C / termination
    import signal

    loop = asyncio.get_running_loop()

    def signal_handler(sig, frame):
        log.info("Shutting down...")
        # asyncio primitives must not be touched directly from a signal
        # handler: hand the call to the loop, which also wakes it up if it
        # is currently blocked waiting for I/O.
        if not loop.is_closed():
            loop.call_soon_threadsafe(bot.request_shutdown)

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    await bot.start()


if __name__ == "__main__":
    asyncio.run(main())