# ==================== Telegram Bot 主程序 ==================== # 通过 Telegram 远程控制 OpenAI Team 批量注册任务 import asyncio import sys from concurrent.futures import ThreadPoolExecutor from functools import wraps from typing import Optional from telegram import Update, Bot, BotCommand from telegram.ext import ( Application, CommandHandler, MessageHandler, filters, ContextTypes, ) from config import ( TELEGRAM_BOT_TOKEN, TELEGRAM_ADMIN_CHAT_IDS, TEAMS, AUTH_PROVIDER, TEAM_JSON_FILE, TELEGRAM_CHECK_INTERVAL, TELEGRAM_LOW_STOCK_THRESHOLD, CONFIG_FILE, EMAIL_PROVIDER, ACCOUNTS_PER_TEAM, PROXY_ENABLED, PROXIES, S2A_API_BASE, CPA_API_BASE, CRS_API_BASE, get_gptmail_keys, add_gptmail_key, remove_gptmail_key, GPTMAIL_API_KEYS, INCLUDE_TEAM_OWNERS, reload_config, S2A_CONCURRENCY, S2A_PRIORITY, S2A_GROUP_NAMES, S2A_GROUP_IDS, S2A_ADMIN_KEY, ) from utils import load_team_tracker, get_all_incomplete_accounts from bot_notifier import BotNotifier, set_notifier, progress_finish from s2a_service import s2a_get_dashboard_stats, format_dashboard_stats from email_service import gptmail_service, unified_create_email from logger import log def admin_only(func): """装饰器: 仅允许管理员执行命令""" @wraps(func) async def wrapper(self, update: Update, context: ContextTypes.DEFAULT_TYPE): user_id = update.effective_user.id if user_id not in TELEGRAM_ADMIN_CHAT_IDS: await update.message.reply_text("⛔ 无权限,你的 ID 不在管理员列表中") return return await func(self, update, context) return wrapper class ProvisionerBot: """OpenAI Team Provisioner Telegram Bot""" def __init__(self): self.executor = ThreadPoolExecutor(max_workers=1) self.current_task: Optional[asyncio.Task] = None self.current_team: Optional[str] = None self.app: Optional[Application] = None self.notifier: Optional[BotNotifier] = None self._shutdown_event = asyncio.Event() async def start(self): """启动 Bot""" if not TELEGRAM_BOT_TOKEN: log.error("Telegram Bot Token not configured") return # 创建 Application self.app = Application.builder().token(TELEGRAM_BOT_TOKEN).build() # 初始化通知器 self.notifier = BotNotifier(self.app.bot, TELEGRAM_ADMIN_CHAT_IDS) set_notifier(self.notifier) # 注册命令处理器 handlers = [ ("start", self.cmd_help), ("help", self.cmd_help), ("status", self.cmd_status), ("team", self.cmd_team), ("list", self.cmd_list), ("config", self.cmd_config), ("fingerprint", self.cmd_fingerprint), ("run", self.cmd_run), ("run_all", self.cmd_run_all), ("resume", self.cmd_resume), ("stop", self.cmd_stop), ("logs", self.cmd_logs), ("logs_live", self.cmd_logs_live), ("logs_stop", self.cmd_logs_stop), ("dashboard", self.cmd_dashboard), ("import", self.cmd_import), ("stock", self.cmd_stock), ("gptmail_keys", self.cmd_gptmail_keys), ("gptmail_add", self.cmd_gptmail_add), ("gptmail_del", self.cmd_gptmail_del), ("test_email", self.cmd_test_email), ("include_owners", self.cmd_include_owners), ("reload", self.cmd_reload), ("s2a_config", self.cmd_s2a_config), ] for cmd, handler in handlers: self.app.add_handler(CommandHandler(cmd, handler)) # 注册文件上传处理器 (JSON 文件) self.app.add_handler(MessageHandler( filters.Document.MimeType("application/json"), self.handle_json_file )) # 注册定时检查任务 if TELEGRAM_CHECK_INTERVAL > 0 and AUTH_PROVIDER == "s2a": self.app.job_queue.run_repeating( self.scheduled_stock_check, interval=TELEGRAM_CHECK_INTERVAL, first=60, # 启动后1分钟执行第一次 name="stock_check" ) log.info(f"Stock check scheduled every {TELEGRAM_CHECK_INTERVAL}s") # 启动通知器 await self.notifier.start() log.success("Telegram Bot started") log.info(f"Admin Chat IDs: {TELEGRAM_ADMIN_CHAT_IDS}") # 发送启动通知 await self.notifier.notify("🤖 Bot 已启动\n准备就绪,发送 /help 查看帮助") # 运行 Bot await self.app.initialize() await self.app.start() # 设置命令菜单提示 await self._set_commands() await self.app.updater.start_polling(drop_pending_updates=True) # 等待关闭信号 await self._shutdown_event.wait() # 清理 await self.app.updater.stop() await self.app.stop() await self.app.shutdown() await self.notifier.stop() def request_shutdown(self): """请求关闭 Bot""" self._shutdown_event.set() async def _set_commands(self): """设置 Bot 命令菜单提示""" commands = [ BotCommand("help", "查看帮助信息"), BotCommand("list", "查看 team.json 账号列表"), BotCommand("status", "查看任务处理状态"), BotCommand("team", "查看指定 Team 详情"), BotCommand("config", "查看系统配置"), BotCommand("run", "处理指定 Team"), BotCommand("run_all", "处理所有 Team"), BotCommand("resume", "继续处理未完成账号"), BotCommand("stop", "停止当前任务"), BotCommand("logs", "查看最近日志"), BotCommand("dashboard", "查看 S2A 仪表盘"), BotCommand("stock", "查看账号库存"), BotCommand("import", "导入账号到 team.json"), BotCommand("gptmail_keys", "查看 GPTMail API Keys"), BotCommand("gptmail_add", "添加 GPTMail API Key"), BotCommand("gptmail_del", "删除 GPTMail API Key"), BotCommand("test_email", "测试邮箱创建功能"), BotCommand("include_owners", "切换 Owner 入库开关"), BotCommand("reload", "重载配置文件"), BotCommand("s2a_config", "配置 S2A 服务参数"), ] try: await self.app.bot.set_my_commands(commands) log.info("Bot 命令菜单已设置") except Exception as e: log.warning(f"设置命令菜单失败: {e}") @admin_only async def cmd_help(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """显示帮助信息""" help_text = """🤖 OpenAI Team 批量注册 Bot 📋 查看信息: /list - 查看 team.json 账号列表 /status - 查看任务处理状态 /team <n> - 查看第 n 个 Team 处理详情 /config - 查看系统配置 /logs [n] - 查看最近 n 条日志 /logs_live - 启用实时日志推送 /logs_stop - 停止实时日志推送 🚀 任务控制: /run <n> - 开始处理第 n 个 Team /run_all - 开始处理所有 Team /resume - 继续处理未完成账号 /stop - 停止当前任务 ⚙️ 配置管理: /fingerprint - 开启/关闭随机指纹 /include_owners - 开启/关闭 Owner 入库 /reload - 重载配置文件 (无需重启) 📊 S2A 专属: /dashboard - 查看 S2A 仪表盘 /stock - 查看账号库存 /s2a_config - 配置 S2A 参数 📤 导入账号: /import - 导入账号到 team.json 或直接发送 JSON 文件 📧 GPTMail 管理: /gptmail_keys - 查看所有 API Keys /gptmail_add <key> - 添加 API Key /gptmail_del <key> - 删除 API Key /test_email - 测试邮箱创建 💡 示例: /list - 查看所有待处理账号 /run 0 - 处理第一个 Team /gptmail_add my-api-key - 添加 Key""" await update.message.reply_text(help_text, parse_mode="HTML") @admin_only async def cmd_status(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """查看所有 Team 状态""" tracker = load_team_tracker() teams_data = tracker.get("teams", {}) if not teams_data: await update.message.reply_text("📭 暂无数据,请先运行任务") return lines = ["📊 Team 状态总览\n"] for team_name, accounts in teams_data.items(): total = len(accounts) completed = sum(1 for a in accounts if a.get("status") == "completed") failed = sum(1 for a in accounts if "fail" in a.get("status", "").lower()) pending = total - completed - failed status_icon = "✅" if completed == total else ("❌" if failed > 0 else "⏳") lines.append( f"{status_icon} {team_name}: {completed}/{total} " f"(失败:{failed} 待处理:{pending})" ) # 当前任务状态 if self.current_task and not self.current_task.done(): lines.append(f"\n🔄 运行中: {self.current_team or '未知'}") await update.message.reply_text("\n".join(lines), parse_mode="HTML") @admin_only async def cmd_team(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """查看指定 Team 详情""" if not context.args: await update.message.reply_text("用法: /team <序号>\n示例: /team 0") return try: team_idx = int(context.args[0]) except ValueError: await update.message.reply_text("❌ 无效的序号,必须是数字") return if team_idx < 0 or team_idx >= len(TEAMS): await update.message.reply_text(f"❌ 序号超出范围,有效范围: 0-{len(TEAMS)-1}") return team = TEAMS[team_idx] team_name = team.get("name", f"Team{team_idx}") tracker = load_team_tracker() accounts = tracker.get("teams", {}).get(team_name, []) lines = [f"📁 Team {team_idx}: {team_name}\n"] lines.append(f"👤 Owner: {team.get('owner_email', '无')}") lines.append(f"📊 账号数: {len(accounts)}\n") if accounts: for acc in accounts: email = acc.get("email", "") status = acc.get("status", "unknown") role = acc.get("role", "member") icon = {"completed": "✅", "authorized": "🔐", "registered": "📝"}.get( status, "❌" if "fail" in status.lower() else "⏳" ) role_tag = " [Owner]" if role == "owner" else "" lines.append(f"{icon} {email}{role_tag}") else: lines.append("📭 暂无已处理的账号") await update.message.reply_text("\n".join(lines), parse_mode="HTML") @admin_only async def cmd_list(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """查看 team.json 中的账号列表""" if not TEAMS: await update.message.reply_text("📭 team.json 中没有账号") return lines = [f"📋 team.json 账号列表 (共 {len(TEAMS)} 个)\n"] for i, team in enumerate(TEAMS): email = team.get("owner_email", "") has_token = "🔑" if team.get("auth_token") else "🔒" authorized = "✅" if team.get("authorized") else "" needs_login = " [需登录]" if team.get("needs_login") else "" lines.append(f"{i}. {has_token} {email}{authorized}{needs_login}") # 统计 with_token = sum(1 for t in TEAMS if t.get("auth_token")) authorized = sum(1 for t in TEAMS if t.get("authorized")) lines.append(f"\n📊 统计:") lines.append(f"有 Token: {with_token}/{len(TEAMS)}") lines.append(f"已授权: {authorized}/{len(TEAMS)}") # 消息太长时分段发送 text = "\n".join(lines) if len(text) > 4000: # 分段 for i in range(0, len(lines), 30): chunk = "\n".join(lines[i:i+30]) await update.message.reply_text(chunk, parse_mode="HTML") else: await update.message.reply_text(text, parse_mode="HTML") @admin_only async def cmd_config(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """查看当前系统配置""" # 授权服务地址 if AUTH_PROVIDER == "s2a": auth_url = S2A_API_BASE or "未配置" elif AUTH_PROVIDER == "cpa": auth_url = CPA_API_BASE or "未配置" else: auth_url = CRS_API_BASE or "未配置" # 代理信息 if PROXY_ENABLED and PROXIES: proxy_info = f"已启用 ({len(PROXIES)} 个)" else: proxy_info = "未启用" # Owner 入库状态 include_owners_status = "✅ 已开启" if INCLUDE_TEAM_OWNERS else "❌ 未开启" lines = [ "⚙️ 系统配置", "", "📧 邮箱服务", f" 提供商: {EMAIL_PROVIDER}", "", "🔐 授权服务", f" 模式: {AUTH_PROVIDER.upper()}", f" 地址: {auth_url}", f" Owner 入库: {include_owners_status}", "", "👥 账号设置", f" 每 Team 账号数: {ACCOUNTS_PER_TEAM}", f" team.json 账号: {len(TEAMS)}", "", "🔗 代理", f" 状态: {proxy_info}", "", "💡 提示:", "/include_owners - 切换 Owner 入库", "/s2a_config - 配置 S2A 参数", ] await update.message.reply_text("\n".join(lines), parse_mode="HTML") @admin_only async def cmd_fingerprint(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """切换随机指纹""" import tomli_w try: # 读取当前配置 with open(CONFIG_FILE, "rb") as f: import tomllib config = tomllib.load(f) # 获取当前状态 current = config.get("browser", {}).get("random_fingerprint", True) new_value = not current # 更新配置 if "browser" not in config: config["browser"] = {} config["browser"]["random_fingerprint"] = new_value # 写回文件 with open(CONFIG_FILE, "wb") as f: tomli_w.dump(config, f) status = "✅ 已开启" if new_value else "❌ 已关闭" await update.message.reply_text( f"🎭 随机指纹\n\n" f"状态: {status}\n\n" f"开启后每次启动浏览器将随机使用不同的:\n" f"• User-Agent (Chrome 133-144)\n" f"• WebGL 显卡指纹\n" f"• 屏幕分辨率\n\n" f"💡 使用 /reload 立即生效", parse_mode="HTML" ) except ImportError: await update.message.reply_text( "❌ 缺少 tomli_w 依赖\n" "请运行: uv add tomli_w" ) except Exception as e: await update.message.reply_text(f"❌ 修改配置失败: {e}") @admin_only async def cmd_include_owners(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """切换 Owner 入库开关""" import tomli_w try: # 读取当前配置 with open(CONFIG_FILE, "rb") as f: import tomllib config = tomllib.load(f) # 获取当前状态 (顶层配置) current = config.get("include_team_owners", False) new_value = not current # 更新配置 (写到顶层) config["include_team_owners"] = new_value # 写回文件 with open(CONFIG_FILE, "wb") as f: tomli_w.dump(config, f) status = "✅ 已开启" if new_value else "❌ 已关闭" desc = "运行任务时会将 Team Owner 账号也入库到授权服务" if new_value else "运行任务时不会入库 Team Owner 账号" await update.message.reply_text( f"👤 Owner 入库开关\n\n" f"状态: {status}\n" f"{desc}\n\n" f"💡 使用 /reload 立即生效", parse_mode="HTML" ) except ImportError: await update.message.reply_text( "❌ 缺少 tomli_w 依赖\n" "请运行: uv add tomli_w" ) except Exception as e: await update.message.reply_text(f"❌ 修改配置失败: {e}") @admin_only async def cmd_reload(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """重载配置文件""" # 检查是否有任务正在运行 if self.current_task and not self.current_task.done(): await update.message.reply_text( f"⚠️ 有任务正在运行: {self.current_team}\n" "请等待任务完成或使用 /stop 停止后再重载配置" ) return await update.message.reply_text("⏳ 正在重载配置...") try: # 调用重载函数 result = reload_config() if result["success"]: # 重新导入更新后的配置变量 from config import ( EMAIL_PROVIDER as new_email_provider, AUTH_PROVIDER as new_auth_provider, INCLUDE_TEAM_OWNERS as new_include_owners, ACCOUNTS_PER_TEAM as new_accounts_per_team, ) lines = [ "✅ 配置重载成功", "", f"📁 team.json: {result['teams_count']} 个账号", f"📄 config.toml: {'已更新' if result['config_updated'] else '未变化'}", "", "当前配置:", f" 邮箱服务: {new_email_provider}", f" 授权服务: {new_auth_provider}", f" Owner 入库: {'✅' if new_include_owners else '❌'}", f" 每 Team 账号: {new_accounts_per_team}", ] await update.message.reply_text("\n".join(lines), parse_mode="HTML") else: await update.message.reply_text( f"❌ 配置重载失败\n\n{result['message']}", parse_mode="HTML" ) except Exception as e: await update.message.reply_text(f"❌ 重载配置失败: {e}") @admin_only async def cmd_s2a_config(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """配置 S2A 服务参数""" import tomli_w # 无参数时显示当前配置 if not context.args: # 脱敏显示 API Key key_display = "未配置" if S2A_ADMIN_KEY: if len(S2A_ADMIN_KEY) > 10: key_display = f"{S2A_ADMIN_KEY[:4]}...{S2A_ADMIN_KEY[-4:]}" else: key_display = S2A_ADMIN_KEY[:4] + "..." groups_display = ", ".join(S2A_GROUP_NAMES) if S2A_GROUP_NAMES else "默认分组" group_ids_display = ", ".join(str(x) for x in S2A_GROUP_IDS) if S2A_GROUP_IDS else "无" lines = [ "📊 S2A 服务配置", "", f"API 地址: {S2A_API_BASE or '未配置'}", f"Admin Key: {key_display}", f"并发数: {S2A_CONCURRENCY}", f"优先级: {S2A_PRIORITY}", f"分组名称: {groups_display}", f"分组 ID: {group_ids_display}", "", "💡 修改配置:", "/s2a_config concurrency 10", "/s2a_config priority 50", "/s2a_config groups 分组1,分组2", "/s2a_config api_base https://...", "/s2a_config admin_key sk-xxx", ] await update.message.reply_text("\n".join(lines), parse_mode="HTML") return # 解析参数 param = context.args[0].lower() value = " ".join(context.args[1:]) if len(context.args) > 1 else None if not value: await update.message.reply_text( f"❌ 缺少值\n用法: /s2a_config {param} <值>" ) return try: # 读取当前配置 with open(CONFIG_FILE, "rb") as f: import tomllib config = tomllib.load(f) # 确保 s2a section 存在 if "s2a" not in config: config["s2a"] = {} # 根据参数类型处理 updated_key = None updated_value = None if param == "concurrency": try: new_val = int(value) if new_val < 1 or new_val > 100: await update.message.reply_text("❌ 并发数范围: 1-100") return config["s2a"]["concurrency"] = new_val updated_key = "并发数" updated_value = str(new_val) except ValueError: await update.message.reply_text("❌ 并发数必须是数字") return elif param == "priority": try: new_val = int(value) if new_val < 0 or new_val > 100: await update.message.reply_text("❌ 优先级范围: 0-100") return config["s2a"]["priority"] = new_val updated_key = "优先级" updated_value = str(new_val) except ValueError: await update.message.reply_text("❌ 优先级必须是数字") return elif param in ("groups", "group_names"): # 支持逗号分隔的分组名称 groups = [g.strip() for g in value.split(",") if g.strip()] config["s2a"]["group_names"] = groups updated_key = "分组名称" updated_value = ", ".join(groups) if groups else "默认分组" elif param == "group_ids": # 支持逗号分隔的分组 ID ids = [i.strip() for i in value.split(",") if i.strip()] config["s2a"]["group_ids"] = ids updated_key = "分组 ID" updated_value = ", ".join(ids) if ids else "无" elif param == "api_base": config["s2a"]["api_base"] = value updated_key = "API 地址" updated_value = value elif param == "admin_key": config["s2a"]["admin_key"] = value updated_key = "Admin Key" # 脱敏显示 if len(value) > 10: updated_value = f"{value[:4]}...{value[-4:]}" else: updated_value = value[:4] + "..." else: await update.message.reply_text( f"❌ 未知参数: {param}\n\n" "可用参数:\n" "• concurrency - 并发数\n" "• priority - 优先级\n" "• groups - 分组名称\n" "• group_ids - 分组 ID\n" "• api_base - API 地址\n" "• admin_key - Admin Key" ) return # 写回文件 with open(CONFIG_FILE, "wb") as f: tomli_w.dump(config, f) await update.message.reply_text( f"✅ S2A 配置已更新\n\n" f"{updated_key}: {updated_value}\n\n" f"💡 使用 /reload 立即生效", parse_mode="HTML" ) except ImportError: await update.message.reply_text( "❌ 缺少 tomli_w 依赖\n" "请运行: uv add tomli_w" ) except Exception as e: await update.message.reply_text(f"❌ 修改配置失败: {e}") @admin_only async def cmd_run(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """启动处理指定 Team""" if self.current_task and not self.current_task.done(): await update.message.reply_text( f"⚠️ 任务正在运行: {self.current_team}\n使用 /stop 停止" ) return if not context.args: await update.message.reply_text("用法: /run <序号>\n示例: /run 0") return try: team_idx = int(context.args[0]) except ValueError: await update.message.reply_text("❌ 无效的序号,必须是数字") return if team_idx < 0 or team_idx >= len(TEAMS): await update.message.reply_text(f"❌ 序号超出范围,有效范围: 0-{len(TEAMS)-1}") return team_name = TEAMS[team_idx].get("name", f"Team{team_idx}") self.current_team = team_name await update.message.reply_text(f"🚀 开始处理 Team {team_idx}: {team_name}...") # 在后台线程执行任务 loop = asyncio.get_event_loop() self.current_task = loop.run_in_executor( self.executor, self._run_team_task, team_idx ) # 添加完成回调 self.current_task = asyncio.ensure_future(self._wrap_task(self.current_task, team_name)) @admin_only async def cmd_run_all(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """启动处理所有 Team""" if self.current_task and not self.current_task.done(): await update.message.reply_text( f"⚠️ 任务正在运行: {self.current_team}\n使用 /stop 停止" ) return self.current_team = "全部" await update.message.reply_text(f"🚀 开始处理所有 Team (共 {len(TEAMS)} 个)...") loop = asyncio.get_event_loop() self.current_task = loop.run_in_executor( self.executor, self._run_all_teams_task ) self.current_task = asyncio.ensure_future(self._wrap_task(self.current_task, "全部")) @admin_only async def cmd_resume(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """继续处理未完成的账号""" if self.current_task and not self.current_task.done(): await update.message.reply_text( f"⚠️ 任务正在运行: {self.current_team}\n使用 /stop 停止" ) return # 检查是否有未完成的账号 tracker = load_team_tracker() all_incomplete = get_all_incomplete_accounts(tracker) if not all_incomplete: await update.message.reply_text("✅ 没有待处理的账号,所有任务已完成") return # 统计未完成账号 total_incomplete = sum(len(accs) for accs in all_incomplete.values()) teams_count = len(all_incomplete) # 构建消息 lines = [ f"⏳ 发现 {total_incomplete} 个未完成账号", f"涉及 {teams_count} 个 Team:", "" ] for team_name, accounts in all_incomplete.items(): lines.append(f" • {team_name}: {len(accounts)} 个") lines.append("") lines.append("🚀 开始继续处理...") await update.message.reply_text("\n".join(lines), parse_mode="HTML") # 启动任务 (run_all_teams 会自动处理未完成的账号) self.current_team = "继续处理" loop = asyncio.get_event_loop() self.current_task = loop.run_in_executor( self.executor, self._run_all_teams_task ) self.current_task = asyncio.ensure_future(self._wrap_task(self.current_task, "继续处理")) async def _wrap_task(self, task, team_name: str): """包装任务以处理完成通知""" try: result = await task # 收集成功和失败的账号 success_accounts = [r.get("email") for r in (result or []) if r.get("status") == "success"] failed_accounts = [r.get("email") for r in (result or []) if r.get("status") != "success"] log.info(f"任务完成: {team_name}, 成功: {len(success_accounts)}, 失败: {len(failed_accounts)}") await self.notifier.notify_task_completed(team_name, success_accounts, failed_accounts) except Exception as e: log.error(f"任务异常: {team_name}, 错误: {e}") await self.notifier.notify_error(f"任务失败: {team_name}", str(e)) finally: self.current_team = None # 清理进度跟踪 progress_finish() def _run_team_task(self, team_idx: int): """执行单个 Team 任务 (在线程池中运行)""" # 延迟导入避免循环依赖 from run import run_single_team from team_service import preload_all_account_ids from utils import load_team_tracker, save_team_tracker, add_team_owners_to_tracker from config import DEFAULT_PASSWORD # 预加载 account_id preload_all_account_ids() _tracker = load_team_tracker() add_team_owners_to_tracker(_tracker, DEFAULT_PASSWORD) save_team_tracker(_tracker) return run_single_team(team_idx) def _run_all_teams_task(self): """执行所有 Team 任务 (在线程池中运行)""" from run import run_all_teams from team_service import preload_all_account_ids from utils import load_team_tracker, save_team_tracker, add_team_owners_to_tracker from config import DEFAULT_PASSWORD # 预加载 account_id preload_all_account_ids() _tracker = load_team_tracker() add_team_owners_to_tracker(_tracker, DEFAULT_PASSWORD) save_team_tracker(_tracker) return run_all_teams() @admin_only async def cmd_stop(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """强制停止当前任务""" if not self.current_task or self.current_task.done(): await update.message.reply_text("📭 当前没有运行中的任务") return task_name = self.current_team or "未知任务" await update.message.reply_text(f"🛑 正在强制停止: {task_name}...") try: # 1. 设置全局停止标志 try: import run run._shutdown_requested = True # 获取当前运行结果 current_results = run._current_results.copy() if run._current_results else [] except Exception: current_results = [] # 2. 取消 asyncio 任务 if self.current_task and not self.current_task.done(): self.current_task.cancel() # 3. 强制关闭浏览器进程 try: from browser_automation import cleanup_chrome_processes cleanup_chrome_processes() except Exception as e: log.warning(f"清理浏览器进程失败: {e}") # 4. 重置状态 self.current_team = None # 5. 重置停止标志 (以便下次任务可以正常运行) try: import run run._shutdown_requested = False except Exception: pass # 清理进度跟踪 progress_finish() # 6. 生成停止报告 report_lines = [ f"🛑 任务已停止", f"任务: {task_name}", "", ] # 本次运行结果 if current_results: success_count = sum(1 for r in current_results if r.get("status") == "success") failed_count = len(current_results) - success_count report_lines.append(f"📊 本次运行结果:") report_lines.append(f" 成功: {success_count}") report_lines.append(f" 失败: {failed_count}") report_lines.append("") # 获取未完成账号信息 tracker = load_team_tracker() all_incomplete = get_all_incomplete_accounts(tracker) if all_incomplete: total_incomplete = sum(len(accs) for accs in all_incomplete.values()) report_lines.append(f"⏳ 待继续处理: {total_incomplete} 个账号") # 显示每个 Team 的未完成账号 (最多显示 3 个 Team) shown_teams = 0 for team_name, accounts in all_incomplete.items(): if shown_teams >= 3: remaining = len(all_incomplete) - 3 report_lines.append(f" ... 还有 {remaining} 个 Team") break report_lines.append(f" {team_name}: {len(accounts)} 个") # 显示第一个待处理账号 if accounts: first_acc = accounts[0] report_lines.append(f" 下一个: {first_acc['email']}") shown_teams += 1 report_lines.append("") report_lines.append("💡 使用 /resume 继续处理") else: report_lines.append("✅ 没有待处理的账号") await update.message.reply_text("\n".join(report_lines), parse_mode="HTML") except Exception as e: await update.message.reply_text(f"❌ 停止任务时出错: {e}") @admin_only async def cmd_logs(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """查看最近日志""" try: n = int(context.args[0]) if context.args else 10 except ValueError: n = 10 n = min(n, 50) # 限制最大条数 try: from config import BASE_DIR log_file = BASE_DIR / "logs" / "app.log" if not log_file.exists(): await update.message.reply_text("📭 日志文件不存在") return with open(log_file, "r", encoding="utf-8", errors="ignore") as f: lines = f.readlines() recent = lines[-n:] if len(lines) >= n else lines if not recent: await update.message.reply_text("📭 日志文件为空") return # 格式化日志 (移除 ANSI 颜色码) import re ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])') clean_lines = [ansi_escape.sub('', line.strip()) for line in recent] log_text = "\n".join(clean_lines) if len(log_text) > 4000: log_text = log_text[-4000:] await update.message.reply_text(f"{log_text}", parse_mode="HTML") except Exception as e: await update.message.reply_text(f"❌ 读取日志失败: {e}") @admin_only async def cmd_logs_live(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """启用实时日志推送""" from logger import log if log.is_telegram_logging_enabled(): await update.message.reply_text("📡 实时日志已经在运行中") return # 启用 Telegram 日志推送 def log_callback(message: str, level: str): """日志回调函数""" if self.notifier: self.notifier.queue_message(message, level) log.enable_telegram_logging(log_callback) await update.message.reply_text( "✅ 实时日志已启用\n" "所有日志将实时推送到此聊天\n" "使用 /logs_stop 停止推送" ) @admin_only async def cmd_logs_stop(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """停止实时日志推送""" from logger import log if not log.is_telegram_logging_enabled(): await update.message.reply_text("📭 实时日志未启用") return log.disable_telegram_logging() await update.message.reply_text("✅ 实时日志已停止") @admin_only async def cmd_dashboard(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """查看 S2A 仪表盘统计""" if AUTH_PROVIDER != "s2a": await update.message.reply_text( f"⚠️ 仪表盘仅支持 S2A 模式\n" f"当前模式: {AUTH_PROVIDER}" ) return await update.message.reply_text("⏳ 正在获取仪表盘数据...") try: stats = s2a_get_dashboard_stats() if stats: text = format_dashboard_stats(stats) await update.message.reply_text(text, parse_mode="HTML") else: await update.message.reply_text( "❌ 获取仪表盘数据失败\n" "请检查 S2A 配置和 API 连接" ) except Exception as e: await update.message.reply_text(f"❌ 错误: {e}") @admin_only async def cmd_stock(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """查看账号存货""" if AUTH_PROVIDER != "s2a": await update.message.reply_text( f"⚠️ 库存查询仅支持 S2A 模式\n" f"当前模式: {AUTH_PROVIDER}" ) return stats = s2a_get_dashboard_stats() if not stats: await update.message.reply_text("❌ 获取库存信息失败") return text = self._format_stock_message(stats) await update.message.reply_text(text, parse_mode="HTML") async def scheduled_stock_check(self, context: ContextTypes.DEFAULT_TYPE): """定时检查账号存货""" try: stats = s2a_get_dashboard_stats() if not stats: return normal = stats.get("normal_accounts", 0) total = stats.get("total_accounts", 0) # 只在低库存时发送通知 if normal <= TELEGRAM_LOW_STOCK_THRESHOLD: text = self._format_stock_message(stats, is_alert=True) for chat_id in TELEGRAM_ADMIN_CHAT_IDS: try: await context.bot.send_message( chat_id=chat_id, text=text, parse_mode="HTML" ) except Exception: pass except Exception as e: log.warning(f"Stock check failed: {e}") def _format_stock_message(self, stats: dict, is_alert: bool = False) -> str: """格式化存货消息""" total = stats.get("total_accounts", 0) normal = stats.get("normal_accounts", 0) error = stats.get("error_accounts", 0) ratelimit = stats.get("ratelimit_accounts", 0) overload = stats.get("overload_accounts", 0) # 计算健康度 health_pct = (normal / total * 100) if total > 0 else 0 # 状态图标 if normal <= TELEGRAM_LOW_STOCK_THRESHOLD: status_icon = "⚠️ 库存不足" status_line = f"{status_icon}" elif health_pct >= 80: status_icon = "✅ 正常" status_line = f"{status_icon}" elif health_pct >= 50: status_icon = "⚠️ 警告" status_line = f"{status_icon}" else: status_icon = "🔴 严重" status_line = f"{status_icon}" title = "🚨 库存不足警报" if is_alert else "📦 账号库存" lines = [ f"{title}", "", f"状态: {status_line}", f"健康度: {health_pct:.1f}%", "", f"正常: {normal}", f"异常: {error}", f"限流: {ratelimit}", f"总计: {total}", ] if is_alert: lines.append("") lines.append(f"预警阈值: {TELEGRAM_LOW_STOCK_THRESHOLD}") return "\n".join(lines) @admin_only async def cmd_import(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """上传账号到 team.json""" # 获取命令后的 JSON 数据 if not context.args: await update.message.reply_text( "📤 导入账号到 team.json\n\n" "用法:\n" "1. 直接发送 JSON 文件\n" "2. /import 后跟 JSON 数据\n\n" "JSON 格式:\n" "[{\"account\":\"邮箱\",\"password\":\"密码\",\"token\":\"jwt\"},...]\n\n" "导入后使用 /run 开始处理", parse_mode="HTML" ) return # 尝试解析 JSON json_text = " ".join(context.args) await self._process_import_json(update, json_text) @admin_only async def handle_json_file(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """处理上传的 JSON 文件""" # 检查是否是管理员 user_id = update.effective_user.id if user_id not in TELEGRAM_ADMIN_CHAT_IDS: await update.message.reply_text("⛔ 无权限") return document = update.message.document if not document: return await update.message.reply_text("⏳ 正在处理 JSON 文件...") try: # 下载文件 file = await document.get_file() file_bytes = await file.download_as_bytearray() json_text = file_bytes.decode("utf-8") await self._process_import_json(update, json_text) except Exception as e: await update.message.reply_text(f"❌ 读取文件失败: {e}") async def _process_import_json(self, update: Update, json_text: str): """处理导入的 JSON 数据,保存到 team.json""" import json from pathlib import Path try: new_accounts = json.loads(json_text) except json.JSONDecodeError as e: await update.message.reply_text(f"❌ JSON 格式错误: {e}") return if not isinstance(new_accounts, list): # 如果是单个对象,转成列表 new_accounts = [new_accounts] if not new_accounts: await update.message.reply_text("📭 JSON 数据中没有账号") return # 验证格式 valid_accounts = [] for acc in new_accounts: if not isinstance(acc, dict): continue # 支持 account 或 email 字段 email = acc.get("account") or acc.get("email", "") token = acc.get("token", "") password = acc.get("password", "") if email and token: valid_accounts.append({ "account": email, "password": password, "token": token }) if not valid_accounts: await update.message.reply_text("❌ 未找到有效账号 (需要 account/email 和 token 字段)") return # 读取现有 team.json (不存在则自动创建) team_json_path = Path(TEAM_JSON_FILE) existing_accounts = [] is_new_file = not team_json_path.exists() if not is_new_file: try: with open(team_json_path, "r", encoding="utf-8") as f: existing_accounts = json.load(f) if not isinstance(existing_accounts, list): existing_accounts = [existing_accounts] except Exception: existing_accounts = [] # 检查重复 existing_emails = set() for acc in existing_accounts: email = acc.get("account") or acc.get("user", {}).get("email", "") if email: existing_emails.add(email.lower()) added = 0 skipped = 0 for acc in valid_accounts: email = acc.get("account", "").lower() if email in existing_emails: skipped += 1 else: existing_accounts.append(acc) existing_emails.add(email) added += 1 # 保存到 team.json (自动创建文件) try: # 确保父目录存在 team_json_path.parent.mkdir(parents=True, exist_ok=True) with open(team_json_path, "w", encoding="utf-8") as f: json.dump(existing_accounts, f, ensure_ascii=False, indent=2) file_status = "📄 已创建 team.json" if is_new_file else "📄 已更新 team.json" await update.message.reply_text( f"✅ 导入完成\n\n" f"{file_status}\n" f"新增: {added}\n" f"跳过 (重复): {skipped}\n" f"team.json 总数: {len(existing_accounts)}\n\n" f"💡 使用 /reload 刷新配置\n" f"使用 /run_all 或 /run <n> 开始处理", parse_mode="HTML" ) except Exception as e: await update.message.reply_text(f"❌ 保存到 team.json 失败: {e}") # ==================== GPTMail Key 管理命令 ==================== @admin_only async def cmd_gptmail_keys(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """查看所有 GPTMail API Keys""" keys = get_gptmail_keys() config_keys = set(GPTMAIL_API_KEYS) if not keys: await update.message.reply_text( "📧 GPTMail API Keys\n\n" "📭 暂无配置 Key\n\n" "使用 /gptmail_add <key> 添加", parse_mode="HTML" ) return lines = [f"📧 GPTMail API Keys (共 {len(keys)} 个)\n"] for i, key in enumerate(keys): # 脱敏显示 if len(key) > 10: masked = f"{key[:4]}...{key[-4:]}" else: masked = key[:4] + "..." if len(key) > 4 else key # 标记来源 source = "📁 配置" if key in config_keys else "🔧 动态" lines.append(f"{i+1}. {masked} {source}") lines.append(f"\n💡 管理:") lines.append(f"/gptmail_add <key> - 添加 Key") lines.append(f"/gptmail_del <key> - 删除动态 Key") lines.append(f"/test_email - 测试邮箱创建") await update.message.reply_text("\n".join(lines), parse_mode="HTML") @admin_only async def cmd_gptmail_add(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """添加 GPTMail API Key (支持批量导入)""" if not context.args: await update.message.reply_text( "📧 添加 GPTMail API Key\n\n" "单个添加:\n" "/gptmail_add gpt-xxx\n\n" "批量添加 (空格分隔):\n" "/gptmail_add key1 key2 key3\n\n" "批量添加 (换行分隔):\n" "/gptmail_add key1\nkey2\nkey3", parse_mode="HTML" ) return # 合并所有参数,支持空格和换行分隔 raw_input = " ".join(context.args) # 按空格和换行分割 keys = [] for part in raw_input.replace("\n", " ").split(): key = part.strip() if key: keys.append(key) if not keys: await update.message.reply_text("❌ Key 不能为空") return # 获取现有 keys existing_keys = set(get_gptmail_keys()) # 统计结果 added = [] skipped = [] invalid = [] await update.message.reply_text(f"⏳ 正在验证 {len(keys)} 个 Key...") for key in keys: # 检查是否已存在 if key in existing_keys: skipped.append(key) continue # 测试 Key 是否有效 success, message = gptmail_service.test_api_key(key) if not success: invalid.append(key) continue # 添加 Key if add_gptmail_key(key): added.append(key) existing_keys.add(key) # 生成结果报告 lines = ["📧 GPTMail Key 导入结果\n"] if added: lines.append(f"✅ 成功添加: {len(added)}") for k in added[:5]: # 最多显示5个 masked = f"{k[:4]}...{k[-4:]}" if len(k) > 10 else k lines.append(f" • {masked}") if len(added) > 5: lines.append(f" • ... 等 {len(added)} 个") if skipped: lines.append(f"\n⏭️ 已跳过 (已存在): {len(skipped)}") if invalid: lines.append(f"\n❌ 无效 Key: {len(invalid)}") for k in invalid[:3]: # 最多显示3个 masked = f"{k[:4]}...{k[-4:]}" if len(k) > 10 else k lines.append(f" • {masked}") lines.append(f"\n当前 Key 总数: {len(get_gptmail_keys())}") await update.message.reply_text("\n".join(lines), parse_mode="HTML") @admin_only async def cmd_gptmail_del(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """删除 GPTMail API Key""" if not context.args: await update.message.reply_text( "📧 删除 GPTMail API Key\n\n" "用法: /gptmail_del <key>\n\n" "注意: 只能删除动态添加的 Key,配置文件中的 Key 请直接修改 config.toml", parse_mode="HTML" ) return key = context.args[0].strip() # 检查是否是配置文件中的 Key if key in GPTMAIL_API_KEYS: await update.message.reply_text( "⚠️ 该 Key 在配置文件中,无法通过 Bot 删除\n" "请直接修改 config.toml" ) return # 删除 Key if remove_gptmail_key(key): await update.message.reply_text( f"✅ Key 已删除\n\n" f"当前 Key 总数: {len(get_gptmail_keys())}", parse_mode="HTML" ) else: await update.message.reply_text("❌ Key 不存在或删除失败") @admin_only async def cmd_test_email(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """测试邮箱创建功能""" if EMAIL_PROVIDER != "gptmail": await update.message.reply_text( f"⚠️ 当前邮箱提供商: {EMAIL_PROVIDER}\n" f"测试功能仅支持 GPTMail 模式" ) return await update.message.reply_text("⏳ 正在测试邮箱创建...") try: # 测试创建邮箱 email, password = unified_create_email() if email: await update.message.reply_text( f"✅ 邮箱创建成功\n\n" f"邮箱: {email}\n" f"密码: {password}\n\n" f"当前 Key 数量: {len(get_gptmail_keys())}", parse_mode="HTML" ) else: await update.message.reply_text( "❌ 邮箱创建失败\n\n" "请检查 GPTMail API Key 配置", parse_mode="HTML" ) except Exception as e: await update.message.reply_text(f"❌ 测试失败: {e}") async def main(): """主函数""" if not TELEGRAM_BOT_TOKEN: print("Telegram Bot Token 未配置,请在 config.toml 中设置 telegram.bot_token") sys.exit(1) if not TELEGRAM_ADMIN_CHAT_IDS: print("管理员 Chat ID 未配置,请在 config.toml 中设置 telegram.admin_chat_ids") sys.exit(1) bot = ProvisionerBot() # 处理 Ctrl+C import signal def signal_handler(sig, frame): log.info("正在关闭...") bot.request_shutdown() signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) await bot.start() if __name__ == "__main__": asyncio.run(main())