This commit is contained in:
2026-01-20 19:34:13 +08:00
parent 24905a773c
commit 828757077d
2 changed files with 352 additions and 2 deletions

View File

@@ -814,3 +814,224 @@ def s2a_batch_import_accounts(
progress_callback(i + 1, total, email, status)
return results
# ==================== API 密钥用量查询 ====================
def s2a_get_api_keys(page: int = 1, page_size: int = 50, timezone: str = "Asia/Shanghai") -> Optional[Dict[str, Any]]:
"""获取 API 密钥列表
Args:
page: 页码
page_size: 每页数量
timezone: 时区
Returns:
dict: API 响应数据 或 None
"""
if not S2A_API_BASE or (not S2A_ADMIN_KEY and not S2A_ADMIN_TOKEN):
return None
headers = build_s2a_headers()
try:
response = http_session.get(
f"{S2A_API_BASE}/api/v1/keys",
headers=headers,
params={"page": page, "page_size": page_size, "timezone": timezone},
timeout=REQUEST_TIMEOUT
)
if response.status_code == 200:
result = response.json()
if result.get("code") == 0:
return result.get("data", {})
else:
log.warning(f"S2A 获取密钥列表失败: {result.get('message', 'Unknown error')}")
else:
log.warning(f"S2A 获取密钥列表失败: HTTP {response.status_code}")
except Exception as e:
log.warning(f"S2A 获取密钥列表异常: {e}")
return None
def s2a_get_key_usage_stats(
api_key_id: int,
start_date: str,
end_date: str,
timezone: str = "Asia/Shanghai"
) -> Optional[Dict[str, Any]]:
"""获取单个 API 密钥的详细用量统计
Args:
api_key_id: 密钥 ID
start_date: 开始日期 (YYYY-MM-DD)
end_date: 结束日期 (YYYY-MM-DD)
timezone: 时区
Returns:
dict: 用量统计数据 或 None
"""
if not S2A_API_BASE or (not S2A_ADMIN_KEY and not S2A_ADMIN_TOKEN):
return None
headers = build_s2a_headers()
try:
response = http_session.get(
f"{S2A_API_BASE}/api/v1/admin/usage/stats",
headers=headers,
params={
"api_key_id": api_key_id,
"start_date": start_date,
"end_date": end_date,
"timezone": timezone
},
timeout=REQUEST_TIMEOUT
)
if response.status_code == 200:
result = response.json()
if result.get("code") == 0:
return result.get("data", {})
else:
log.warning(f"S2A 获取密钥用量失败: {result.get('message', 'Unknown error')}")
else:
log.warning(f"S2A 获取密钥用量失败: HTTP {response.status_code}")
except Exception as e:
log.warning(f"S2A 获取密钥用量异常: {e}")
return None
def s2a_get_keys_with_usage(start_date: str = None, end_date: str = None, timezone: str = "Asia/Shanghai") -> Optional[List[Dict[str, Any]]]:
"""获取密钥列表并合并用量数据
Args:
start_date: 开始日期 (YYYY-MM-DD),默认今日
end_date: 结束日期 (YYYY-MM-DD),默认今日
timezone: 时区
Returns:
list: 包含用量信息的密钥列表 或 None
"""
from datetime import datetime
# 获取密钥列表
keys_data = s2a_get_api_keys(page=1, page_size=100, timezone=timezone)
if not keys_data:
return None
keys = keys_data.get("items", [])
if not keys:
return []
# 默认今日
if not start_date:
start_date = datetime.now().strftime("%Y-%m-%d")
if not end_date:
end_date = datetime.now().strftime("%Y-%m-%d")
# 获取每个密钥的用量
for key in keys:
key_id = key.get("id")
if key_id:
usage = s2a_get_key_usage_stats(key_id, start_date, end_date, timezone)
key["usage"] = usage if usage else {}
else:
key["usage"] = {}
return keys
def format_keys_usage(keys: List[Dict[str, Any]], period_text: str = "今日") -> str:
"""格式化密钥用量为可读文本
Args:
keys: 密钥列表 (包含 usage 字段)
period_text: 时间段描述
Returns:
str: 格式化后的文本
"""
if not keys:
return "暂无密钥数据"
def fmt_cost(n):
"""格式化费用"""
if n >= 1000:
return f"${n:,.2f}"
elif n >= 1:
return f"${n:.2f}"
return f"${n:.4f}"
def fmt_tokens(n):
"""格式化 Token 数量"""
if n >= 1_000_000_000:
return f"{n / 1_000_000_000:.2f}B"
elif n >= 1_000_000:
return f"{n / 1_000_000:.2f}M"
elif n >= 1_000:
return f"{n / 1_000:.1f}K"
return str(int(n))
def fmt_duration(ms):
"""格式化耗时"""
if ms >= 1000:
return f"{ms / 1000:.2f}s"
return f"{ms:.0f}ms"
lines = [f"<b>🔑 API 密钥用量 ({period_text})</b>", ""]
total_requests = 0
total_tokens = 0
total_cost = 0
for key in keys:
name = key.get("name", "未命名")
key_str = key.get("key", "")
status = key.get("status", "unknown")
group = key.get("group", {})
group_name = group.get("name", "默认") if group else "默认"
usage = key.get("usage", {})
requests = usage.get("total_requests", 0)
tokens = usage.get("total_tokens", 0)
input_tokens = usage.get("total_input_tokens", 0)
output_tokens = usage.get("total_output_tokens", 0)
cache_tokens = usage.get("total_cache_tokens", 0)
cost = usage.get("total_actual_cost", 0)
avg_duration = usage.get("average_duration_ms", 0)
total_requests += requests
total_tokens += tokens
total_cost += cost
# 状态图标
status_icon = "" if status == "active" else "⏸️"
# 密钥脱敏显示
if len(key_str) > 12:
key_display = f"{key_str[:6]}...{key_str[-4:]}"
else:
key_display = key_str[:8] + "..." if key_str else "N/A"
lines.append(f"{status_icon} <b>{name}</b> ({group_name})")
lines.append(f" 密钥: <code>{key_display}</code>")
lines.append(f" 请求: {requests:,} | 耗时: {fmt_duration(avg_duration)}")
lines.append(f" Token: {fmt_tokens(tokens)} (入:{fmt_tokens(input_tokens)} 出:{fmt_tokens(output_tokens)})")
if cache_tokens > 0:
lines.append(f" 缓存: {fmt_tokens(cache_tokens)}")
lines.append(f" 费用: {fmt_cost(cost)}")
lines.append("")
# 汇总
lines.append(f"<b>📊 {period_text}汇总</b>")
lines.append(f" 密钥数: {len(keys)}")
lines.append(f" 总请求: {total_requests:,}")
lines.append(f" 总 Token: {fmt_tokens(total_tokens)}")
lines.append(f" 总费用: {fmt_cost(total_cost)}")
return "\n".join(lines)