From 6600195a3fdf07332d61aa62facfb4de7b6144c5 Mon Sep 17 00:00:00 2001 From: kyx236 Date: Wed, 21 Jan 2026 22:05:56 +0800 Subject: [PATCH] update --- s2a_service.py | 172 ++++++++++++++++++++++++++++++++++++++++++++++++ telegram_bot.py | 172 +++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 343 insertions(+), 1 deletion(-) diff --git a/s2a_service.py b/s2a_service.py index a37683b..8a5df71 100644 --- a/s2a_service.py +++ b/s2a_service.py @@ -507,6 +507,178 @@ def s2a_get_accounts(platform: str = "openai") -> List[Dict[str, Any]]: return [] +def s2a_get_error_accounts( + platform: str = "", + page_size: int = 100, + timezone: str = "Asia/Shanghai" +) -> Tuple[List[Dict[str, Any]], int]: + """获取所有错误状态的账号(支持分页获取全部) + + Args: + platform: 平台筛选 (默认为空,获取所有平台) + page_size: 每页数量 + timezone: 时区 + + Returns: + tuple: (账号列表, 总数) + """ + if not S2A_API_BASE or (not S2A_ADMIN_KEY and not S2A_ADMIN_TOKEN): + return [], 0 + + headers = build_s2a_headers() + all_accounts = [] + total_count = 0 + page = 1 + + try: + while True: + params = { + "page": page, + "page_size": page_size, + "platform": platform, + "type": "", + "status": "error", + "search": "", + "timezone": timezone + } + + response = http_session.get( + f"{S2A_API_BASE}/admin/accounts", + headers=headers, + params=params, + timeout=REQUEST_TIMEOUT + ) + + if response.status_code != 200: + log.warning(f"S2A 获取错误账号失败: HTTP {response.status_code}") + break + + result = response.json() + if result.get("code") != 0: + log.warning(f"S2A 获取错误账号失败: {result.get('message', 'Unknown error')}") + break + + data = result.get("data", {}) + items = data.get("items", []) + total_count = data.get("total", 0) + total_pages = data.get("pages", 1) + + all_accounts.extend(items) + + # 如果已获取所有页面,退出循环 + if page >= total_pages or not items: + break + + page += 1 + + return all_accounts, total_count + + except Exception as e: + log.warning(f"S2A 获取错误账号异常: {e}") + return [], 0 + + +def s2a_delete_account(account_id: int) -> Tuple[bool, str]: + """删除单个账号 + + Args: + account_id: 账号 ID + + Returns: + tuple: (是否成功, 消息) + """ + if not S2A_API_BASE or (not S2A_ADMIN_KEY and not S2A_ADMIN_TOKEN): + return False, "S2A not configured" + + headers = build_s2a_headers() + + try: + response = http_session.delete( + f"{S2A_API_BASE}/admin/accounts/{account_id}", + headers=headers, + timeout=REQUEST_TIMEOUT + ) + + if response.status_code == 200: + result = response.json() + if result.get("code") == 0: + return True, "Deleted" + else: + return False, result.get("message", "Unknown error") + else: + return False, f"HTTP {response.status_code}" + + except Exception as e: + return False, str(e) + + +def s2a_batch_delete_error_accounts( + progress_callback: Optional[callable] = None +) -> Dict[str, Any]: + """批量删除所有错误状态的账号 + + Args: + progress_callback: 进度回调函数 (current, total, account_name, success) + + Returns: + dict: {"success": int, "failed": int, "total": int, "details": [...]} + """ + results = { + "success": 0, + "failed": 0, + "total": 0, + "details": [] + } + + # 获取所有错误账号 + error_accounts, total = s2a_get_error_accounts() + results["total"] = total + + if not error_accounts: + return results + + log.info(f"开始批量删除 {len(error_accounts)} 个错误账号...") + + for i, account in enumerate(error_accounts): + account_id = account.get("id") + account_name = account.get("name", "") + error_message = account.get("error_message", "") + + if not account_id: + results["failed"] += 1 + results["details"].append({ + "name": account_name, + "status": "failed", + "message": "Missing account ID" + }) + continue + + success, message = s2a_delete_account(account_id) + + if success: + results["success"] += 1 + results["details"].append({ + "id": account_id, + "name": account_name, + "error": error_message[:50] if error_message else "", + "status": "deleted" + }) + else: + results["failed"] += 1 + results["details"].append({ + "id": account_id, + "name": account_name, + "status": "failed", + "message": message + }) + + if progress_callback: + progress_callback(i + 1, len(error_accounts), account_name, success) + + log.success(f"批量删除完成: 成功 {results['success']}, 失败 {results['failed']}") + return results + + def s2a_check_account_exists(email: str, platform: str = "openai") -> bool: """检查账号是否已存在""" accounts = s2a_get_accounts(platform) diff --git a/telegram_bot.py b/telegram_bot.py index 745cbb3..3c1e3c8 100644 --- a/telegram_bot.py +++ b/telegram_bot.py @@ -51,7 +51,10 @@ from config import ( ) from utils import load_team_tracker, get_all_incomplete_accounts from bot_notifier import BotNotifier, set_notifier, progress_finish -from s2a_service import s2a_get_dashboard_stats, format_dashboard_stats, s2a_get_keys_with_usage, format_keys_usage +from s2a_service import ( + s2a_get_dashboard_stats, format_dashboard_stats, s2a_get_keys_with_usage, format_keys_usage, + s2a_get_error_accounts, s2a_delete_account, s2a_batch_delete_error_accounts +) from email_service import gptmail_service, unified_create_email from logger import log @@ -131,6 +134,7 @@ class ProvisionerBot: ("reload", self.cmd_reload), ("s2a_config", self.cmd_s2a_config), ("clean", self.cmd_clean), + ("clean_errors", self.cmd_clean_errors), ("keys_usage", self.cmd_keys_usage), ] for cmd, handler in handlers: @@ -147,6 +151,10 @@ class ProvisionerBot: self.callback_keys_usage, pattern="^keys_usage:" )) + self.app.add_handler(CallbackQueryHandler( + self.callback_clean_errors, + pattern="^clean_errors:" + )) # 注册定时检查任务 if TELEGRAM_CHECK_INTERVAL > 0 and AUTH_PROVIDER == "s2a": @@ -255,6 +263,7 @@ class ProvisionerBot: /keys_usage - 查看 API 密钥用量 /stock - 查看账号库存 /s2a_config - 配置 S2A 参数 +/clean_errors - 清理错误状态账号 📤 导入账号: /import - 导入账号到 team.json @@ -1414,6 +1423,167 @@ class ProvisionerBot: return "\n".join(lines) + @admin_only + async def cmd_clean_errors(self, update: Update, context: ContextTypes.DEFAULT_TYPE): + """清理错误状态的账号""" + if AUTH_PROVIDER != "s2a": + await update.message.reply_text( + f"⚠️ 清理错误账号仅支持 S2A 模式\n" + f"当前模式: {AUTH_PROVIDER}" + ) + return + + # 获取错误账号 + error_accounts, total = s2a_get_error_accounts() + + if total == 0: + await update.message.reply_text("✅ 没有错误状态的账号需要清理") + return + + # 存储账号数据到 context.bot_data 供分页使用 + context.bot_data["clean_errors_accounts"] = error_accounts + context.bot_data["clean_errors_total"] = total + + # 显示第一页 + text, keyboard = self._build_clean_errors_page(error_accounts, total, page=0) + await update.message.reply_text(text, reply_markup=keyboard, parse_mode="HTML") + + def _build_clean_errors_page(self, accounts: list, total: int, page: int = 0, page_size: int = 10): + """构建错误账号预览页面""" + total_pages = (total + page_size - 1) // page_size + start_idx = page * page_size + end_idx = min(start_idx + page_size, total) + page_accounts = accounts[start_idx:end_idx] + + # 按错误类型分组统计(全部账号) + error_types = {} + for acc in accounts: + error_msg = acc.get("error_message", "Unknown") + error_key = error_msg[:50] if error_msg else "Unknown" + error_types[error_key] = error_types.get(error_key, 0) + 1 + + lines = [ + "🗑️ 清理错误账号 (预览)", + "", + f"共发现 {total} 个错误状态账号", + "", + "错误类型统计:", + ] + + # 显示前5种错误类型 + sorted_errors = sorted(error_types.items(), key=lambda x: x[1], reverse=True)[:5] + for error_msg, count in sorted_errors: + lines.append(f"• {count}x: {error_msg}...") + + if len(error_types) > 5: + lines.append(f"• ... 还有 {len(error_types) - 5} 种其他错误") + + lines.extend([ + "", + f"账号列表 (第 {page + 1}/{total_pages} 页):", + ]) + + # 显示当前页的账号 + for i, acc in enumerate(page_accounts, start=start_idx + 1): + name = acc.get("name", "Unknown")[:25] + error_msg = acc.get("error_message", "")[:30] + lines.append(f"{i}. {name} - {error_msg}") + + lines.extend([ + "", + "⚠️ 此操作不可撤销!", + ]) + + text = "\n".join(lines) + + # 构建分页按钮 + nav_buttons = [] + if page > 0: + nav_buttons.append(InlineKeyboardButton("⬅️ 上一页", callback_data=f"clean_errors:page:{page - 1}")) + if page < total_pages - 1: + nav_buttons.append(InlineKeyboardButton("下一页 ➡️", callback_data=f"clean_errors:page:{page + 1}")) + + keyboard = [ + nav_buttons, + [InlineKeyboardButton(f"🗑️ 确认删除全部 ({total})", callback_data="clean_errors:confirm")], + [InlineKeyboardButton("❌ 取消", callback_data="clean_errors:cancel")], + ] + # 过滤空行 + keyboard = [row for row in keyboard if row] + + return text, InlineKeyboardMarkup(keyboard) + + async def callback_clean_errors(self, update: Update, context: ContextTypes.DEFAULT_TYPE): + """处理清理错误账号的回调""" + query = update.callback_query + await query.answer() + + # 验证权限 + user_id = update.effective_user.id + if user_id not in TELEGRAM_ADMIN_CHAT_IDS: + await query.edit_message_text("⛔ 无权限") + return + + # 解析回调数据 + data = query.data.replace("clean_errors:", "") + + if data.startswith("page:"): + # 分页浏览 + page = int(data.replace("page:", "")) + accounts = context.bot_data.get("clean_errors_accounts", []) + total = context.bot_data.get("clean_errors_total", 0) + + if not accounts: + await query.edit_message_text("❌ 数据已过期,请重新使用 /clean_errors") + return + + text, keyboard = self._build_clean_errors_page(accounts, total, page) + await query.edit_message_text(text, reply_markup=keyboard, parse_mode="HTML") + + elif data == "cancel": + # 取消操作 + context.bot_data.pop("clean_errors_accounts", None) + context.bot_data.pop("clean_errors_total", None) + await query.edit_message_text("✅ 已取消清理操作") + + elif data == "confirm": + # 执行删除 + total = context.bot_data.get("clean_errors_total", 0) + + await query.edit_message_text( + f"🗑️ 正在删除 {total} 个错误账号...\n\n" + "进度: 0%", + parse_mode="HTML" + ) + + # 同步执行删除 + results = s2a_batch_delete_error_accounts() + + # 清理缓存数据 + context.bot_data.pop("clean_errors_accounts", None) + context.bot_data.pop("clean_errors_total", None) + + # 显示结果 + lines = [ + "✅ 清理完成", + "", + f"成功删除: {results['success']}", + f"删除失败: {results['failed']}", + f"总计: {results['total']}", + ] + + # 如果有失败的,显示部分失败详情 + failed_details = [d for d in results.get("details", []) if d.get("status") == "failed"] + if failed_details: + lines.append("") + lines.append("失败详情:") + for detail in failed_details[:5]: + lines.append(f"• {detail.get('name', '')}: {detail.get('message', '')}") + if len(failed_details) > 5: + lines.append(f"• ... 还有 {len(failed_details) - 5} 个失败") + + await query.edit_message_text("\n".join(lines), parse_mode="HTML") + @admin_only async def cmd_import(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """上传账号到 team.json"""