diff --git a/s2a_service.py b/s2a_service.py
index 857ff35..1f44758 100644
--- a/s2a_service.py
+++ b/s2a_service.py
@@ -814,3 +814,224 @@ def s2a_batch_import_accounts(
progress_callback(i + 1, total, email, status)
return results
+
+
+# ==================== API 密钥用量查询 ====================
+def s2a_get_api_keys(page: int = 1, page_size: int = 50, timezone: str = "Asia/Shanghai") -> Optional[Dict[str, Any]]:
+ """获取 API 密钥列表
+
+ Args:
+ page: 页码
+ page_size: 每页数量
+ timezone: 时区
+
+ Returns:
+ dict: API 响应数据 或 None
+ """
+ if not S2A_API_BASE or (not S2A_ADMIN_KEY and not S2A_ADMIN_TOKEN):
+ return None
+
+ headers = build_s2a_headers()
+
+ try:
+ response = http_session.get(
+ f"{S2A_API_BASE}/api/v1/keys",
+ headers=headers,
+ params={"page": page, "page_size": page_size, "timezone": timezone},
+ timeout=REQUEST_TIMEOUT
+ )
+
+ if response.status_code == 200:
+ result = response.json()
+ if result.get("code") == 0:
+ return result.get("data", {})
+ else:
+ log.warning(f"S2A 获取密钥列表失败: {result.get('message', 'Unknown error')}")
+ else:
+ log.warning(f"S2A 获取密钥列表失败: HTTP {response.status_code}")
+
+ except Exception as e:
+ log.warning(f"S2A 获取密钥列表异常: {e}")
+
+ return None
+
+
+def s2a_get_key_usage_stats(
+ api_key_id: int,
+ start_date: str,
+ end_date: str,
+ timezone: str = "Asia/Shanghai"
+) -> Optional[Dict[str, Any]]:
+ """获取单个 API 密钥的详细用量统计
+
+ Args:
+ api_key_id: 密钥 ID
+ start_date: 开始日期 (YYYY-MM-DD)
+ end_date: 结束日期 (YYYY-MM-DD)
+ timezone: 时区
+
+ Returns:
+ dict: 用量统计数据 或 None
+ """
+ if not S2A_API_BASE or (not S2A_ADMIN_KEY and not S2A_ADMIN_TOKEN):
+ return None
+
+ headers = build_s2a_headers()
+
+ try:
+ response = http_session.get(
+ f"{S2A_API_BASE}/api/v1/admin/usage/stats",
+ headers=headers,
+ params={
+ "api_key_id": api_key_id,
+ "start_date": start_date,
+ "end_date": end_date,
+ "timezone": timezone
+ },
+ timeout=REQUEST_TIMEOUT
+ )
+
+ if response.status_code == 200:
+ result = response.json()
+ if result.get("code") == 0:
+ return result.get("data", {})
+ else:
+ log.warning(f"S2A 获取密钥用量失败: {result.get('message', 'Unknown error')}")
+ else:
+ log.warning(f"S2A 获取密钥用量失败: HTTP {response.status_code}")
+
+ except Exception as e:
+ log.warning(f"S2A 获取密钥用量异常: {e}")
+
+ return None
+
+
+def s2a_get_keys_with_usage(start_date: str = None, end_date: str = None, timezone: str = "Asia/Shanghai") -> Optional[List[Dict[str, Any]]]:
+ """获取密钥列表并合并用量数据
+
+ Args:
+ start_date: 开始日期 (YYYY-MM-DD),默认今日
+ end_date: 结束日期 (YYYY-MM-DD),默认今日
+ timezone: 时区
+
+ Returns:
+ list: 包含用量信息的密钥列表 或 None
+ """
+ from datetime import datetime
+
+ # 获取密钥列表
+ keys_data = s2a_get_api_keys(page=1, page_size=100, timezone=timezone)
+ if not keys_data:
+ return None
+
+ keys = keys_data.get("items", [])
+ if not keys:
+ return []
+
+ # 默认今日
+ if not start_date:
+ start_date = datetime.now().strftime("%Y-%m-%d")
+ if not end_date:
+ end_date = datetime.now().strftime("%Y-%m-%d")
+
+ # 获取每个密钥的用量
+ for key in keys:
+ key_id = key.get("id")
+ if key_id:
+ usage = s2a_get_key_usage_stats(key_id, start_date, end_date, timezone)
+ key["usage"] = usage if usage else {}
+ else:
+ key["usage"] = {}
+
+ return keys
+
+
+def format_keys_usage(keys: List[Dict[str, Any]], period_text: str = "今日") -> str:
+ """格式化密钥用量为可读文本
+
+ Args:
+ keys: 密钥列表 (包含 usage 字段)
+ period_text: 时间段描述
+
+ Returns:
+ str: 格式化后的文本
+ """
+ if not keys:
+ return "暂无密钥数据"
+
+ def fmt_cost(n):
+ """格式化费用"""
+ if n >= 1000:
+ return f"${n:,.2f}"
+ elif n >= 1:
+ return f"${n:.2f}"
+ return f"${n:.4f}"
+
+ def fmt_tokens(n):
+ """格式化 Token 数量"""
+ if n >= 1_000_000_000:
+ return f"{n / 1_000_000_000:.2f}B"
+ elif n >= 1_000_000:
+ return f"{n / 1_000_000:.2f}M"
+ elif n >= 1_000:
+ return f"{n / 1_000:.1f}K"
+ return str(int(n))
+
+ def fmt_duration(ms):
+ """格式化耗时"""
+ if ms >= 1000:
+ return f"{ms / 1000:.2f}s"
+ return f"{ms:.0f}ms"
+
+ lines = [f"🔑 API 密钥用量 ({period_text})", ""]
+
+ total_requests = 0
+ total_tokens = 0
+ total_cost = 0
+
+ for key in keys:
+ name = key.get("name", "未命名")
+ key_str = key.get("key", "")
+ status = key.get("status", "unknown")
+ group = key.get("group", {})
+ group_name = group.get("name", "默认") if group else "默认"
+
+ usage = key.get("usage", {})
+ requests = usage.get("total_requests", 0)
+ tokens = usage.get("total_tokens", 0)
+ input_tokens = usage.get("total_input_tokens", 0)
+ output_tokens = usage.get("total_output_tokens", 0)
+ cache_tokens = usage.get("total_cache_tokens", 0)
+ cost = usage.get("total_actual_cost", 0)
+ avg_duration = usage.get("average_duration_ms", 0)
+
+ total_requests += requests
+ total_tokens += tokens
+ total_cost += cost
+
+ # 状态图标
+ status_icon = "✅" if status == "active" else "⏸️"
+
+ # 密钥脱敏显示
+ if len(key_str) > 12:
+ key_display = f"{key_str[:6]}...{key_str[-4:]}"
+ else:
+ key_display = key_str[:8] + "..." if key_str else "N/A"
+
+ lines.append(f"{status_icon} {name} ({group_name})")
+ lines.append(f" 密钥: {key_display}")
+ lines.append(f" 请求: {requests:,} | 耗时: {fmt_duration(avg_duration)}")
+ lines.append(f" Token: {fmt_tokens(tokens)} (入:{fmt_tokens(input_tokens)} 出:{fmt_tokens(output_tokens)})")
+ if cache_tokens > 0:
+ lines.append(f" 缓存: {fmt_tokens(cache_tokens)}")
+ lines.append(f" 费用: {fmt_cost(cost)}")
+ lines.append("")
+
+ # 汇总
+ lines.append(f"📊 {period_text}汇总")
+ lines.append(f" 密钥数: {len(keys)}")
+ lines.append(f" 总请求: {total_requests:,}")
+ lines.append(f" 总 Token: {fmt_tokens(total_tokens)}")
+ lines.append(f" 总费用: {fmt_cost(total_cost)}")
+
+ return "\n".join(lines)
diff --git a/telegram_bot.py b/telegram_bot.py
index ac81954..a581d46 100644
--- a/telegram_bot.py
+++ b/telegram_bot.py
@@ -8,11 +8,12 @@ from functools import wraps
from pathlib import Path
from typing import Optional
-from telegram import Update, Bot, BotCommand
+from telegram import Update, Bot, BotCommand, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import (
Application,
CommandHandler,
MessageHandler,
+ CallbackQueryHandler,
filters,
ContextTypes,
)
@@ -50,7 +51,7 @@ from config import (
)
from utils import load_team_tracker, get_all_incomplete_accounts
from bot_notifier import BotNotifier, set_notifier, progress_finish
-from s2a_service import s2a_get_dashboard_stats, format_dashboard_stats
+from s2a_service import s2a_get_dashboard_stats, format_dashboard_stats, s2a_get_keys_with_usage, format_keys_usage
from email_service import gptmail_service, unified_create_email
from logger import log
@@ -118,6 +119,7 @@ class ProvisionerBot:
("reload", self.cmd_reload),
("s2a_config", self.cmd_s2a_config),
("clean", self.cmd_clean),
+ ("keys_usage", self.cmd_keys_usage),
]
for cmd, handler in handlers:
self.app.add_handler(CommandHandler(cmd, handler))
@@ -128,6 +130,12 @@ class ProvisionerBot:
self.handle_json_file
))
+ # 注册回调查询处理器 (InlineKeyboard)
+ self.app.add_handler(CallbackQueryHandler(
+ self.callback_keys_usage,
+ pattern="^keys_usage:"
+ ))
+
# 注册定时检查任务
if TELEGRAM_CHECK_INTERVAL > 0 and AUTH_PROVIDER == "s2a":
self.app.job_queue.run_repeating(
@@ -189,6 +197,7 @@ class ProvisionerBot:
BotCommand("reload", "重载配置文件"),
BotCommand("clean", "清理数据文件"),
BotCommand("dashboard", "查看 S2A 仪表盘"),
+ BotCommand("keys_usage", "查看 API 密钥用量"),
BotCommand("stock", "查看账号库存"),
BotCommand("s2a_config", "配置 S2A 参数"),
BotCommand("import", "导入账号到 team.json"),
@@ -231,6 +240,7 @@ class ProvisionerBot:
📊 S2A 专属:
/dashboard - 查看 S2A 仪表盘
+/keys_usage - 查看 API 密钥用量
/stock - 查看账号库存
/s2a_config - 配置 S2A 参数
@@ -1184,6 +1194,125 @@ class ProvisionerBot:
except Exception as e:
await update.message.reply_text(f"❌ 错误: {e}")
+ @admin_only
+ async def cmd_keys_usage(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
+ """查看 API 密钥用量 - 显示时间选择菜单"""
+ if AUTH_PROVIDER != "s2a":
+ await update.message.reply_text(
+ f"⚠️ 密钥用量查询仅支持 S2A 模式\n"
+ f"当前模式: {AUTH_PROVIDER}"
+ )
+ return
+
+ # 创建时间选择按钮
+ keyboard = [
+ [
+ InlineKeyboardButton("📅 今天", callback_data="keys_usage:today"),
+ InlineKeyboardButton("📅 昨天", callback_data="keys_usage:yesterday"),
+ ],
+ [
+ InlineKeyboardButton("📆 近 7 天", callback_data="keys_usage:7d"),
+ InlineKeyboardButton("📆 近 14 天", callback_data="keys_usage:14d"),
+ ],
+ [
+ InlineKeyboardButton("📆 近 30 天", callback_data="keys_usage:30d"),
+ InlineKeyboardButton("📆 本月", callback_data="keys_usage:this_month"),
+ ],
+ [
+ InlineKeyboardButton("📆 上月", callback_data="keys_usage:last_month"),
+ ],
+ ]
+ reply_markup = InlineKeyboardMarkup(keyboard)
+
+ await update.message.reply_text(
+ "🔑 API 密钥用量查询\n\n请选择时间范围:",
+ reply_markup=reply_markup,
+ parse_mode="HTML"
+ )
+
+ async def callback_keys_usage(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
+ """处理密钥用量查询的回调"""
+ query = update.callback_query
+ await query.answer()
+
+ # 验证权限
+ user_id = update.effective_user.id
+ if user_id not in TELEGRAM_ADMIN_CHAT_IDS:
+ await query.edit_message_text("⛔ 无权限")
+ return
+
+ from datetime import datetime, timedelta
+
+ # 解析回调数据
+ data = query.data.replace("keys_usage:", "")
+ today = datetime.now()
+
+ if data == "today":
+ start_date = today.strftime("%Y-%m-%d")
+ end_date = today.strftime("%Y-%m-%d")
+ period_text = "今天"
+ elif data == "yesterday":
+ yesterday = today - timedelta(days=1)
+ start_date = yesterday.strftime("%Y-%m-%d")
+ end_date = yesterday.strftime("%Y-%m-%d")
+ period_text = "昨天"
+ elif data == "7d":
+ start_date = (today - timedelta(days=6)).strftime("%Y-%m-%d")
+ end_date = today.strftime("%Y-%m-%d")
+ period_text = "近 7 天"
+ elif data == "14d":
+ start_date = (today - timedelta(days=13)).strftime("%Y-%m-%d")
+ end_date = today.strftime("%Y-%m-%d")
+ period_text = "近 14 天"
+ elif data == "30d":
+ start_date = (today - timedelta(days=29)).strftime("%Y-%m-%d")
+ end_date = today.strftime("%Y-%m-%d")
+ period_text = "近 30 天"
+ elif data == "this_month":
+ start_date = today.replace(day=1).strftime("%Y-%m-%d")
+ end_date = today.strftime("%Y-%m-%d")
+ period_text = "本月"
+ elif data == "last_month":
+ first_of_this_month = today.replace(day=1)
+ last_month_end = first_of_this_month - timedelta(days=1)
+ last_month_start = last_month_end.replace(day=1)
+ start_date = last_month_start.strftime("%Y-%m-%d")
+ end_date = last_month_end.strftime("%Y-%m-%d")
+ period_text = "上月"
+ else:
+ await query.edit_message_text("❌ 未知的时间选项")
+ return
+
+ await query.edit_message_text(f"⏳ 正在获取密钥用量 ({period_text})...")
+
+ try:
+ keys = s2a_get_keys_with_usage(start_date, end_date)
+ if keys is not None:
+ text = format_keys_usage(keys, period_text)
+ # 消息太长时分段发送
+ if len(text) > 4000:
+ # 先更新原消息
+ await query.edit_message_text(text[:4000], parse_mode="HTML")
+ # 剩余部分新消息发送
+ remaining = text[4000:]
+ while remaining:
+ chunk = remaining[:4000]
+ remaining = remaining[4000:]
+ await context.bot.send_message(
+ chat_id=update.effective_chat.id,
+ text=chunk,
+ parse_mode="HTML"
+ )
+ else:
+ await query.edit_message_text(text, parse_mode="HTML")
+ else:
+ await query.edit_message_text(
+ "❌ 获取密钥用量失败\n"
+ "请检查 S2A 配置和 API 连接"
+ )
+ except Exception as e:
+ await query.edit_message_text(f"❌ 错误: {e}")
+
@admin_only
async def cmd_stock(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""查看账号存货"""