# ==================== Telegram Bot 主程序 ==================== # 通过 Telegram 远程控制 OpenAI Team 批量注册任务 import asyncio import sys from concurrent.futures import ThreadPoolExecutor from functools import wraps from typing import Optional from telegram import Update, Bot from telegram.ext import ( Application, CommandHandler, MessageHandler, filters, ContextTypes, ) from config import ( TELEGRAM_BOT_TOKEN, TELEGRAM_ADMIN_CHAT_IDS, TELEGRAM_ENABLED, TEAMS, AUTH_PROVIDER, TEAM_JSON_FILE, TELEGRAM_CHECK_INTERVAL, TELEGRAM_LOW_STOCK_THRESHOLD, CONFIG_FILE, EMAIL_PROVIDER, BROWSER_HEADLESS, ACCOUNTS_PER_TEAM, PROXY_ENABLED, PROXIES, S2A_API_BASE, CPA_API_BASE, CRS_API_BASE, ) from utils import load_team_tracker from bot_notifier import BotNotifier, set_notifier, progress_finish from s2a_service import s2a_get_dashboard_stats, format_dashboard_stats from logger import log def admin_only(func): """装饰器: 仅允许管理员执行命令""" @wraps(func) async def wrapper(self, update: Update, context: ContextTypes.DEFAULT_TYPE): user_id = update.effective_user.id if user_id not in TELEGRAM_ADMIN_CHAT_IDS: await update.message.reply_text("⛔ 无权限,你的 ID 不在管理员列表中") return return await func(self, update, context) return wrapper class ProvisionerBot: """OpenAI Team Provisioner Telegram Bot""" def __init__(self): self.executor = ThreadPoolExecutor(max_workers=1) self.current_task: Optional[asyncio.Task] = None self.current_team: Optional[str] = None self.app: Optional[Application] = None self.notifier: Optional[BotNotifier] = None self._shutdown_event = asyncio.Event() async def start(self): """启动 Bot""" if not TELEGRAM_BOT_TOKEN: log.error("Telegram Bot Token not configured") return # 创建 Application self.app = Application.builder().token(TELEGRAM_BOT_TOKEN).build() # 初始化通知器 self.notifier = BotNotifier(self.app.bot, TELEGRAM_ADMIN_CHAT_IDS) set_notifier(self.notifier) # 注册命令处理器 handlers = [ ("start", self.cmd_help), ("help", self.cmd_help), ("status", self.cmd_status), ("team", self.cmd_team), ("list", self.cmd_list), ("config", self.cmd_config), ("headless", self.cmd_headless), ("run", self.cmd_run), ("run_all", self.cmd_run_all), ("stop", self.cmd_stop), ("logs", self.cmd_logs), ("dashboard", self.cmd_dashboard), ("import", self.cmd_import), ("stock", self.cmd_stock), ] for cmd, handler in handlers: self.app.add_handler(CommandHandler(cmd, handler)) # 注册文件上传处理器 (JSON 文件) self.app.add_handler(MessageHandler( filters.Document.MimeType("application/json"), self.handle_json_file )) # 注册定时检查任务 if TELEGRAM_CHECK_INTERVAL > 0 and AUTH_PROVIDER == "s2a": self.app.job_queue.run_repeating( self.scheduled_stock_check, interval=TELEGRAM_CHECK_INTERVAL, first=60, # 启动后1分钟执行第一次 name="stock_check" ) log.info(f"Stock check scheduled every {TELEGRAM_CHECK_INTERVAL}s") # 启动通知器 await self.notifier.start() log.success("Telegram Bot started") log.info(f"Admin Chat IDs: {TELEGRAM_ADMIN_CHAT_IDS}") # 发送启动通知 await self.notifier.notify("🤖 Bot 已启动\n准备就绪,发送 /help 查看帮助") # 运行 Bot await self.app.initialize() await self.app.start() await self.app.updater.start_polling(drop_pending_updates=True) # 等待关闭信号 await self._shutdown_event.wait() # 清理 await self.app.updater.stop() await self.app.stop() await self.app.shutdown() await self.notifier.stop() def request_shutdown(self): """请求关闭 Bot""" self._shutdown_event.set() @admin_only async def cmd_help(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """显示帮助信息""" help_text = """🤖 OpenAI Team 批量注册 Bot 📋 查看信息: /list - 查看 team.json 账号列表 /status - 查看任务处理状态 /team <n> - 查看第 n 个 Team 处理详情 /config - 查看系统配置 /logs [n] - 查看最近 n 条日志 🚀 任务控制: /run <n> - 开始处理第 n 个 Team /run_all - 开始处理所有 Team /stop - 停止当前任务 ⚙️ 配置管理: /headless - 开启/关闭无头模式 📊 S2A 专属: /dashboard - 查看 S2A 仪表盘 /stock - 查看账号库存 📤 导入账号: /import - 导入账号到 team.json 或直接发送 JSON 文件 💡 示例: /list - 查看所有待处理账号 /run 0 - 处理第一个 Team /config - 查看当前配置""" await update.message.reply_text(help_text, parse_mode="HTML") @admin_only async def cmd_status(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """查看所有 Team 状态""" tracker = load_team_tracker() teams_data = tracker.get("teams", {}) if not teams_data: await update.message.reply_text("📭 暂无数据,请先运行任务") return lines = ["📊 Team 状态总览\n"] for team_name, accounts in teams_data.items(): total = len(accounts) completed = sum(1 for a in accounts if a.get("status") == "completed") failed = sum(1 for a in accounts if "fail" in a.get("status", "").lower()) pending = total - completed - failed status_icon = "✅" if completed == total else ("❌" if failed > 0 else "⏳") lines.append( f"{status_icon} {team_name}: {completed}/{total} " f"(失败:{failed} 待处理:{pending})" ) # 当前任务状态 if self.current_task and not self.current_task.done(): lines.append(f"\n🔄 运行中: {self.current_team or '未知'}") await update.message.reply_text("\n".join(lines), parse_mode="HTML") @admin_only async def cmd_team(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """查看指定 Team 详情""" if not context.args: await update.message.reply_text("用法: /team <序号>\n示例: /team 0") return try: team_idx = int(context.args[0]) except ValueError: await update.message.reply_text("❌ 无效的序号,必须是数字") return if team_idx < 0 or team_idx >= len(TEAMS): await update.message.reply_text(f"❌ 序号超出范围,有效范围: 0-{len(TEAMS)-1}") return team = TEAMS[team_idx] team_name = team.get("name", f"Team{team_idx}") tracker = load_team_tracker() accounts = tracker.get("teams", {}).get(team_name, []) lines = [f"📁 Team {team_idx}: {team_name}\n"] lines.append(f"👤 Owner: {team.get('owner_email', '无')}") lines.append(f"📊 账号数: {len(accounts)}\n") if accounts: for acc in accounts: email = acc.get("email", "") status = acc.get("status", "unknown") role = acc.get("role", "member") icon = {"completed": "✅", "authorized": "🔐", "registered": "📝"}.get( status, "❌" if "fail" in status.lower() else "⏳" ) role_tag = " [Owner]" if role == "owner" else "" lines.append(f"{icon} {email}{role_tag}") else: lines.append("📭 暂无已处理的账号") await update.message.reply_text("\n".join(lines), parse_mode="HTML") @admin_only async def cmd_list(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """查看 team.json 中的账号列表""" if not TEAMS: await update.message.reply_text("📭 team.json 中没有账号") return lines = [f"📋 team.json 账号列表 (共 {len(TEAMS)} 个)\n"] for i, team in enumerate(TEAMS): email = team.get("owner_email", "") has_token = "🔑" if team.get("auth_token") else "🔒" authorized = "✅" if team.get("authorized") else "" needs_login = " [需登录]" if team.get("needs_login") else "" lines.append(f"{i}. {has_token} {email}{authorized}{needs_login}") # 统计 with_token = sum(1 for t in TEAMS if t.get("auth_token")) authorized = sum(1 for t in TEAMS if t.get("authorized")) lines.append(f"\n📊 统计:") lines.append(f"有 Token: {with_token}/{len(TEAMS)}") lines.append(f"已授权: {authorized}/{len(TEAMS)}") # 消息太长时分段发送 text = "\n".join(lines) if len(text) > 4000: # 分段 for i in range(0, len(lines), 30): chunk = "\n".join(lines[i:i+30]) await update.message.reply_text(chunk, parse_mode="HTML") else: await update.message.reply_text(text, parse_mode="HTML") @admin_only async def cmd_config(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """查看当前系统配置""" # 授权服务地址 if AUTH_PROVIDER == "s2a": auth_url = S2A_API_BASE or "未配置" elif AUTH_PROVIDER == "cpa": auth_url = CPA_API_BASE or "未配置" else: auth_url = CRS_API_BASE or "未配置" # 代理信息 if PROXY_ENABLED and PROXIES: proxy_info = f"已启用 ({len(PROXIES)} 个)" else: proxy_info = "未启用" # 无头模式状态 headless_status = "✅ 已开启" if BROWSER_HEADLESS else "❌ 未开启" lines = [ "⚙️ 系统配置", "", "📧 邮箱服务", f" 提供商: {EMAIL_PROVIDER}", "", "🔐 授权服务", f" 模式: {AUTH_PROVIDER.upper()}", f" 地址: {auth_url}", "", "🌐 浏览器", f" 无头模式: {headless_status}", "", "👥 账号设置", f" 每 Team 账号数: {ACCOUNTS_PER_TEAM}", f" team.json 账号: {len(TEAMS)}", "", "🔗 代理", f" 状态: {proxy_info}", "", "💡 提示:", "使用 /headless 开启/关闭无头模式", ] await update.message.reply_text("\n".join(lines), parse_mode="HTML") @admin_only async def cmd_headless(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """切换无头模式""" import tomli_w try: # 读取当前配置 with open(CONFIG_FILE, "rb") as f: import tomllib config = tomllib.load(f) # 获取当前状态 current = config.get("browser", {}).get("headless", False) new_value = not current # 更新配置 if "browser" not in config: config["browser"] = {} config["browser"]["headless"] = new_value # 写回文件 with open(CONFIG_FILE, "wb") as f: tomli_w.dump(config, f) status = "✅ 已开启" if new_value else "❌ 已关闭" await update.message.reply_text( f"🌐 无头模式\n\n" f"状态: {status}\n\n" f"⚠️ 需要重启 Bot 生效", parse_mode="HTML" ) except ImportError: await update.message.reply_text( "❌ 缺少 tomli_w 依赖\n" "请运行: uv add tomli_w" ) except Exception as e: await update.message.reply_text(f"❌ 修改配置失败: {e}") @admin_only async def cmd_run(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """启动处理指定 Team""" if self.current_task and not self.current_task.done(): await update.message.reply_text( f"⚠️ 任务正在运行: {self.current_team}\n使用 /stop 停止" ) return if not context.args: await update.message.reply_text("用法: /run <序号>\n示例: /run 0") return try: team_idx = int(context.args[0]) except ValueError: await update.message.reply_text("❌ 无效的序号,必须是数字") return if team_idx < 0 or team_idx >= len(TEAMS): await update.message.reply_text(f"❌ 序号超出范围,有效范围: 0-{len(TEAMS)-1}") return team_name = TEAMS[team_idx].get("name", f"Team{team_idx}") self.current_team = team_name await update.message.reply_text(f"🚀 开始处理 Team {team_idx}: {team_name}...") # 在后台线程执行任务 loop = asyncio.get_event_loop() self.current_task = loop.run_in_executor( self.executor, self._run_team_task, team_idx ) # 添加完成回调 self.current_task = asyncio.ensure_future(self._wrap_task(self.current_task, team_name)) @admin_only async def cmd_run_all(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """启动处理所有 Team""" if self.current_task and not self.current_task.done(): await update.message.reply_text( f"⚠️ 任务正在运行: {self.current_team}\n使用 /stop 停止" ) return self.current_team = "全部" await update.message.reply_text(f"🚀 开始处理所有 Team (共 {len(TEAMS)} 个)...") loop = asyncio.get_event_loop() self.current_task = loop.run_in_executor( self.executor, self._run_all_teams_task ) self.current_task = asyncio.ensure_future(self._wrap_task(self.current_task, "全部")) async def _wrap_task(self, task, team_name: str): """包装任务以处理完成通知""" try: result = await task success = sum(1 for r in (result or []) if r.get("status") == "completed") failed = len(result or []) - success await self.notifier.notify_task_completed(team_name, success, failed) except Exception as e: await self.notifier.notify_error(f"任务失败: {team_name}", str(e)) finally: self.current_team = None # 清理进度跟踪 progress_finish() def _run_team_task(self, team_idx: int): """执行单个 Team 任务 (在线程池中运行)""" # 延迟导入避免循环依赖 from run import run_single_team return run_single_team(team_idx) def _run_all_teams_task(self): """执行所有 Team 任务 (在线程池中运行)""" from run import run_all_teams return run_all_teams() @admin_only async def cmd_stop(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """停止当前任务""" if not self.current_task or self.current_task.done(): await update.message.reply_text("📭 当前没有运行中的任务") return # 注意: 由于任务在线程池中运行,无法直接取消 # 这里只能发送信号 await update.message.reply_text( f"🛑 正在停止: {self.current_team}\n" "注意: 当前账号处理完成后才会停止" ) # 设置全局停止标志 try: import run run._shutdown_requested = True except Exception: pass @admin_only async def cmd_logs(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """查看最近日志""" try: n = int(context.args[0]) if context.args else 10 except ValueError: n = 10 n = min(n, 50) # 限制最大条数 try: from config import BASE_DIR log_file = BASE_DIR / "logs" / "app.log" if not log_file.exists(): await update.message.reply_text("📭 日志文件不存在") return with open(log_file, "r", encoding="utf-8", errors="ignore") as f: lines = f.readlines() recent = lines[-n:] if len(lines) >= n else lines if not recent: await update.message.reply_text("📭 日志文件为空") return # 格式化日志 (移除 ANSI 颜色码) import re ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])') clean_lines = [ansi_escape.sub('', line.strip()) for line in recent] log_text = "\n".join(clean_lines) if len(log_text) > 4000: log_text = log_text[-4000:] await update.message.reply_text(f"{log_text}", parse_mode="HTML") except Exception as e: await update.message.reply_text(f"❌ 读取日志失败: {e}") @admin_only async def cmd_dashboard(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """查看 S2A 仪表盘统计""" if AUTH_PROVIDER != "s2a": await update.message.reply_text( f"⚠️ 仪表盘仅支持 S2A 模式\n" f"当前模式: {AUTH_PROVIDER}" ) return await update.message.reply_text("⏳ 正在获取仪表盘数据...") try: stats = s2a_get_dashboard_stats() if stats: text = format_dashboard_stats(stats) await update.message.reply_text(text, parse_mode="HTML") else: await update.message.reply_text( "❌ 获取仪表盘数据失败\n" "请检查 S2A 配置和 API 连接" ) except Exception as e: await update.message.reply_text(f"❌ 错误: {e}") @admin_only async def cmd_stock(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """查看账号存货""" if AUTH_PROVIDER != "s2a": await update.message.reply_text( f"⚠️ 库存查询仅支持 S2A 模式\n" f"当前模式: {AUTH_PROVIDER}" ) return stats = s2a_get_dashboard_stats() if not stats: await update.message.reply_text("❌ 获取库存信息失败") return text = self._format_stock_message(stats) await update.message.reply_text(text, parse_mode="HTML") async def scheduled_stock_check(self, context: ContextTypes.DEFAULT_TYPE): """定时检查账号存货""" try: stats = s2a_get_dashboard_stats() if not stats: return normal = stats.get("normal_accounts", 0) total = stats.get("total_accounts", 0) # 只在低库存时发送通知 if normal <= TELEGRAM_LOW_STOCK_THRESHOLD: text = self._format_stock_message(stats, is_alert=True) for chat_id in TELEGRAM_ADMIN_CHAT_IDS: try: await context.bot.send_message( chat_id=chat_id, text=text, parse_mode="HTML" ) except Exception: pass except Exception as e: log.warning(f"Stock check failed: {e}") def _format_stock_message(self, stats: dict, is_alert: bool = False) -> str: """格式化存货消息""" total = stats.get("total_accounts", 0) normal = stats.get("normal_accounts", 0) error = stats.get("error_accounts", 0) ratelimit = stats.get("ratelimit_accounts", 0) overload = stats.get("overload_accounts", 0) # 计算健康度 health_pct = (normal / total * 100) if total > 0 else 0 # 状态图标 if normal <= TELEGRAM_LOW_STOCK_THRESHOLD: status_icon = "⚠️ 库存不足" status_line = f"{status_icon}" elif health_pct >= 80: status_icon = "✅ 正常" status_line = f"{status_icon}" elif health_pct >= 50: status_icon = "⚠️ 警告" status_line = f"{status_icon}" else: status_icon = "🔴 严重" status_line = f"{status_icon}" title = "🚨 库存不足警报" if is_alert else "📦 账号库存" lines = [ f"{title}", "", f"状态: {status_line}", f"健康度: {health_pct:.1f}%", "", f"正常: {normal}", f"异常: {error}", f"限流: {ratelimit}", f"总计: {total}", ] if is_alert: lines.append("") lines.append(f"预警阈值: {TELEGRAM_LOW_STOCK_THRESHOLD}") return "\n".join(lines) @admin_only async def cmd_import(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """上传账号到 team.json""" # 获取命令后的 JSON 数据 if not context.args: await update.message.reply_text( "📤 导入账号到 team.json\n\n" "用法:\n" "1. 直接发送 JSON 文件\n" "2. /import 后跟 JSON 数据\n\n" "JSON 格式:\n" "[{\"account\":\"邮箱\",\"password\":\"密码\",\"token\":\"jwt\"},...]\n\n" "导入后使用 /run 开始处理", parse_mode="HTML" ) return # 尝试解析 JSON json_text = " ".join(context.args) await self._process_import_json(update, json_text) @admin_only async def handle_json_file(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """处理上传的 JSON 文件""" # 检查是否是管理员 user_id = update.effective_user.id if user_id not in TELEGRAM_ADMIN_CHAT_IDS: await update.message.reply_text("⛔ 无权限") return document = update.message.document if not document: return await update.message.reply_text("⏳ 正在处理 JSON 文件...") try: # 下载文件 file = await document.get_file() file_bytes = await file.download_as_bytearray() json_text = file_bytes.decode("utf-8") await self._process_import_json(update, json_text) except Exception as e: await update.message.reply_text(f"❌ 读取文件失败: {e}") async def _process_import_json(self, update: Update, json_text: str): """处理导入的 JSON 数据,保存到 team.json""" import json from pathlib import Path try: new_accounts = json.loads(json_text) except json.JSONDecodeError as e: await update.message.reply_text(f"❌ JSON 格式错误: {e}") return if not isinstance(new_accounts, list): # 如果是单个对象,转成列表 new_accounts = [new_accounts] if not new_accounts: await update.message.reply_text("📭 JSON 数据中没有账号") return # 验证格式 valid_accounts = [] for acc in new_accounts: if not isinstance(acc, dict): continue # 支持 account 或 email 字段 email = acc.get("account") or acc.get("email", "") token = acc.get("token", "") password = acc.get("password", "") if email and token: valid_accounts.append({ "account": email, "password": password, "token": token }) if not valid_accounts: await update.message.reply_text("❌ 未找到有效账号 (需要 account/email 和 token 字段)") return # 读取现有 team.json team_json_path = Path(TEAM_JSON_FILE) existing_accounts = [] if team_json_path.exists(): try: with open(team_json_path, "r", encoding="utf-8") as f: existing_accounts = json.load(f) if not isinstance(existing_accounts, list): existing_accounts = [existing_accounts] except Exception: existing_accounts = [] # 检查重复 existing_emails = set() for acc in existing_accounts: email = acc.get("account") or acc.get("user", {}).get("email", "") if email: existing_emails.add(email.lower()) added = 0 skipped = 0 for acc in valid_accounts: email = acc.get("account", "").lower() if email in existing_emails: skipped += 1 else: existing_accounts.append(acc) existing_emails.add(email) added += 1 # 保存到 team.json try: with open(team_json_path, "w", encoding="utf-8") as f: json.dump(existing_accounts, f, ensure_ascii=False, indent=2) await update.message.reply_text( f"✅ 导入完成\n\n" f"新增: {added}\n" f"跳过 (重复): {skipped}\n" f"team.json 总数: {len(existing_accounts)}\n\n" f"使用 /run_all 或 /run <n> 开始处理", parse_mode="HTML" ) except Exception as e: await update.message.reply_text(f"❌ 保存到 team.json 失败: {e}") async def main(): """主函数""" if not TELEGRAM_ENABLED: print("Telegram Bot is disabled. Set telegram.enabled = true in config.toml") sys.exit(1) if not TELEGRAM_BOT_TOKEN: print("Telegram Bot Token not configured. Set telegram.bot_token in config.toml") sys.exit(1) if not TELEGRAM_ADMIN_CHAT_IDS: print("No admin chat IDs configured. Set telegram.admin_chat_ids in config.toml") sys.exit(1) bot = ProvisionerBot() # 处理 Ctrl+C import signal def signal_handler(sig, frame): log.info("Shutting down...") bot.request_shutdown() signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) await bot.start() if __name__ == "__main__": asyncio.run(main())