From 828757077d00f9c84f1cb4da249b0c552e905f2d Mon Sep 17 00:00:00 2001 From: kyx236 Date: Tue, 20 Jan 2026 19:34:13 +0800 Subject: [PATCH] update --- s2a_service.py | 221 ++++++++++++++++++++++++++++++++++++++++++++++++ telegram_bot.py | 133 ++++++++++++++++++++++++++++- 2 files changed, 352 insertions(+), 2 deletions(-) diff --git a/s2a_service.py b/s2a_service.py index 857ff35..1f44758 100644 --- a/s2a_service.py +++ b/s2a_service.py @@ -814,3 +814,224 @@ def s2a_batch_import_accounts( progress_callback(i + 1, total, email, status) return results + + +# ==================== API 密钥用量查询 ==================== +def s2a_get_api_keys(page: int = 1, page_size: int = 50, timezone: str = "Asia/Shanghai") -> Optional[Dict[str, Any]]: + """获取 API 密钥列表 + + Args: + page: 页码 + page_size: 每页数量 + timezone: 时区 + + Returns: + dict: API 响应数据 或 None + """ + if not S2A_API_BASE or (not S2A_ADMIN_KEY and not S2A_ADMIN_TOKEN): + return None + + headers = build_s2a_headers() + + try: + response = http_session.get( + f"{S2A_API_BASE}/api/v1/keys", + headers=headers, + params={"page": page, "page_size": page_size, "timezone": timezone}, + timeout=REQUEST_TIMEOUT + ) + + if response.status_code == 200: + result = response.json() + if result.get("code") == 0: + return result.get("data", {}) + else: + log.warning(f"S2A 获取密钥列表失败: {result.get('message', 'Unknown error')}") + else: + log.warning(f"S2A 获取密钥列表失败: HTTP {response.status_code}") + + except Exception as e: + log.warning(f"S2A 获取密钥列表异常: {e}") + + return None + + +def s2a_get_key_usage_stats( + api_key_id: int, + start_date: str, + end_date: str, + timezone: str = "Asia/Shanghai" +) -> Optional[Dict[str, Any]]: + """获取单个 API 密钥的详细用量统计 + + Args: + api_key_id: 密钥 ID + start_date: 开始日期 (YYYY-MM-DD) + end_date: 结束日期 (YYYY-MM-DD) + timezone: 时区 + + Returns: + dict: 用量统计数据 或 None + """ + if not S2A_API_BASE or (not S2A_ADMIN_KEY and not S2A_ADMIN_TOKEN): + return None + + headers = build_s2a_headers() + + try: + response = http_session.get( + f"{S2A_API_BASE}/api/v1/admin/usage/stats", + headers=headers, + params={ + "api_key_id": api_key_id, + "start_date": start_date, + "end_date": end_date, + "timezone": timezone + }, + timeout=REQUEST_TIMEOUT + ) + + if response.status_code == 200: + result = response.json() + if result.get("code") == 0: + return result.get("data", {}) + else: + log.warning(f"S2A 获取密钥用量失败: {result.get('message', 'Unknown error')}") + else: + log.warning(f"S2A 获取密钥用量失败: HTTP {response.status_code}") + + except Exception as e: + log.warning(f"S2A 获取密钥用量异常: {e}") + + return None + + +def s2a_get_keys_with_usage(start_date: str = None, end_date: str = None, timezone: str = "Asia/Shanghai") -> Optional[List[Dict[str, Any]]]: + """获取密钥列表并合并用量数据 + + Args: + start_date: 开始日期 (YYYY-MM-DD),默认今日 + end_date: 结束日期 (YYYY-MM-DD),默认今日 + timezone: 时区 + + Returns: + list: 包含用量信息的密钥列表 或 None + """ + from datetime import datetime + + # 获取密钥列表 + keys_data = s2a_get_api_keys(page=1, page_size=100, timezone=timezone) + if not keys_data: + return None + + keys = keys_data.get("items", []) + if not keys: + return [] + + # 默认今日 + if not start_date: + start_date = datetime.now().strftime("%Y-%m-%d") + if not end_date: + end_date = datetime.now().strftime("%Y-%m-%d") + + # 获取每个密钥的用量 + for key in keys: + key_id = key.get("id") + if key_id: + usage = s2a_get_key_usage_stats(key_id, start_date, end_date, timezone) + key["usage"] = usage if usage else {} + else: + key["usage"] = {} + + return keys + + +def format_keys_usage(keys: List[Dict[str, Any]], period_text: str = "今日") -> str: + """格式化密钥用量为可读文本 + + Args: + keys: 密钥列表 (包含 usage 字段) + period_text: 时间段描述 + + Returns: + str: 格式化后的文本 + """ + if not keys: + return "暂无密钥数据" + + def fmt_cost(n): + """格式化费用""" + if n >= 1000: + return f"${n:,.2f}" + elif n >= 1: + return f"${n:.2f}" + return f"${n:.4f}" + + def fmt_tokens(n): + """格式化 Token 数量""" + if n >= 1_000_000_000: + return f"{n / 1_000_000_000:.2f}B" + elif n >= 1_000_000: + return f"{n / 1_000_000:.2f}M" + elif n >= 1_000: + return f"{n / 1_000:.1f}K" + return str(int(n)) + + def fmt_duration(ms): + """格式化耗时""" + if ms >= 1000: + return f"{ms / 1000:.2f}s" + return f"{ms:.0f}ms" + + lines = [f"🔑 API 密钥用量 ({period_text})", ""] + + total_requests = 0 + total_tokens = 0 + total_cost = 0 + + for key in keys: + name = key.get("name", "未命名") + key_str = key.get("key", "") + status = key.get("status", "unknown") + group = key.get("group", {}) + group_name = group.get("name", "默认") if group else "默认" + + usage = key.get("usage", {}) + requests = usage.get("total_requests", 0) + tokens = usage.get("total_tokens", 0) + input_tokens = usage.get("total_input_tokens", 0) + output_tokens = usage.get("total_output_tokens", 0) + cache_tokens = usage.get("total_cache_tokens", 0) + cost = usage.get("total_actual_cost", 0) + avg_duration = usage.get("average_duration_ms", 0) + + total_requests += requests + total_tokens += tokens + total_cost += cost + + # 状态图标 + status_icon = "✅" if status == "active" else "⏸️" + + # 密钥脱敏显示 + if len(key_str) > 12: + key_display = f"{key_str[:6]}...{key_str[-4:]}" + else: + key_display = key_str[:8] + "..." if key_str else "N/A" + + lines.append(f"{status_icon} {name} ({group_name})") + lines.append(f" 密钥: {key_display}") + lines.append(f" 请求: {requests:,} | 耗时: {fmt_duration(avg_duration)}") + lines.append(f" Token: {fmt_tokens(tokens)} (入:{fmt_tokens(input_tokens)} 出:{fmt_tokens(output_tokens)})") + if cache_tokens > 0: + lines.append(f" 缓存: {fmt_tokens(cache_tokens)}") + lines.append(f" 费用: {fmt_cost(cost)}") + lines.append("") + + # 汇总 + lines.append(f"📊 {period_text}汇总") + lines.append(f" 密钥数: {len(keys)}") + lines.append(f" 总请求: {total_requests:,}") + lines.append(f" 总 Token: {fmt_tokens(total_tokens)}") + lines.append(f" 总费用: {fmt_cost(total_cost)}") + + return "\n".join(lines) diff --git a/telegram_bot.py b/telegram_bot.py index ac81954..a581d46 100644 --- a/telegram_bot.py +++ b/telegram_bot.py @@ -8,11 +8,12 @@ from functools import wraps from pathlib import Path from typing import Optional -from telegram import Update, Bot, BotCommand +from telegram import Update, Bot, BotCommand, InlineKeyboardButton, InlineKeyboardMarkup from telegram.ext import ( Application, CommandHandler, MessageHandler, + CallbackQueryHandler, filters, ContextTypes, ) @@ -50,7 +51,7 @@ from config import ( ) from utils import load_team_tracker, get_all_incomplete_accounts from bot_notifier import BotNotifier, set_notifier, progress_finish -from s2a_service import s2a_get_dashboard_stats, format_dashboard_stats +from s2a_service import s2a_get_dashboard_stats, format_dashboard_stats, s2a_get_keys_with_usage, format_keys_usage from email_service import gptmail_service, unified_create_email from logger import log @@ -118,6 +119,7 @@ class ProvisionerBot: ("reload", self.cmd_reload), ("s2a_config", self.cmd_s2a_config), ("clean", self.cmd_clean), + ("keys_usage", self.cmd_keys_usage), ] for cmd, handler in handlers: self.app.add_handler(CommandHandler(cmd, handler)) @@ -128,6 +130,12 @@ class ProvisionerBot: self.handle_json_file )) + # 注册回调查询处理器 (InlineKeyboard) + self.app.add_handler(CallbackQueryHandler( + self.callback_keys_usage, + pattern="^keys_usage:" + )) + # 注册定时检查任务 if TELEGRAM_CHECK_INTERVAL > 0 and AUTH_PROVIDER == "s2a": self.app.job_queue.run_repeating( @@ -189,6 +197,7 @@ class ProvisionerBot: BotCommand("reload", "重载配置文件"), BotCommand("clean", "清理数据文件"), BotCommand("dashboard", "查看 S2A 仪表盘"), + BotCommand("keys_usage", "查看 API 密钥用量"), BotCommand("stock", "查看账号库存"), BotCommand("s2a_config", "配置 S2A 参数"), BotCommand("import", "导入账号到 team.json"), @@ -231,6 +240,7 @@ class ProvisionerBot: 📊 S2A 专属: /dashboard - 查看 S2A 仪表盘 +/keys_usage - 查看 API 密钥用量 /stock - 查看账号库存 /s2a_config - 配置 S2A 参数 @@ -1184,6 +1194,125 @@ class ProvisionerBot: except Exception as e: await update.message.reply_text(f"❌ 错误: {e}") + @admin_only + async def cmd_keys_usage(self, update: Update, context: ContextTypes.DEFAULT_TYPE): + """查看 API 密钥用量 - 显示时间选择菜单""" + if AUTH_PROVIDER != "s2a": + await update.message.reply_text( + f"⚠️ 密钥用量查询仅支持 S2A 模式\n" + f"当前模式: {AUTH_PROVIDER}" + ) + return + + # 创建时间选择按钮 + keyboard = [ + [ + InlineKeyboardButton("📅 今天", callback_data="keys_usage:today"), + InlineKeyboardButton("📅 昨天", callback_data="keys_usage:yesterday"), + ], + [ + InlineKeyboardButton("📆 近 7 天", callback_data="keys_usage:7d"), + InlineKeyboardButton("📆 近 14 天", callback_data="keys_usage:14d"), + ], + [ + InlineKeyboardButton("📆 近 30 天", callback_data="keys_usage:30d"), + InlineKeyboardButton("📆 本月", callback_data="keys_usage:this_month"), + ], + [ + InlineKeyboardButton("📆 上月", callback_data="keys_usage:last_month"), + ], + ] + reply_markup = InlineKeyboardMarkup(keyboard) + + await update.message.reply_text( + "🔑 API 密钥用量查询\n\n请选择时间范围:", + reply_markup=reply_markup, + parse_mode="HTML" + ) + + async def callback_keys_usage(self, update: Update, context: ContextTypes.DEFAULT_TYPE): + """处理密钥用量查询的回调""" + query = update.callback_query + await query.answer() + + # 验证权限 + user_id = update.effective_user.id + if user_id not in TELEGRAM_ADMIN_CHAT_IDS: + await query.edit_message_text("⛔ 无权限") + return + + from datetime import datetime, timedelta + + # 解析回调数据 + data = query.data.replace("keys_usage:", "") + today = datetime.now() + + if data == "today": + start_date = today.strftime("%Y-%m-%d") + end_date = today.strftime("%Y-%m-%d") + period_text = "今天" + elif data == "yesterday": + yesterday = today - timedelta(days=1) + start_date = yesterday.strftime("%Y-%m-%d") + end_date = yesterday.strftime("%Y-%m-%d") + period_text = "昨天" + elif data == "7d": + start_date = (today - timedelta(days=6)).strftime("%Y-%m-%d") + end_date = today.strftime("%Y-%m-%d") + period_text = "近 7 天" + elif data == "14d": + start_date = (today - timedelta(days=13)).strftime("%Y-%m-%d") + end_date = today.strftime("%Y-%m-%d") + period_text = "近 14 天" + elif data == "30d": + start_date = (today - timedelta(days=29)).strftime("%Y-%m-%d") + end_date = today.strftime("%Y-%m-%d") + period_text = "近 30 天" + elif data == "this_month": + start_date = today.replace(day=1).strftime("%Y-%m-%d") + end_date = today.strftime("%Y-%m-%d") + period_text = "本月" + elif data == "last_month": + first_of_this_month = today.replace(day=1) + last_month_end = first_of_this_month - timedelta(days=1) + last_month_start = last_month_end.replace(day=1) + start_date = last_month_start.strftime("%Y-%m-%d") + end_date = last_month_end.strftime("%Y-%m-%d") + period_text = "上月" + else: + await query.edit_message_text("❌ 未知的时间选项") + return + + await query.edit_message_text(f"⏳ 正在获取密钥用量 ({period_text})...") + + try: + keys = s2a_get_keys_with_usage(start_date, end_date) + if keys is not None: + text = format_keys_usage(keys, period_text) + # 消息太长时分段发送 + if len(text) > 4000: + # 先更新原消息 + await query.edit_message_text(text[:4000], parse_mode="HTML") + # 剩余部分新消息发送 + remaining = text[4000:] + while remaining: + chunk = remaining[:4000] + remaining = remaining[4000:] + await context.bot.send_message( + chat_id=update.effective_chat.id, + text=chunk, + parse_mode="HTML" + ) + else: + await query.edit_message_text(text, parse_mode="HTML") + else: + await query.edit_message_text( + "❌ 获取密钥用量失败\n" + "请检查 S2A 配置和 API 连接" + ) + except Exception as e: + await query.edit_message_text(f"❌ 错误: {e}") + @admin_only async def cmd_stock(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """查看账号存货"""