Files
codexTool/telegram_bot.py
2026-01-15 23:53:33 +08:00

792 lines
27 KiB
Python

# ==================== Telegram Bot 主程序 ====================
# 通过 Telegram 远程控制 OpenAI Team 批量注册任务
import asyncio
import sys
from concurrent.futures import ThreadPoolExecutor
from functools import wraps
from typing import Optional
from telegram import Update, Bot
from telegram.ext import (
Application,
CommandHandler,
MessageHandler,
filters,
ContextTypes,
)
from config import (
TELEGRAM_BOT_TOKEN,
TELEGRAM_ADMIN_CHAT_IDS,
TELEGRAM_ENABLED,
TEAMS,
AUTH_PROVIDER,
TEAM_JSON_FILE,
TELEGRAM_CHECK_INTERVAL,
TELEGRAM_LOW_STOCK_THRESHOLD,
CONFIG_FILE,
EMAIL_PROVIDER,
BROWSER_HEADLESS,
ACCOUNTS_PER_TEAM,
PROXY_ENABLED,
PROXIES,
S2A_API_BASE,
CPA_API_BASE,
CRS_API_BASE,
)
from utils import load_team_tracker
from bot_notifier import BotNotifier, set_notifier, progress_finish
from s2a_service import s2a_get_dashboard_stats, format_dashboard_stats
from logger import log
def admin_only(func):
"""装饰器: 仅允许管理员执行命令"""
@wraps(func)
async def wrapper(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
user_id = update.effective_user.id
if user_id not in TELEGRAM_ADMIN_CHAT_IDS:
await update.message.reply_text("⛔ 无权限,你的 ID 不在管理员列表中")
return
return await func(self, update, context)
return wrapper
class ProvisionerBot:
"""OpenAI Team Provisioner Telegram Bot"""
def __init__(self):
self.executor = ThreadPoolExecutor(max_workers=1)
self.current_task: Optional[asyncio.Task] = None
self.current_team: Optional[str] = None
self.app: Optional[Application] = None
self.notifier: Optional[BotNotifier] = None
self._shutdown_event = asyncio.Event()
async def start(self):
"""启动 Bot"""
if not TELEGRAM_BOT_TOKEN:
log.error("Telegram Bot Token not configured")
return
# 创建 Application
self.app = Application.builder().token(TELEGRAM_BOT_TOKEN).build()
# 初始化通知器
self.notifier = BotNotifier(self.app.bot, TELEGRAM_ADMIN_CHAT_IDS)
set_notifier(self.notifier)
# 注册命令处理器
handlers = [
("start", self.cmd_help),
("help", self.cmd_help),
("status", self.cmd_status),
("team", self.cmd_team),
("list", self.cmd_list),
("config", self.cmd_config),
("headless", self.cmd_headless),
("run", self.cmd_run),
("run_all", self.cmd_run_all),
("stop", self.cmd_stop),
("logs", self.cmd_logs),
("dashboard", self.cmd_dashboard),
("import", self.cmd_import),
("stock", self.cmd_stock),
]
for cmd, handler in handlers:
self.app.add_handler(CommandHandler(cmd, handler))
# 注册文件上传处理器 (JSON 文件)
self.app.add_handler(MessageHandler(
filters.Document.MimeType("application/json"),
self.handle_json_file
))
# 注册定时检查任务
if TELEGRAM_CHECK_INTERVAL > 0 and AUTH_PROVIDER == "s2a":
self.app.job_queue.run_repeating(
self.scheduled_stock_check,
interval=TELEGRAM_CHECK_INTERVAL,
first=60, # 启动后1分钟执行第一次
name="stock_check"
)
log.info(f"Stock check scheduled every {TELEGRAM_CHECK_INTERVAL}s")
# 启动通知器
await self.notifier.start()
log.success("Telegram Bot started")
log.info(f"Admin Chat IDs: {TELEGRAM_ADMIN_CHAT_IDS}")
# 发送启动通知
await self.notifier.notify("<b>🤖 Bot 已启动</b>\n准备就绪,发送 /help 查看帮助")
# 运行 Bot
await self.app.initialize()
await self.app.start()
await self.app.updater.start_polling(drop_pending_updates=True)
# 等待关闭信号
await self._shutdown_event.wait()
# 清理
await self.app.updater.stop()
await self.app.stop()
await self.app.shutdown()
await self.notifier.stop()
def request_shutdown(self):
"""请求关闭 Bot"""
self._shutdown_event.set()
@admin_only
async def cmd_help(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""显示帮助信息"""
help_text = """<b>🤖 OpenAI Team 批量注册 Bot</b>
<b>📋 查看信息:</b>
/list - 查看 team.json 账号列表
/status - 查看任务处理状态
/team &lt;n&gt; - 查看第 n 个 Team 处理详情
/config - 查看系统配置
/logs [n] - 查看最近 n 条日志
<b>🚀 任务控制:</b>
/run &lt;n&gt; - 开始处理第 n 个 Team
/run_all - 开始处理所有 Team
/stop - 停止当前任务
<b>⚙️ 配置管理:</b>
/headless - 开启/关闭无头模式
<b>📊 S2A 专属:</b>
/dashboard - 查看 S2A 仪表盘
/stock - 查看账号库存
<b>📤 导入账号:</b>
/import - 导入账号到 team.json
或直接发送 JSON 文件
<b>💡 示例:</b>
<code>/list</code> - 查看所有待处理账号
<code>/run 0</code> - 处理第一个 Team
<code>/config</code> - 查看当前配置"""
await update.message.reply_text(help_text, parse_mode="HTML")
@admin_only
async def cmd_status(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""查看所有 Team 状态"""
tracker = load_team_tracker()
teams_data = tracker.get("teams", {})
if not teams_data:
await update.message.reply_text("📭 暂无数据,请先运行任务")
return
lines = ["<b>📊 Team 状态总览</b>\n"]
for team_name, accounts in teams_data.items():
total = len(accounts)
completed = sum(1 for a in accounts if a.get("status") == "completed")
failed = sum(1 for a in accounts if "fail" in a.get("status", "").lower())
pending = total - completed - failed
status_icon = "" if completed == total else ("" if failed > 0 else "")
lines.append(
f"{status_icon} <b>{team_name}</b>: {completed}/{total} "
f"(失败:{failed} 待处理:{pending})"
)
# 当前任务状态
if self.current_task and not self.current_task.done():
lines.append(f"\n<b>🔄 运行中:</b> {self.current_team or '未知'}")
await update.message.reply_text("\n".join(lines), parse_mode="HTML")
@admin_only
async def cmd_team(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""查看指定 Team 详情"""
if not context.args:
await update.message.reply_text("用法: /team <序号>\n示例: /team 0")
return
try:
team_idx = int(context.args[0])
except ValueError:
await update.message.reply_text("❌ 无效的序号,必须是数字")
return
if team_idx < 0 or team_idx >= len(TEAMS):
await update.message.reply_text(f"❌ 序号超出范围,有效范围: 0-{len(TEAMS)-1}")
return
team = TEAMS[team_idx]
team_name = team.get("name", f"Team{team_idx}")
tracker = load_team_tracker()
accounts = tracker.get("teams", {}).get(team_name, [])
lines = [f"<b>📁 Team {team_idx}: {team_name}</b>\n"]
lines.append(f"👤 Owner: {team.get('owner_email', '')}")
lines.append(f"📊 账号数: {len(accounts)}\n")
if accounts:
for acc in accounts:
email = acc.get("email", "")
status = acc.get("status", "unknown")
role = acc.get("role", "member")
icon = {"completed": "", "authorized": "🔐", "registered": "📝"}.get(
status, "" if "fail" in status.lower() else ""
)
role_tag = " [Owner]" if role == "owner" else ""
lines.append(f"{icon} {email}{role_tag}")
else:
lines.append("📭 暂无已处理的账号")
await update.message.reply_text("\n".join(lines), parse_mode="HTML")
@admin_only
async def cmd_list(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""查看 team.json 中的账号列表"""
if not TEAMS:
await update.message.reply_text("📭 team.json 中没有账号")
return
lines = [f"<b>📋 team.json 账号列表 (共 {len(TEAMS)} 个)</b>\n"]
for i, team in enumerate(TEAMS):
email = team.get("owner_email", "")
has_token = "🔑" if team.get("auth_token") else "🔒"
authorized = "" if team.get("authorized") else ""
needs_login = " [需登录]" if team.get("needs_login") else ""
lines.append(f"{i}. {has_token} {email}{authorized}{needs_login}")
# 统计
with_token = sum(1 for t in TEAMS if t.get("auth_token"))
authorized = sum(1 for t in TEAMS if t.get("authorized"))
lines.append(f"\n<b>📊 统计:</b>")
lines.append(f"有 Token: {with_token}/{len(TEAMS)}")
lines.append(f"已授权: {authorized}/{len(TEAMS)}")
# 消息太长时分段发送
text = "\n".join(lines)
if len(text) > 4000:
# 分段
for i in range(0, len(lines), 30):
chunk = "\n".join(lines[i:i+30])
await update.message.reply_text(chunk, parse_mode="HTML")
else:
await update.message.reply_text(text, parse_mode="HTML")
@admin_only
async def cmd_config(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""查看当前系统配置"""
# 授权服务地址
if AUTH_PROVIDER == "s2a":
auth_url = S2A_API_BASE or "未配置"
elif AUTH_PROVIDER == "cpa":
auth_url = CPA_API_BASE or "未配置"
else:
auth_url = CRS_API_BASE or "未配置"
# 代理信息
if PROXY_ENABLED and PROXIES:
proxy_info = f"已启用 ({len(PROXIES)} 个)"
else:
proxy_info = "未启用"
# 无头模式状态
headless_status = "✅ 已开启" if BROWSER_HEADLESS else "❌ 未开启"
lines = [
"<b>⚙️ 系统配置</b>",
"",
"<b>📧 邮箱服务</b>",
f" 提供商: {EMAIL_PROVIDER}",
"",
"<b>🔐 授权服务</b>",
f" 模式: {AUTH_PROVIDER.upper()}",
f" 地址: {auth_url}",
"",
"<b>🌐 浏览器</b>",
f" 无头模式: {headless_status}",
"",
"<b>👥 账号设置</b>",
f" 每 Team 账号数: {ACCOUNTS_PER_TEAM}",
f" team.json 账号: {len(TEAMS)}",
"",
"<b>🔗 代理</b>",
f" 状态: {proxy_info}",
"",
"<b>💡 提示:</b>",
"使用 /headless 开启/关闭无头模式",
]
await update.message.reply_text("\n".join(lines), parse_mode="HTML")
@admin_only
async def cmd_headless(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""切换无头模式"""
import tomli_w
try:
# 读取当前配置
with open(CONFIG_FILE, "rb") as f:
import tomllib
config = tomllib.load(f)
# 获取当前状态
current = config.get("browser", {}).get("headless", False)
new_value = not current
# 更新配置
if "browser" not in config:
config["browser"] = {}
config["browser"]["headless"] = new_value
# 写回文件
with open(CONFIG_FILE, "wb") as f:
tomli_w.dump(config, f)
status = "✅ 已开启" if new_value else "❌ 已关闭"
await update.message.reply_text(
f"<b>🌐 无头模式</b>\n\n"
f"状态: {status}\n\n"
f"⚠️ 需要重启 Bot 生效",
parse_mode="HTML"
)
except ImportError:
await update.message.reply_text(
"❌ 缺少 tomli_w 依赖\n"
"请运行: uv add tomli_w"
)
except Exception as e:
await update.message.reply_text(f"❌ 修改配置失败: {e}")
@admin_only
async def cmd_run(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""启动处理指定 Team"""
if self.current_task and not self.current_task.done():
await update.message.reply_text(
f"⚠️ 任务正在运行: {self.current_team}\n使用 /stop 停止"
)
return
if not context.args:
await update.message.reply_text("用法: /run <序号>\n示例: /run 0")
return
try:
team_idx = int(context.args[0])
except ValueError:
await update.message.reply_text("❌ 无效的序号,必须是数字")
return
if team_idx < 0 or team_idx >= len(TEAMS):
await update.message.reply_text(f"❌ 序号超出范围,有效范围: 0-{len(TEAMS)-1}")
return
team_name = TEAMS[team_idx].get("name", f"Team{team_idx}")
self.current_team = team_name
await update.message.reply_text(f"🚀 开始处理 Team {team_idx}: {team_name}...")
# 在后台线程执行任务
loop = asyncio.get_event_loop()
self.current_task = loop.run_in_executor(
self.executor,
self._run_team_task,
team_idx
)
# 添加完成回调
self.current_task = asyncio.ensure_future(self._wrap_task(self.current_task, team_name))
@admin_only
async def cmd_run_all(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""启动处理所有 Team"""
if self.current_task and not self.current_task.done():
await update.message.reply_text(
f"⚠️ 任务正在运行: {self.current_team}\n使用 /stop 停止"
)
return
self.current_team = "全部"
await update.message.reply_text(f"🚀 开始处理所有 Team (共 {len(TEAMS)} 个)...")
loop = asyncio.get_event_loop()
self.current_task = loop.run_in_executor(
self.executor,
self._run_all_teams_task
)
self.current_task = asyncio.ensure_future(self._wrap_task(self.current_task, "全部"))
async def _wrap_task(self, task, team_name: str):
"""包装任务以处理完成通知"""
try:
result = await task
success = sum(1 for r in (result or []) if r.get("status") == "completed")
failed = len(result or []) - success
await self.notifier.notify_task_completed(team_name, success, failed)
except Exception as e:
await self.notifier.notify_error(f"任务失败: {team_name}", str(e))
finally:
self.current_team = None
# 清理进度跟踪
progress_finish()
def _run_team_task(self, team_idx: int):
"""执行单个 Team 任务 (在线程池中运行)"""
# 延迟导入避免循环依赖
from run import run_single_team
return run_single_team(team_idx)
def _run_all_teams_task(self):
"""执行所有 Team 任务 (在线程池中运行)"""
from run import run_all_teams
return run_all_teams()
@admin_only
async def cmd_stop(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""停止当前任务"""
if not self.current_task or self.current_task.done():
await update.message.reply_text("📭 当前没有运行中的任务")
return
# 注意: 由于任务在线程池中运行,无法直接取消
# 这里只能发送信号
await update.message.reply_text(
f"🛑 正在停止: {self.current_team}\n"
"注意: 当前账号处理完成后才会停止"
)
# 设置全局停止标志
try:
import run
run._shutdown_requested = True
except Exception:
pass
@admin_only
async def cmd_logs(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""查看最近日志"""
try:
n = int(context.args[0]) if context.args else 10
except ValueError:
n = 10
n = min(n, 50) # 限制最大条数
try:
from config import BASE_DIR
log_file = BASE_DIR / "logs" / "app.log"
if not log_file.exists():
await update.message.reply_text("📭 日志文件不存在")
return
with open(log_file, "r", encoding="utf-8", errors="ignore") as f:
lines = f.readlines()
recent = lines[-n:] if len(lines) >= n else lines
if not recent:
await update.message.reply_text("📭 日志文件为空")
return
# 格式化日志 (移除 ANSI 颜色码)
import re
ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
clean_lines = [ansi_escape.sub('', line.strip()) for line in recent]
log_text = "\n".join(clean_lines)
if len(log_text) > 4000:
log_text = log_text[-4000:]
await update.message.reply_text(f"<code>{log_text}</code>", parse_mode="HTML")
except Exception as e:
await update.message.reply_text(f"❌ 读取日志失败: {e}")
@admin_only
async def cmd_dashboard(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""查看 S2A 仪表盘统计"""
if AUTH_PROVIDER != "s2a":
await update.message.reply_text(
f"⚠️ 仪表盘仅支持 S2A 模式\n"
f"当前模式: {AUTH_PROVIDER}"
)
return
await update.message.reply_text("⏳ 正在获取仪表盘数据...")
try:
stats = s2a_get_dashboard_stats()
if stats:
text = format_dashboard_stats(stats)
await update.message.reply_text(text, parse_mode="HTML")
else:
await update.message.reply_text(
"❌ 获取仪表盘数据失败\n"
"请检查 S2A 配置和 API 连接"
)
except Exception as e:
await update.message.reply_text(f"❌ 错误: {e}")
@admin_only
async def cmd_stock(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""查看账号存货"""
if AUTH_PROVIDER != "s2a":
await update.message.reply_text(
f"⚠️ 库存查询仅支持 S2A 模式\n"
f"当前模式: {AUTH_PROVIDER}"
)
return
stats = s2a_get_dashboard_stats()
if not stats:
await update.message.reply_text("❌ 获取库存信息失败")
return
text = self._format_stock_message(stats)
await update.message.reply_text(text, parse_mode="HTML")
async def scheduled_stock_check(self, context: ContextTypes.DEFAULT_TYPE):
"""定时检查账号存货"""
try:
stats = s2a_get_dashboard_stats()
if not stats:
return
normal = stats.get("normal_accounts", 0)
total = stats.get("total_accounts", 0)
# 只在低库存时发送通知
if normal <= TELEGRAM_LOW_STOCK_THRESHOLD:
text = self._format_stock_message(stats, is_alert=True)
for chat_id in TELEGRAM_ADMIN_CHAT_IDS:
try:
await context.bot.send_message(
chat_id=chat_id,
text=text,
parse_mode="HTML"
)
except Exception:
pass
except Exception as e:
log.warning(f"Stock check failed: {e}")
def _format_stock_message(self, stats: dict, is_alert: bool = False) -> str:
"""格式化存货消息"""
total = stats.get("total_accounts", 0)
normal = stats.get("normal_accounts", 0)
error = stats.get("error_accounts", 0)
ratelimit = stats.get("ratelimit_accounts", 0)
overload = stats.get("overload_accounts", 0)
# 计算健康度
health_pct = (normal / total * 100) if total > 0 else 0
# 状态图标
if normal <= TELEGRAM_LOW_STOCK_THRESHOLD:
status_icon = "⚠️ 库存不足"
status_line = f"<b>{status_icon}</b>"
elif health_pct >= 80:
status_icon = "✅ 正常"
status_line = f"<b>{status_icon}</b>"
elif health_pct >= 50:
status_icon = "⚠️ 警告"
status_line = f"<b>{status_icon}</b>"
else:
status_icon = "🔴 严重"
status_line = f"<b>{status_icon}</b>"
title = "🚨 库存不足警报" if is_alert else "📦 账号库存"
lines = [
f"<b>{title}</b>",
"",
f"状态: {status_line}",
f"健康度: {health_pct:.1f}%",
"",
f"正常: <b>{normal}</b>",
f"异常: {error}",
f"限流: {ratelimit}",
f"总计: {total}",
]
if is_alert:
lines.append("")
lines.append(f"预警阈值: {TELEGRAM_LOW_STOCK_THRESHOLD}")
return "\n".join(lines)
@admin_only
async def cmd_import(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""上传账号到 team.json"""
# 获取命令后的 JSON 数据
if not context.args:
await update.message.reply_text(
"<b>📤 导入账号到 team.json</b>\n\n"
"用法:\n"
"1. 直接发送 JSON 文件\n"
"2. /import 后跟 JSON 数据\n\n"
"JSON 格式:\n"
"<code>[{\"account\":\"邮箱\",\"password\":\"密码\",\"token\":\"jwt\"},...]</code>\n\n"
"导入后使用 /run 开始处理",
parse_mode="HTML"
)
return
# 尝试解析 JSON
json_text = " ".join(context.args)
await self._process_import_json(update, json_text)
@admin_only
async def handle_json_file(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""处理上传的 JSON 文件"""
# 检查是否是管理员
user_id = update.effective_user.id
if user_id not in TELEGRAM_ADMIN_CHAT_IDS:
await update.message.reply_text("⛔ 无权限")
return
document = update.message.document
if not document:
return
await update.message.reply_text("⏳ 正在处理 JSON 文件...")
try:
# 下载文件
file = await document.get_file()
file_bytes = await file.download_as_bytearray()
json_text = file_bytes.decode("utf-8")
await self._process_import_json(update, json_text)
except Exception as e:
await update.message.reply_text(f"❌ 读取文件失败: {e}")
async def _process_import_json(self, update: Update, json_text: str):
"""处理导入的 JSON 数据,保存到 team.json"""
import json
from pathlib import Path
try:
new_accounts = json.loads(json_text)
except json.JSONDecodeError as e:
await update.message.reply_text(f"❌ JSON 格式错误: {e}")
return
if not isinstance(new_accounts, list):
# 如果是单个对象,转成列表
new_accounts = [new_accounts]
if not new_accounts:
await update.message.reply_text("📭 JSON 数据中没有账号")
return
# 验证格式
valid_accounts = []
for acc in new_accounts:
if not isinstance(acc, dict):
continue
# 支持 account 或 email 字段
email = acc.get("account") or acc.get("email", "")
token = acc.get("token", "")
password = acc.get("password", "")
if email and token:
valid_accounts.append({
"account": email,
"password": password,
"token": token
})
if not valid_accounts:
await update.message.reply_text("❌ 未找到有效账号 (需要 account/email 和 token 字段)")
return
# 读取现有 team.json
team_json_path = Path(TEAM_JSON_FILE)
existing_accounts = []
if team_json_path.exists():
try:
with open(team_json_path, "r", encoding="utf-8") as f:
existing_accounts = json.load(f)
if not isinstance(existing_accounts, list):
existing_accounts = [existing_accounts]
except Exception:
existing_accounts = []
# 检查重复
existing_emails = set()
for acc in existing_accounts:
email = acc.get("account") or acc.get("user", {}).get("email", "")
if email:
existing_emails.add(email.lower())
added = 0
skipped = 0
for acc in valid_accounts:
email = acc.get("account", "").lower()
if email in existing_emails:
skipped += 1
else:
existing_accounts.append(acc)
existing_emails.add(email)
added += 1
# 保存到 team.json
try:
with open(team_json_path, "w", encoding="utf-8") as f:
json.dump(existing_accounts, f, ensure_ascii=False, indent=2)
await update.message.reply_text(
f"<b>✅ 导入完成</b>\n\n"
f"新增: {added}\n"
f"跳过 (重复): {skipped}\n"
f"team.json 总数: {len(existing_accounts)}\n\n"
f"使用 /run_all 或 /run &lt;n&gt; 开始处理",
parse_mode="HTML"
)
except Exception as e:
await update.message.reply_text(f"❌ 保存到 team.json 失败: {e}")
async def main():
"""主函数"""
if not TELEGRAM_ENABLED:
print("Telegram Bot is disabled. Set telegram.enabled = true in config.toml")
sys.exit(1)
if not TELEGRAM_BOT_TOKEN:
print("Telegram Bot Token not configured. Set telegram.bot_token in config.toml")
sys.exit(1)
if not TELEGRAM_ADMIN_CHAT_IDS:
print("No admin chat IDs configured. Set telegram.admin_chat_ids in config.toml")
sys.exit(1)
bot = ProvisionerBot()
# 处理 Ctrl+C
import signal
def signal_handler(sig, frame):
log.info("Shutting down...")
bot.request_shutdown()
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
await bot.start()
if __name__ == "__main__":
asyncio.run(main())