# ==================== S2A (Sub2API) 服务模块 ====================
# 处理 Sub2API 系统相关功能 (OpenAI OAuth 授权、账号入库)
#
# S2A 与 CPA/CRS 的关键差异:
# - 认证方式: S2A 支持 Admin API Key (x-api-key) 或 JWT Token (Bearer)
# - 会话标识: S2A 使用 session_id
# - 授权流程: S2A 生成授权 URL -> 用户授权 -> 提交 code 换取 token -> 创建账号
# - 账号入库: S2A 可一步完成 (create-from-oauth) 或分步完成 (exchange + add_account)
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from urllib.parse import urlparse, parse_qs
from typing import Optional, Tuple, Dict, List, Any
from config import (
S2A_API_BASE,
S2A_ADMIN_KEY,
S2A_ADMIN_TOKEN,
S2A_CONCURRENCY,
S2A_PRIORITY,
S2A_GROUP_IDS,
S2A_GROUP_NAMES,
REQUEST_TIMEOUT,
USER_AGENT,
)
from logger import log
# ==================== Group ID cache ====================
_resolved_group_ids = None # Cached result of group-ID resolution; None means "not resolved yet"
def create_session_with_retry() -> requests.Session:
    """Build a ``requests.Session`` whose HTTP and HTTPS adapters retry requests.

    Returns:
        requests.Session: a session with one retrying ``HTTPAdapter`` mounted
        on both the ``https://`` and ``http://`` prefixes.
    """
    # Retry policy: up to 5 retries, backoff factor 1, triggered by the listed
    # status codes, for every listed HTTP verb.
    retrying_adapter = HTTPAdapter(
        max_retries=Retry(
            total=5,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["HEAD", "GET", "POST", "PUT", "DELETE", "OPTIONS"],
        )
    )
    sess = requests.Session()
    for prefix in ("https://", "http://"):
        sess.mount(prefix, retrying_adapter)
    return sess
# Shared module-level session; the request helpers in this module send their calls through it.
http_session = create_session_with_retry()
def build_s2a_headers() -> Dict[str, str]:
    """Assemble the HTTP headers sent with S2A API requests.

    The Admin API key takes precedence and is sent as ``x-api-key``; when it
    is not configured, the JWT token is sent as a ``Bearer`` authorization
    header. With neither configured, no credential header is added.

    Returns:
        Dict[str, str]: the header mapping.
    """
    credential_header: Dict[str, str] = {}
    if S2A_ADMIN_KEY:
        credential_header["x-api-key"] = S2A_ADMIN_KEY
    elif S2A_ADMIN_TOKEN:
        credential_header["authorization"] = f"Bearer {S2A_ADMIN_TOKEN}"

    return {
        "accept": "application/json",
        "content-type": "application/json",
        "user-agent": USER_AGENT,
        **credential_header,
    }
def get_auth_method() -> Tuple[str, str]:
    """Report which credential is in use, with a shortened preview of it.

    Returns:
        Tuple[str, str]: ``(method_name, credential_preview)``. The method
        name is ``"Admin API Key"``, ``"JWT Token"`` or ``"None"``; the
        preview is empty when no credential is configured.
    """
    def _preview(secret: str) -> str:
        # At most the first 16 characters are shown, followed by "...";
        # a credential of 16 characters or fewer is returned unchanged.
        if len(secret) > 16:
            return secret[:16] + "..."
        return secret

    if S2A_ADMIN_KEY:
        return "Admin API Key", _preview(S2A_ADMIN_KEY)
    if S2A_ADMIN_TOKEN:
        return "JWT Token", _preview(S2A_ADMIN_TOKEN)
    return "None", ""
# ==================== 分组管理 ====================
def s2a_get_groups() -> List[Dict[str, Any]]:
    """Fetch the group list from ``/admin/groups``.

    Only the first page is requested, with a page size of 100.

    Returns:
        List[Dict[str, Any]]: the ``data.items`` list of the response, or an
        empty list when the HTTP status is not 200, the API ``code`` is not
        0, or an exception is raised (the exception is logged as a warning).
    """
    headers = build_s2a_headers()
    try:
        resp = http_session.get(
            f"{S2A_API_BASE}/admin/groups",
            headers=headers,
            params={"page": 1, "page_size": 100},
            timeout=REQUEST_TIMEOUT,
        )
        if resp.status_code != 200:
            return []
        body = resp.json()
        if body.get("code") != 0:
            return []
        return body.get("data", {}).get("items", [])
    except Exception as e:
        log.warning(f"S2A 获取分组列表异常: {e}")
        return []
def s2a_resolve_group_ids(silent: bool = False) -> List[int]:
    """Resolve the configured groups to a list of group IDs.

    Resolution order:
      1. the module-level cache, if it has already been filled;
      2. ``S2A_GROUP_IDS`` (IDs configured directly);
      3. ``S2A_GROUP_NAMES``, looked up case-insensitively in the group list
         returned by ``s2a_get_groups``.

    A lookup that returns no groups at all is NOT cached, so a transient API
    failure does not disable group assignment for the rest of the process;
    the next call simply tries again.

    Args:
        silent: when True, suppress the warning logs emitted by this function.

    Returns:
        List[int]: the resolved group IDs (possibly empty).
    """
    global _resolved_group_ids

    # Serve from the cache once a definitive answer is known.
    if _resolved_group_ids is not None:
        return _resolved_group_ids

    # Directly configured IDs win over names.
    if S2A_GROUP_IDS:
        _resolved_group_ids = S2A_GROUP_IDS
        return _resolved_group_ids

    # Nothing configured at all: the answer is definitively empty.
    if not S2A_GROUP_NAMES:
        _resolved_group_ids = []
        return _resolved_group_ids

    groups = s2a_get_groups()
    if not groups:
        if not silent:
            log.warning("S2A 无法获取分组列表,group_names 解析失败")
        # s2a_get_groups returns [] on errors too, so this result is not
        # trustworthy enough to cache.
        return []

    # Build a lowercase name -> id map; a null name is treated as empty.
    name_to_id = {(g.get("name") or "").lower(): g.get("id") for g in groups}

    resolved = []
    not_found = []
    for name in S2A_GROUP_NAMES:
        group_id = name_to_id.get(name.lower())
        if group_id is not None:
            resolved.append(group_id)
        else:
            not_found.append(name)

    if not_found and not silent:
        log.warning(f"S2A 分组未找到: {', '.join(not_found)}")

    _resolved_group_ids = resolved
    return _resolved_group_ids
def get_s2a_group_ids() -> List[int]:
    """Return the currently configured group IDs (entry point for external callers).

    Delegates to ``s2a_resolve_group_ids`` with its default arguments.
    """
    group_ids = s2a_resolve_group_ids()
    return group_ids
# ==================== 连接验证 ====================
def s2a_verify_connection() -> Tuple[bool, str]:
    """Check that the S2A service is reachable and the credential is accepted.

    Issues a one-item request to ``/admin/groups`` and interprets the result.
    On success the group configuration is also resolved (silently) so it can
    be included in the returned message.

    Returns:
        Tuple[bool, str]: ``(is_valid, message)``.
    """
    if not S2A_API_BASE:
        return False, "S2A_API_BASE 未配置"
    if not (S2A_ADMIN_KEY or S2A_ADMIN_TOKEN):
        return False, "S2A_ADMIN_KEY 或 S2A_ADMIN_TOKEN 未配置"

    auth_method, _auth_preview = get_auth_method()
    headers = build_s2a_headers()

    try:
        # /admin/groups is used as the probe endpoint.
        resp = http_session.get(
            f"{S2A_API_BASE}/admin/groups",
            headers=headers,
            params={"page": 1, "page_size": 1},
            timeout=REQUEST_TIMEOUT,
        )
        status = resp.status_code
        if status == 401:
            return False, f"{auth_method} 无效或已过期 (HTTP 401)"
        if status == 403:
            return False, f"{auth_method} 权限不足 (HTTP 403)"
        if status != 200:
            return False, f"服务异常 (HTTP {status})"

        body = resp.json()
        if body.get("code") != 0:
            return False, f"API 返回失败: {body.get('message', 'Unknown error')}"

        # Resolve the group configuration for the success message.
        group_ids = s2a_resolve_group_ids(silent=True)
        if S2A_GROUP_NAMES:
            group_info = f", 分组: {S2A_GROUP_NAMES} -> {group_ids}"
        elif S2A_GROUP_IDS:
            group_info = f", 分组 ID: {group_ids}"
        else:
            group_info = ""
        return True, f"认证有效 (方式: {auth_method}{group_info})"
    except requests.exceptions.Timeout:
        return False, f"服务连接超时 ({S2A_API_BASE})"
    except requests.exceptions.ConnectionError:
        return False, f"无法连接到服务 ({S2A_API_BASE})"
    except Exception as e:
        return False, f"验证异常: {str(e)}"
# ==================== OAuth 授权 ====================
def s2a_generate_auth_url(proxy_id: Optional[int] = None) -> Tuple[Optional[str], Optional[str]]:
    """Ask S2A to generate an OpenAI OAuth authorization URL.

    Posts to ``/admin/openai/generate-auth-url``. Each failure mode is logged
    with its own cause (HTTP status, API-level message, or missing fields)
    instead of always reporting the HTTP status.

    Args:
        proxy_id: optional proxy ID, forwarded in the request body when given.

    Returns:
        Tuple[Optional[str], Optional[str]]: ``(auth_url, session_id)`` on
        success, otherwise ``(None, None)``.
    """
    headers = build_s2a_headers()
    payload: Dict[str, Any] = {}
    if proxy_id is not None:
        payload["proxy_id"] = proxy_id

    try:
        response = http_session.post(
            f"{S2A_API_BASE}/admin/openai/generate-auth-url",
            headers=headers,
            json=payload,
            timeout=REQUEST_TIMEOUT,
        )
        if response.status_code != 200:
            log.error(f"生成 S2A 授权链接失败: HTTP {response.status_code}")
            return None, None

        result = response.json()
        if result.get("code") != 0:
            # HTTP 200 with an API-level error: surface the API message.
            log.error(f"生成 S2A 授权链接失败: {result.get('message', '未知错误')}")
            return None, None

        data = result.get("data") or {}
        auth_url = data.get("auth_url")
        session_id = data.get("session_id")
        if not auth_url or not session_id:
            log.error("生成 S2A 授权链接失败: 响应缺少 auth_url 或 session_id")
            return None, None

        # Only a prefix of the session ID is logged.
        log.success(f"生成 S2A 授权链接成功 (会话: {str(session_id)[:16]}...)")
        return auth_url, session_id
    except Exception as e:
        log.error(f"S2A API 异常: {e}")
        return None, None
def s2a_verify_account_in_pool(email: str, timeout: int = 10) -> Tuple[bool, Optional[Dict[str, Any]]]:
    """Check whether an account has landed in the S2A account pool.

    Searches ``/admin/accounts`` for ``email`` and scans every returned item
    (not only the first) for a name that matches it. A match is a
    case-insensitive substring relation in either direction between the
    email and the account name. Empty or missing names never match, and an
    empty ``email`` is rejected up front, because the empty string is a
    substring of everything.

    Args:
        email: the email address to look for.
        timeout: request timeout in seconds.

    Returns:
        Tuple[bool, Optional[Dict[str, Any]]]: ``(True, account)`` when a
        matching account is found, otherwise ``(False, None)``.
    """
    wanted = email.lower() if email else ""
    if not wanted:
        return False, None

    headers = build_s2a_headers()
    try:
        # Let the server narrow the list down with its search parameter.
        params = {
            "page": 1,
            "page_size": 20,
            "platform": "",
            "type": "",
            "status": "",
            "search": email,
            "timezone": "Asia/Shanghai",
        }
        response = http_session.get(
            f"{S2A_API_BASE}/admin/accounts",
            headers=headers,
            params=params,
            timeout=timeout,
        )
        if response.status_code != 200:
            log.warning(f"S2A 验证账号失败: HTTP {response.status_code}")
            return False, None

        result = response.json()
        if result.get("code") != 0:
            log.warning(f"S2A 验证账号失败: {result.get('message', '未知错误')}")
            return False, None

        items = (result.get("data") or {}).get("items") or []
        for account in items:
            account_name = (account.get("name") or "").lower()
            if not account_name:
                # An empty name would trivially be "contained" in the email.
                continue
            if wanted in account_name or account_name in wanted:
                return True, account
        return False, None
    except Exception as e:
        log.warning(f"S2A 验证账号异常: {e}")
        return False, None
def s2a_create_account_from_oauth(
    code: str,
    session_id: str,
    name: str = "",
    proxy_id: Optional[int] = None
) -> Optional[Dict[str, Any]]:
    """Exchange an authorization code for tokens and create the account in one call.

    Posts to ``/admin/openai/create-from-oauth``. After a successful creation
    a follow-up lookup is made through ``s2a_verify_account_in_pool``; its
    outcome is only logged and never changes the return value.

    Args:
        code: the OAuth authorization code.
        session_id: the session ID obtained when the auth URL was generated.
        name: optional account name; sent only when non-empty.
        proxy_id: optional proxy ID; sent only when not None.

    Returns:
        Optional[Dict[str, Any]]: the ``data`` object of a successful
        response, or ``None`` on any failure.
    """
    headers = build_s2a_headers()
    request_body = {
        "session_id": session_id,
        "code": code,
        "concurrency": S2A_CONCURRENCY,
        "priority": S2A_PRIORITY,
    }

    # A name containing "@" is treated as the full email for the later check.
    email_for_check = name if "@" in name else ""

    if name:
        request_body["name"] = name
    if proxy_id is not None:
        request_body["proxy_id"] = proxy_id

    configured_groups = get_s2a_group_ids()
    if configured_groups:
        request_body["group_ids"] = configured_groups

    try:
        log.step("正在提交授权码到 S2A...")
        resp = http_session.post(
            f"{S2A_API_BASE}/admin/openai/create-from-oauth",
            headers=headers,
            json=request_body,
            timeout=REQUEST_TIMEOUT,
        )
        if resp.status_code != 200:
            log.error(f"S2A 账号创建失败: HTTP {resp.status_code}")
            return None

        body = resp.json()
        if body.get("code") != 0:
            error_msg = body.get('message', '未知错误')
            log.error(f"S2A 账号创建失败: {error_msg}")
            return None

        account_data = body.get("data", {})
        account_id = account_data.get("id")
        account_name = account_data.get("name")
        log.success(f"S2A 授权成功 (ID: {account_id}, 名称: {account_name})")

        # Confirm the account is visible in the pool (informational only).
        lookup_key = email_for_check or account_name
        if lookup_key:
            log.step("正在验证账号入库状态...")
            verified, verified_data = s2a_verify_account_in_pool(lookup_key)
            if verified:
                verified_id = verified_data.get("id", "")
                verified_name = verified_data.get("name", "")
                log.success(f"✅ 账号入库验证成功 (ID: {verified_id}, 名称: {verified_name})")
            else:
                log.warning("⚠️ 账号入库验证失败,但授权已成功")
        return account_data
    except Exception as e:
        log.error(f"S2A 创建账号异常: {e}")
        return None
def s2a_add_account(
    name: str,
    token_info: Dict[str, Any],
    proxy_id: Optional[int] = None
) -> Optional[Dict[str, Any]]:
    """Add an OAuth account to the S2A account pool via ``POST /admin/accounts``.

    Args:
        name: account name (usually the email address).
        token_info: token data; ``access_token``, ``refresh_token`` and
            ``expires_at`` are always forwarded, ``id_token`` and ``email``
            only when they are truthy.
        proxy_id: optional proxy ID; sent only when not None.

    Returns:
        Optional[Dict[str, Any]]: the ``data`` object of a successful
        response, or ``None`` on any failure.
    """
    headers = build_s2a_headers()

    credentials = {
        field: token_info.get(field)
        for field in ("access_token", "refresh_token", "expires_at")
    }
    for optional_field in ("id_token", "email"):
        value = token_info.get(optional_field)
        if value:
            credentials[optional_field] = value

    request_body = {
        "name": name,
        "platform": "openai",
        "type": "oauth",
        "credentials": credentials,
        "concurrency": S2A_CONCURRENCY,
        "priority": S2A_PRIORITY,
        "auto_pause_on_expired": True,
    }
    if proxy_id is not None:
        request_body["proxy_id"] = proxy_id

    configured_groups = get_s2a_group_ids()
    if configured_groups:
        request_body["group_ids"] = configured_groups

    try:
        resp = http_session.post(
            f"{S2A_API_BASE}/admin/accounts",
            headers=headers,
            json=request_body,
            timeout=REQUEST_TIMEOUT,
        )
        if resp.status_code != 200:
            log.error(f"S2A 添加账号失败: HTTP {resp.status_code}")
            return None

        body = resp.json()
        if body.get("code") != 0:
            log.error(f"S2A 添加账号失败: {body.get('message', 'Unknown error')}")
            return None

        account_data = body.get("data", {})
        account_id = account_data.get("id")
        log.success(f"S2A 账号添加成功 (ID: {account_id}, Name: {name})")
        return account_data
    except Exception as e:
        log.error(f"S2A 添加账号异常: {e}")
        return None
# ==================== 账号管理 ====================
def s2a_get_accounts(platform: str = "openai") -> List[Dict[str, Any]]:
    """Fetch the account list from ``/admin/accounts``.

    No paging parameters are sent, so the result is whatever the server
    returns for a single unpaged request.

    Args:
        platform: platform filter; omitted from the query when falsy.

    Returns:
        List[Dict[str, Any]]: ``data.items`` when ``data`` is a dict that
        has an ``items`` key, ``data`` itself when it is a list, otherwise an
        empty list (also on any error).
    """
    headers = build_s2a_headers()
    try:
        query = {}
        if platform:
            query["platform"] = platform
        resp = http_session.get(
            f"{S2A_API_BASE}/admin/accounts",
            headers=headers,
            params=query,
            timeout=REQUEST_TIMEOUT,
        )
        if resp.status_code != 200:
            return []
        body = resp.json()
        if body.get("code") != 0:
            return []

        data = body.get("data", {})
        if isinstance(data, dict) and "items" in data:
            return data.get("items", [])
        if isinstance(data, list):
            return data
        return []
    except Exception as e:
        log.warning(f"S2A 获取账号列表异常: {e}")
        return []
def s2a_check_account_exists(email: str, platform: str = "openai") -> bool:
    """Tell whether an account with this email is already in the list.

    An account matches when its ``name`` or its ``credentials.email`` equals
    ``email``, compared case-insensitively. Null ``name``/``credentials``/
    ``email`` values in the fetched data are treated as empty instead of
    raising, and an empty ``email`` never matches.

    Args:
        email: the email address to look for.
        platform: platform filter passed on to ``s2a_get_accounts``.

    Returns:
        bool: True when a matching account is found.
    """
    if not email:
        # An empty string would otherwise match any account with an empty name.
        return False

    wanted = email.lower()
    for account in s2a_get_accounts(platform):
        account_name = (account.get("name") or "").lower()
        credentials = account.get("credentials") or {}
        account_email = (credentials.get("email") or "").lower()
        if wanted in (account_name, account_email):
            return True
    return False
# ==================== 工具函数 ====================
def extract_code_from_url(url: str) -> Optional[str]:
    """Pull the ``code`` query parameter out of a callback URL.

    Args:
        url: the callback URL; a falsy value yields ``None``.

    Returns:
        Optional[str]: the first ``code`` value, or ``None`` when the URL is
        empty, has no ``code`` parameter, or cannot be parsed (the parse
        error is logged).
    """
    if not url:
        return None
    try:
        query = parse_qs(urlparse(url).query)
        code_values = query.get("code", [None])
        return code_values[0]
    except Exception as e:
        log.error(f"解析 URL 失败: {e}")
        return None
def is_s2a_callback_url(url: str) -> bool:
    """Tell whether ``url`` looks like an S2A OAuth callback URL.

    The URL must contain both ``localhost:1455/auth/callback`` and ``code=``.
    A falsy URL is never a callback URL.
    """
    if not url:
        return False
    has_callback_path = "localhost:1455/auth/callback" in url
    return has_callback_path and "code=" in url
# ==================== 仪表盘统计 ====================
def s2a_get_dashboard_stats(timezone: str = "Asia/Shanghai") -> Optional[Dict[str, Any]]:
    """Fetch the dashboard statistics from ``/admin/dashboard/stats``.

    Args:
        timezone: timezone name forwarded as a query parameter.

    Returns:
        Optional[Dict[str, Any]]: the ``data`` object of a successful
        response, or ``None`` when S2A is not configured or the request
        fails (failures are logged as warnings).
    """
    has_credential = bool(S2A_ADMIN_KEY or S2A_ADMIN_TOKEN)
    if not S2A_API_BASE or not has_credential:
        return None

    headers = build_s2a_headers()
    try:
        resp = http_session.get(
            f"{S2A_API_BASE}/admin/dashboard/stats",
            headers=headers,
            params={"timezone": timezone},
            timeout=REQUEST_TIMEOUT,
        )
        if resp.status_code != 200:
            log.warning(f"S2A 仪表盘获取失败: HTTP {resp.status_code}")
            return None
        body = resp.json()
        if body.get("code") == 0:
            return body.get("data", {})
        log.warning(f"S2A 仪表盘获取失败: {body.get('message', 'Unknown error')}")
    except Exception as e:
        log.warning(f"S2A 仪表盘获取异常: {e}")
    return None
def format_dashboard_stats(stats: Dict[str, Any]) -> str:
    """Render the raw dashboard statistics as a multi-line, human-readable text.

    Every field is read with a default of 0. The original implementation
    also read ``overload_accounts`` but never rendered it; it is not part of
    the output.

    Args:
        stats: the raw dashboard data.

    Returns:
        str: the formatted text, or a "no data" placeholder when ``stats``
        is empty.
    """
    if not stats:
        return "暂无数据"

    def _grouped(value):
        """Thousands-separated number; floats get two decimals."""
        return format(value, ",.2f" if isinstance(value, float) else ",")

    def _compact(value):
        """Token count abbreviated with a B / M / K suffix."""
        for threshold, suffix, digits in (
            (1_000_000_000, "B", 2),
            (1_000_000, "M", 2),
            (1_000, "K", 1),
        ):
            if value >= threshold:
                return f"{value / threshold:.{digits}f}{suffix}"
        return str(value)

    def _stat(field):
        return stats.get(field, 0)

    account_section = [
        "📦 账号状态",
        f" 总计: {_stat('total_accounts')} | 正常: {_stat('normal_accounts')}",
        f" 异常: {_stat('error_accounts')} | 限流: {_stat('ratelimit_accounts')}",
    ]
    today_section = [
        "📅 今日统计",
        f" 请求数: {_grouped(_stat('today_requests'))}",
        f" Token: {_compact(_stat('today_tokens'))}",
        f" 输入: {_compact(_stat('today_input_tokens'))} | 输出: {_compact(_stat('today_output_tokens'))}",
        f" 缓存: {_compact(_stat('today_cache_read_tokens'))}",
        f" 费用: ${_grouped(_stat('today_cost'))}",
    ]
    total_section = [
        "📈 累计统计",
        f" 请求数: {_grouped(_stat('total_requests'))}",
        f" Token: {_compact(_stat('total_tokens'))}",
        f" 费用: ${_grouped(_stat('total_cost'))}",
    ]
    live_section = [
        "⚡ 实时状态",
        f" RPM: {_stat('rpm')} | TPM: {_grouped(_stat('tpm'))}",
        f" 活跃用户: {_stat('active_users')}",
        f" 平均延迟: {_stat('average_duration_ms'):.0f}ms",
    ]

    # Title first, then each section preceded by one blank line.
    rendered = ["📊 S2A 仪表盘"]
    for section in (account_section, today_section, total_section, live_section):
        rendered.append("")
        rendered.extend(section)
    return "\n".join(rendered)
# ==================== 批量导入账号 ====================
def s2a_import_account_with_token(
    email: str,
    access_token: str,
    password: str = "",
    proxy_id: Optional[int] = None
) -> Tuple[bool, str]:
    """Import an account into S2A directly from an access token.

    Posts an ``access_token``-type OpenAI account to ``/admin/accounts``.

    Args:
        email: account email, used as the account name.
        access_token: the OpenAI access token.
        password: account password; accepted but not used by this function.
        proxy_id: optional proxy ID; sent only when not None.

    Returns:
        Tuple[bool, str]: ``(success, message)``. On success the message
        carries the new account ID. An API error whose message contains
        "exist" or "duplicate" is reported as ``"Already exists"``.
    """
    if not S2A_API_BASE or not (S2A_ADMIN_KEY or S2A_ADMIN_TOKEN):
        return False, "S2A not configured"

    headers = build_s2a_headers()

    # Account definition sent to the API.
    request_body = {
        "name": email,
        "platform": "openai",
        "type": "access_token",
        "credentials": {"access_token": access_token},
        "concurrency": S2A_CONCURRENCY,
        "priority": S2A_PRIORITY,
        "auto_pause_on_expired": True,
    }
    if proxy_id is not None:
        request_body["proxy_id"] = proxy_id

    configured_groups = get_s2a_group_ids()
    if configured_groups:
        request_body["group_ids"] = configured_groups

    try:
        resp = http_session.post(
            f"{S2A_API_BASE}/admin/accounts",
            headers=headers,
            json=request_body,
            timeout=REQUEST_TIMEOUT,
        )
        if resp.status_code != 200:
            return False, f"HTTP {resp.status_code}"

        body = resp.json()
        if body.get("code") == 0:
            account_id = body.get("data", {}).get("id")
            return True, f"ID: {account_id}"

        error_msg = body.get("message", "Unknown error")
        lowered = error_msg.lower()
        # Normalise "already there" style errors to one fixed message.
        if "exist" in lowered or "duplicate" in lowered:
            return False, "Already exists"
        return False, error_msg
    except Exception as e:
        return False, str(e)
def s2a_batch_import_accounts(
    accounts: List[Dict[str, str]],
    progress_callback: Optional[callable] = None
) -> Dict[str, Any]:
    """Import a batch of accounts into S2A.

    The list of already-present accounts is fetched once, and only when the
    first importable entry is reached, instead of once per entry. Accounts
    imported during the run are added to that set, so duplicates inside the
    batch are skipped without another request. ``progress_callback`` is
    invoked for every entry, including entries skipped for missing data.

    Args:
        accounts: entries shaped like
            ``{"account": "email", "password": "pwd", "token": "jwt"}``.
        progress_callback: optional callable invoked as
            ``(current, total, email, status)`` after each entry, where
            status is ``"success"``, ``"skipped"`` or ``"failed"``.

    Returns:
        Dict[str, Any]: ``{"success": int, "failed": int, "skipped": int,
        "details": [{"email", "status", "message"}, ...]}``.
    """
    results: Dict[str, Any] = {
        "success": 0,
        "failed": 0,
        "skipped": 0,
        "details": [],
    }
    total = len(accounts)
    known_accounts = None  # lowercase names/emails already in S2A; loaded lazily

    def _record(position: int, email: str, status: str, message: str) -> None:
        # The status string doubles as the counter key.
        results[status] += 1
        results["details"].append({
            "email": email,
            "status": status,
            "message": message,
        })
        if progress_callback:
            progress_callback(position, total, email, status)

    for position, acc in enumerate(accounts, start=1):
        email = acc.get("account", "")
        token = acc.get("token", "")
        password = acc.get("password", "")

        if not email or not token:
            _record(position, email or "unknown", "skipped", "Missing email or token")
            continue

        if known_accounts is None:
            # One listing for the whole batch; null fields count as empty.
            known_accounts = set()
            for existing in s2a_get_accounts("openai"):
                known_accounts.add((existing.get("name") or "").lower())
                existing_credentials = existing.get("credentials") or {}
                known_accounts.add((existing_credentials.get("email") or "").lower())
            known_accounts.discard("")

        if email.lower() in known_accounts:
            _record(position, email, "skipped", "Already exists")
            continue

        success, message = s2a_import_account_with_token(email, token, password)
        if success:
            known_accounts.add(email.lower())
            _record(position, email, "success", message)
        elif "exist" in message.lower():
            # The server rejected it as a duplicate: count it as skipped.
            _record(position, email, "skipped", message)
        else:
            _record(position, email, "failed", message)

    return results
# ==================== API 密钥用量查询 ====================
def s2a_get_api_keys(page: int = 1, page_size: int = 50, timezone: str = "Asia/Shanghai") -> Optional[Dict[str, Any]]:
    """Fetch one page of the API key list from the ``/keys`` endpoint.

    Args:
        page: page number.
        page_size: number of entries per page.
        timezone: timezone name forwarded as a query parameter.

    Returns:
        Optional[Dict[str, Any]]: the ``data`` object of a successful
        response, or ``None`` when S2A is not configured or the request
        fails (failures are logged as warnings).
    """
    has_credential = bool(S2A_ADMIN_KEY or S2A_ADMIN_TOKEN)
    if not S2A_API_BASE or not has_credential:
        return None

    headers = build_s2a_headers()
    query = {"page": page, "page_size": page_size, "timezone": timezone}
    try:
        resp = http_session.get(
            f"{S2A_API_BASE}/keys",
            headers=headers,
            params=query,
            timeout=REQUEST_TIMEOUT,
        )
        if resp.status_code != 200:
            log.warning(f"S2A 获取密钥列表失败: HTTP {resp.status_code}")
            return None
        body = resp.json()
        if body.get("code") == 0:
            return body.get("data", {})
        log.warning(f"S2A 获取密钥列表失败: {body.get('message', 'Unknown error')}")
    except Exception as e:
        log.warning(f"S2A 获取密钥列表异常: {e}")
    return None
def s2a_get_key_usage_stats(
    api_key_id: int,
    start_date: str,
    end_date: str,
    timezone: str = "Asia/Shanghai"
) -> Optional[Dict[str, Any]]:
    """Fetch usage statistics for one API key from ``/admin/usage/stats``.

    Args:
        api_key_id: ID of the key.
        start_date: first day of the range (YYYY-MM-DD).
        end_date: last day of the range (YYYY-MM-DD).
        timezone: timezone name forwarded as a query parameter.

    Returns:
        Optional[Dict[str, Any]]: the ``data`` object of a successful
        response, or ``None`` when S2A is not configured or the request
        fails (failures are logged as warnings).
    """
    has_credential = bool(S2A_ADMIN_KEY or S2A_ADMIN_TOKEN)
    if not S2A_API_BASE or not has_credential:
        return None

    headers = build_s2a_headers()
    query = {
        "api_key_id": api_key_id,
        "start_date": start_date,
        "end_date": end_date,
        "timezone": timezone,
    }
    try:
        resp = http_session.get(
            f"{S2A_API_BASE}/admin/usage/stats",
            headers=headers,
            params=query,
            timeout=REQUEST_TIMEOUT,
        )
        if resp.status_code != 200:
            log.warning(f"S2A 获取密钥用量失败: HTTP {resp.status_code}")
            return None
        body = resp.json()
        if body.get("code") == 0:
            return body.get("data", {})
        log.warning(f"S2A 获取密钥用量失败: {body.get('message', 'Unknown error')}")
    except Exception as e:
        log.warning(f"S2A 获取密钥用量异常: {e}")
    return None
def s2a_get_keys_with_usage(start_date: Optional[str] = None, end_date: Optional[str] = None, timezone: str = "Asia/Shanghai") -> Optional[List[Dict[str, Any]]]:
    """Fetch the API key list and attach usage statistics to every key.

    The first 100 keys are requested. Each key dict gets a ``usage`` entry
    holding the statistics for the date range, or ``{}`` when the key has no
    ID or its statistics could not be fetched.

    When a date is omitted it defaults to "today" evaluated in ``timezone``,
    the same timezone the statistics are queried in. If the timezone
    database is unavailable or the name is unknown, the host's local date is
    used instead.

    Args:
        start_date: first day of the range (YYYY-MM-DD); defaults to today.
        end_date: last day of the range (YYYY-MM-DD); defaults to today.
        timezone: timezone name used for the API queries and the default dates.

    Returns:
        Optional[List[Dict[str, Any]]]: the keys with their usage, ``[]``
        when there are no keys, or ``None`` when the key list could not be
        fetched.
    """
    from datetime import datetime

    # Fetch the key list.
    keys_data = s2a_get_api_keys(page=1, page_size=100, timezone=timezone)
    if not keys_data:
        return None

    keys = keys_data.get("items", [])
    if not keys:
        return []

    # Default the range to today, computed once so both ends agree.
    if not start_date or not end_date:
        try:
            from zoneinfo import ZoneInfo  # Python 3.9+
            now = datetime.now(ZoneInfo(timezone))
        except (ImportError, KeyError, ValueError, TypeError, OSError):
            # No zoneinfo module, no tz database, or an unusable name.
            now = datetime.now()
        today = now.strftime("%Y-%m-%d")
        start_date = start_date or today
        end_date = end_date or today

    # Attach usage to each key.
    for key in keys:
        key_id = key.get("id")
        if key_id:
            usage = s2a_get_key_usage_stats(key_id, start_date, end_date, timezone)
            key["usage"] = usage if usage else {}
        else:
            key["usage"] = {}

    return keys
def format_keys_usage(keys: List[Dict[str, Any]], period_text: str = "今日") -> str:
    """Render per-key usage plus a summary as a multi-line, human-readable text.

    Args:
        keys: key dicts, each optionally carrying a ``usage`` dict.
        period_text: label for the reporting period, used in the headings.

    Returns:
        str: the formatted text, or a "no key data" placeholder when
        ``keys`` is empty.
    """
    if not keys:
        return "暂无密钥数据"

    def _money(amount):
        """Cost with precision depending on magnitude."""
        if amount >= 1000:
            return f"${amount:,.2f}"
        if amount >= 1:
            return f"${amount:.2f}"
        return f"${amount:.4f}"

    def _compact(count):
        """Token count abbreviated with a B / M / K suffix."""
        for threshold, suffix, digits in (
            (1_000_000_000, "B", 2),
            (1_000_000, "M", 2),
            (1_000, "K", 1),
        ):
            if count >= threshold:
                return f"{count / threshold:.{digits}f}{suffix}"
        return str(int(count))

    def _elapsed(ms):
        """Duration in seconds from 1000 ms upwards, otherwise in ms."""
        return f"{ms / 1000:.2f}s" if ms >= 1000 else f"{ms:.0f}ms"

    def _masked(raw_key):
        """Key shown with its middle hidden; short keys keep a prefix only."""
        if len(raw_key) > 12:
            return f"{raw_key[:6]}...{raw_key[-4:]}"
        if raw_key:
            return raw_key[:8] + "..."
        return "N/A"

    report = [f"🔑 API 密钥用量 ({period_text})", ""]
    sum_requests = 0
    sum_tokens = 0
    sum_cost = 0

    for entry in keys:
        group = entry.get("group", {})
        group_name = group.get("name", "默认") if group else "默认"
        usage = entry.get("usage", {})

        req_count = usage.get("total_requests", 0)
        tokens = usage.get("total_tokens", 0)
        tokens_in = usage.get("total_input_tokens", 0)
        tokens_out = usage.get("total_output_tokens", 0)
        tokens_cached = usage.get("total_cache_tokens", 0)
        cost = usage.get("total_actual_cost", 0)
        avg_duration = usage.get("average_duration_ms", 0)

        sum_requests += req_count
        sum_tokens += tokens
        sum_cost += cost

        icon = "✅" if entry.get("status", "unknown") == "active" else "⏸️"

        report.append(f"{icon} {entry.get('name', '未命名')} ({group_name})")
        report.append(f" 密钥: {_masked(entry.get('key', ''))}")
        report.append(f" 请求: {req_count:,} | 耗时: {_elapsed(avg_duration)}")
        report.append(f" Token: {_compact(tokens)} (入:{_compact(tokens_in)} 出:{_compact(tokens_out)})")
        # The cache line is shown only when there is something to show.
        if tokens_cached > 0:
            report.append(f" 缓存: {_compact(tokens_cached)}")
        report.append(f" 费用: {_money(cost)}")
        report.append("")

    # Summary across all keys.
    report.extend([
        f"📊 {period_text}汇总",
        f" 密钥数: {len(keys)}",
        f" 总请求: {sum_requests:,}",
        f" 总 Token: {_compact(sum_tokens)}",
        f" 总费用: {_money(sum_cost)}",
    ])
    return "\n".join(report)