# ==================== S2A (Sub2API) service module ====================
# Handles Sub2API features (OpenAI OAuth authorization, adding accounts to the pool).
#
# Key differences between S2A and CPA/CRS:
# - Authentication: S2A supports an Admin API Key (x-api-key) or a JWT token (Bearer)
# - Session identifier: S2A uses session_id
# - Authorization flow: S2A generates an auth URL -> user authorizes -> the code is
#   submitted in exchange for a token -> the account is created
# - Account creation: S2A can do it in one step (create-from-oauth) or in separate
#   steps (exchange + add_account)

from datetime import datetime
from typing import Any, Callable, Dict, List, Optional, Tuple
from urllib.parse import parse_qs, urlparse

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

from config import (
    S2A_API_BASE,
    S2A_ADMIN_KEY,
    S2A_ADMIN_TOKEN,
    S2A_CONCURRENCY,
    S2A_PRIORITY,
    S2A_GROUP_IDS,
    S2A_GROUP_NAMES,
    REQUEST_TIMEOUT,
    USER_AGENT,
)
from logger import log

# ==================== Group ID cache ====================
_resolved_group_ids = None  # Cached result of s2a_resolve_group_ids()


def create_session_with_retry() -> requests.Session:
    """Create an HTTP session that retries failed requests.

    Returns:
        A ``requests.Session`` whose http:// and https:// adapters retry up to
        5 times with backoff on status 429/500/502/503/504.
    """
    session = requests.Session()
    # NOTE(review): POST is in allowed_methods, so non-idempotent calls (for
    # example account creation) are retried on 5xx as well - confirm that the
    # server tolerates a repeated request.
    retry_strategy = Retry(
        total=5,
        backoff_factor=1,
        status_forcelist=[429, 500, 502, 503, 504],
        allowed_methods=["HEAD", "GET", "POST", "PUT", "DELETE", "OPTIONS"],
    )
    adapter = HTTPAdapter(max_retries=retry_strategy)
    session.mount("https://", adapter)
    session.mount("http://", adapter)
    return session


http_session = create_session_with_retry()


def _is_configured() -> bool:
    """Return True when the API base URL and at least one credential are set."""
    return bool(S2A_API_BASE and (S2A_ADMIN_KEY or S2A_ADMIN_TOKEN))


def _preview_secret(secret: str, visible: int = 16) -> str:
    """Return the first ``visible`` characters of a secret followed by '...'.

    Secrets that are not longer than ``visible`` are returned unchanged.
    """
    return secret[:visible] + "..." if len(secret) > visible else secret


def build_s2a_headers() -> Dict[str, str]:
    """Build the headers for an S2A API request.

    The Admin API Key is preferred; the JWT token is used only when no key is
    configured. With neither configured, no credential header is added.
    """
    headers = {
        "accept": "application/json",
        "content-type": "application/json",
        "user-agent": USER_AGENT,
    }

    if S2A_ADMIN_KEY:
        headers["x-api-key"] = S2A_ADMIN_KEY
    elif S2A_ADMIN_TOKEN:
        headers["authorization"] = f"Bearer {S2A_ADMIN_TOKEN}"

    return headers


def get_auth_method() -> Tuple[str, str]:
    """Report which authentication method is currently in use.

    Returns:
        tuple: (method_name, credential_preview); ("None", "") when no
        credential is configured.
    """
    if S2A_ADMIN_KEY:
        return "Admin API Key", _preview_secret(S2A_ADMIN_KEY)
    if S2A_ADMIN_TOKEN:
        return "JWT Token", _preview_secret(S2A_ADMIN_TOKEN)
    return "None", ""


# ==================== Group management ====================

def s2a_get_groups() -> List[Dict[str, Any]]:
    """Fetch the group list (first page, up to 100 entries).

    Returns:
        The ``items`` list of the response, or [] on any failure.
    """
    headers = build_s2a_headers()

    try:
        response = http_session.get(
            f"{S2A_API_BASE}/admin/groups",
            headers=headers,
            params={"page": 1, "page_size": 100},
            timeout=REQUEST_TIMEOUT,
        )

        if response.status_code == 200:
            result = response.json()
            if result.get("code") == 0:
                data = result.get("data", {})
                return data.get("items", [])

    except Exception as e:
        log.warning(f"S2A 获取分组列表异常: {e}")

    return []


def s2a_resolve_group_ids(silent: bool = False) -> List[int]:
    """Resolve the configured groups to a list of group IDs.

    S2A_GROUP_IDS (IDs configured directly) takes precedence. Otherwise the
    names in S2A_GROUP_NAMES are looked up through the API (case-insensitive).
    A successful resolution is cached for the lifetime of the process.

    Args:
        silent: When True, no warnings are logged.

    Returns:
        The resolved group IDs (possibly empty).
    """
    global _resolved_group_ids

    # Serve from cache when available.
    if _resolved_group_ids is not None:
        return _resolved_group_ids

    # Directly configured IDs win.
    if S2A_GROUP_IDS:
        _resolved_group_ids = S2A_GROUP_IDS
        return _resolved_group_ids

    # Nothing configured at all.
    if not S2A_GROUP_NAMES:
        _resolved_group_ids = []
        return _resolved_group_ids

    groups = s2a_get_groups()
    if not groups:
        if not silent:
            log.warning("S2A 无法获取分组列表,group_names 解析失败")
        # Deliberately NOT cached: a transient API failure must not disable
        # group assignment for the rest of the process; the next call retries.
        return []

    # Build a name -> id mapping (a null name is treated as empty).
    name_to_id = {(g.get("name") or "").lower(): g.get("id") for g in groups}

    resolved = []
    not_found = []
    for name in S2A_GROUP_NAMES:
        group_id = name_to_id.get(name.lower())
        if group_id is not None:
            resolved.append(group_id)
        else:
            not_found.append(name)

    if not_found and not silent:
        log.warning(f"S2A 分组未找到: {', '.join(not_found)}")

    _resolved_group_ids = resolved
    return _resolved_group_ids


def get_s2a_group_ids() -> List[int]:
    """Return the configured group IDs (entry point for external callers)."""
    return s2a_resolve_group_ids()


# ==================== Connection check ====================

def s2a_verify_connection() -> Tuple[bool, str]:
    """Verify that the S2A service is reachable and the credential is valid.

    Returns:
        tuple: (is_valid, message)
    """
    if not S2A_API_BASE:
        return False, "S2A_API_BASE 未配置"

    if not S2A_ADMIN_KEY and not S2A_ADMIN_TOKEN:
        return False, "S2A_ADMIN_KEY 或 S2A_ADMIN_TOKEN 未配置"

    auth_method, _ = get_auth_method()
    headers = build_s2a_headers()

    try:
        # /admin/groups is used as the probe because it accepts x-api-key auth.
        response = http_session.get(
            f"{S2A_API_BASE}/admin/groups",
            headers=headers,
            params={"page": 1, "page_size": 1},
            timeout=REQUEST_TIMEOUT,
        )

        if response.status_code == 200:
            result = response.json()
            if result.get("code") != 0:
                return False, f"API 返回失败: {result.get('message', 'Unknown error')}"

            # Resolve the group configuration so it can be reported.
            group_ids = s2a_resolve_group_ids(silent=True)
            group_info = ""
            if S2A_GROUP_NAMES:
                group_info = f", 分组: {S2A_GROUP_NAMES} -> {group_ids}"
            elif S2A_GROUP_IDS:
                group_info = f", 分组 ID: {group_ids}"
            return True, f"认证有效 (方式: {auth_method}{group_info})"

        if response.status_code == 401:
            return False, f"{auth_method} 无效或已过期 (HTTP 401)"
        if response.status_code == 403:
            return False, f"{auth_method} 权限不足 (HTTP 403)"
        return False, f"服务异常 (HTTP {response.status_code})"

    except requests.exceptions.Timeout:
        return False, f"服务连接超时 ({S2A_API_BASE})"
    except requests.exceptions.ConnectionError:
        return False, f"无法连接到服务 ({S2A_API_BASE})"
    except Exception as e:
        return False, f"验证异常: {str(e)}"


# ==================== OAuth authorization ====================

def s2a_generate_auth_url(proxy_id: Optional[int] = None) -> Tuple[Optional[str], Optional[str]]:
    """Generate an OpenAI OAuth authorization URL.

    Args:
        proxy_id: Optional proxy ID forwarded to the API.

    Returns:
        tuple: (auth_url, session_id), or (None, None) on failure.
    """
    headers = build_s2a_headers()

    payload = {}
    if proxy_id is not None:
        payload["proxy_id"] = proxy_id

    try:
        response = http_session.post(
            f"{S2A_API_BASE}/admin/openai/generate-auth-url",
            headers=headers,
            json=payload,
            timeout=REQUEST_TIMEOUT,
        )

        if response.status_code != 200:
            log.error(f"生成 S2A 授权链接失败: HTTP {response.status_code}")
            return None, None

        result = response.json()
        if result.get("code") != 0:
            # Report the API-level error instead of a misleading "HTTP 200".
            log.error(f"生成 S2A 授权链接失败: {result.get('message', '未知错误')}")
            return None, None

        data = result.get("data", {})
        auth_url = data.get("auth_url")
        session_id = data.get("session_id")

        if auth_url and session_id:
            log.success(f"生成 S2A 授权链接成功 (会话: {session_id[:16]}...)")
            return auth_url, session_id

        log.error("生成 S2A 授权链接失败: 响应缺少 auth_url 或 session_id")
        return None, None

    except Exception as e:
        log.error(f"S2A API 异常: {e}")
        return None, None


def s2a_verify_account_in_pool(email: str, timeout: int = 10) -> Tuple[bool, Optional[Dict[str, Any]]]:
    """Verify that an account has been added to the S2A account pool.

    Searches /admin/accounts for the email and checks whether the name of the
    first returned account matches it (case-insensitive substring match in
    either direction).

    Args:
        email: Email address to verify.
        timeout: Request timeout in seconds.

    Returns:
        tuple: (found, account data or None)
    """
    headers = build_s2a_headers()

    try:
        # Use the search parameter to look the email up.
        params = {
            "page": 1,
            "page_size": 20,
            "platform": "",
            "type": "",
            "status": "",
            "search": email,
            "timezone": "Asia/Shanghai",
        }

        response = http_session.get(
            f"{S2A_API_BASE}/admin/accounts",
            headers=headers,
            params=params,
            timeout=timeout,
        )

        if response.status_code != 200:
            log.warning(f"S2A 验证账号失败: HTTP {response.status_code}")
            return False, None

        result = response.json()
        if result.get("code") != 0:
            log.warning(f"S2A 验证账号失败: {result.get('message', '未知错误')}")
            return False, None

        data = result.get("data", {})
        items = data.get("items", [])
        if items:
            first_account = items[0]
            wanted = email.lower()
            account_name = (first_account.get("name") or "").lower()
            # Both sides must be non-empty: the empty string is a substring of
            # every string, so an unnamed account would otherwise match any email.
            if wanted and account_name and (wanted in account_name or account_name in wanted):
                return True, first_account
        return False, None

    except Exception as e:
        log.warning(f"S2A 验证账号异常: {e}")
        return False, None


def s2a_create_account_from_oauth(
    code: str,
    session_id: str,
    name: str = "",
    proxy_id: Optional[int] = None
) -> Optional[Dict[str, Any]]:
    """Exchange an authorization code for a token and create the account in one step.

    After a successful creation the account is looked up in the pool; a failed
    lookup only produces a warning and does not change the return value.

    Args:
        code: Authorization code.
        session_id: Session ID returned by s2a_generate_auth_url.
        name: Account name (optional).
        proxy_id: Proxy ID (optional).

    Returns:
        dict: Account data, or None on failure.
    """
    headers = build_s2a_headers()

    payload = {
        "session_id": session_id,
        "code": code,
        "concurrency": S2A_CONCURRENCY,
        "priority": S2A_PRIORITY,
    }

    # Keep the full email (if the name is one) for the follow-up verification.
    full_email = name if name and "@" in name else ""

    if name:
        payload["name"] = name

    if proxy_id is not None:
        payload["proxy_id"] = proxy_id

    group_ids = get_s2a_group_ids()
    if group_ids:
        payload["group_ids"] = group_ids

    try:
        log.step("正在提交授权码到 S2A...")
        response = http_session.post(
            f"{S2A_API_BASE}/admin/openai/create-from-oauth",
            headers=headers,
            json=payload,
            timeout=REQUEST_TIMEOUT,
        )

        if response.status_code != 200:
            log.error(f"S2A 账号创建失败: HTTP {response.status_code}")
            return None

        result = response.json()
        if result.get("code") != 0:
            error_msg = result.get('message', '未知错误')
            log.error(f"S2A 账号创建失败: {error_msg}")
            return None

        account_data = result.get("data", {})
        account_id = account_data.get("id")
        account_name = account_data.get("name")
        log.success(f"S2A 授权成功 (ID: {account_id}, 名称: {account_name})")

        # Confirm the account really landed in the pool.
        verify_email = full_email or account_name
        if verify_email:
            log.step("正在验证账号入库状态...")
            verified, verified_data = s2a_verify_account_in_pool(verify_email)
            if verified:
                verified_id = verified_data.get("id", "")
                verified_name = verified_data.get("name", "")
                log.success(f"✅ 账号入库验证成功 (ID: {verified_id}, 名称: {verified_name})")
            else:
                log.warning("⚠️ 账号入库验证失败,但授权已成功")

        return account_data

    except Exception as e:
        log.error(f"S2A 创建账号异常: {e}")
        return None


def s2a_add_account(
    name: str,
    token_info: Dict[str, Any],
    proxy_id: Optional[int] = None
) -> Optional[Dict[str, Any]]:
    """Add an OAuth account to the S2A account pool.

    Args:
        name: Account name (usually the email).
        token_info: Token data; access_token, refresh_token and expires_at are
            always sent, id_token and email only when present.
        proxy_id: Proxy ID (optional).

    Returns:
        dict: Account data, or None on failure.
    """
    headers = build_s2a_headers()

    credentials = {
        "access_token": token_info.get("access_token"),
        "refresh_token": token_info.get("refresh_token"),
        "expires_at": token_info.get("expires_at"),
    }

    # Optional fields are forwarded only when they carry a value.
    for optional_key in ("id_token", "email"):
        if token_info.get(optional_key):
            credentials[optional_key] = token_info.get(optional_key)

    payload = {
        "name": name,
        "platform": "openai",
        "type": "oauth",
        "credentials": credentials,
        "concurrency": S2A_CONCURRENCY,
        "priority": S2A_PRIORITY,
        "auto_pause_on_expired": True,
    }

    if proxy_id is not None:
        payload["proxy_id"] = proxy_id

    group_ids = get_s2a_group_ids()
    if group_ids:
        payload["group_ids"] = group_ids

    try:
        response = http_session.post(
            f"{S2A_API_BASE}/admin/accounts",
            headers=headers,
            json=payload,
            timeout=REQUEST_TIMEOUT,
        )

        if response.status_code != 200:
            log.error(f"S2A 添加账号失败: HTTP {response.status_code}")
            return None

        result = response.json()
        if result.get("code") != 0:
            log.error(f"S2A 添加账号失败: {result.get('message', 'Unknown error')}")
            return None

        account_data = result.get("data", {})
        account_id = account_data.get("id")
        log.success(f"S2A 账号添加成功 (ID: {account_id}, Name: {name})")
        return account_data

    except Exception as e:
        log.error(f"S2A 添加账号异常: {e}")
        return None


# ==================== Account management ====================

def s2a_get_accounts(platform: str = "openai") -> List[Dict[str, Any]]:
    """Fetch the account list.

    Args:
        platform: Platform filter; an empty value sends no filter.

    Returns:
        The accounts from the response (either ``data.items`` or a bare list),
        or [] on any failure.
    """
    # NOTE(review): no page/page_size is sent, so if the endpoint paginates
    # only the server's default page is returned - confirm against the API.
    headers = build_s2a_headers()

    try:
        params = {"platform": platform} if platform else {}
        response = http_session.get(
            f"{S2A_API_BASE}/admin/accounts",
            headers=headers,
            params=params,
            timeout=REQUEST_TIMEOUT,
        )

        if response.status_code == 200:
            result = response.json()
            if result.get("code") == 0:
                data = result.get("data", {})
                if isinstance(data, dict) and "items" in data:
                    return data.get("items", [])
                if isinstance(data, list):
                    return data

        return []

    except Exception as e:
        log.warning(f"S2A 获取账号列表异常: {e}")
        return []


def _account_identity_keys(account: Dict[str, Any]) -> Tuple[str, str]:
    """Return (lower-cased name, lower-cased credential email) of an account.

    Missing or null fields yield empty strings.
    """
    name = (account.get("name") or "").lower()
    credentials = account.get("credentials")
    email = ""
    if isinstance(credentials, dict):
        email = (credentials.get("email") or "").lower()
    return name, email


def s2a_check_account_exists(email: str, platform: str = "openai") -> bool:
    """Check whether an account with this name or credential email exists.

    The comparison is case-insensitive and exact (not a substring match).
    """
    wanted = email.lower()
    return any(
        wanted in _account_identity_keys(account)
        for account in s2a_get_accounts(platform)
    )


# ==================== Utilities ====================

def extract_code_from_url(url: str) -> Optional[str]:
    """Extract the ``code`` query parameter from a callback URL.

    Returns:
        The first ``code`` value, or None when absent or the URL is empty.
    """
    if not url:
        return None

    try:
        parsed = urlparse(url)
        params = parse_qs(parsed.query)
        return params.get("code", [None])[0]
    except Exception as e:
        log.error(f"解析 URL 失败: {e}")
        return None


def is_s2a_callback_url(url: str) -> bool:
    """Return True when the URL looks like an S2A OAuth callback carrying a code."""
    if not url:
        return False
    return "localhost:1455/auth/callback" in url and "code=" in url


# ==================== Dashboard statistics ====================

def s2a_get_dashboard_stats(timezone: str = "Asia/Shanghai") -> Optional[Dict[str, Any]]:
    """Fetch the S2A dashboard statistics.

    Args:
        timezone: Time zone sent to the API (default Asia/Shanghai).

    Returns:
        dict: Dashboard data, or None when unconfigured or on failure.
    """
    if not _is_configured():
        return None

    headers = build_s2a_headers()

    try:
        response = http_session.get(
            f"{S2A_API_BASE}/admin/dashboard/stats",
            headers=headers,
            params={"timezone": timezone},
            timeout=REQUEST_TIMEOUT,
        )

        if response.status_code == 200:
            result = response.json()
            if result.get("code") == 0:
                return result.get("data", {})
            log.warning(f"S2A 仪表盘获取失败: {result.get('message', 'Unknown error')}")
        else:
            log.warning(f"S2A 仪表盘获取失败: HTTP {response.status_code}")

    except Exception as e:
        log.warning(f"S2A 仪表盘获取异常: {e}")

    return None


def format_dashboard_stats(stats: Dict[str, Any]) -> str:
    """Format dashboard statistics as human-readable text.

    Missing or null metrics are rendered as 0.

    Args:
        stats: Raw dashboard data.

    Returns:
        str: The formatted multi-line report, or a placeholder when ``stats``
        is empty.
    """
    if not stats:
        return "暂无数据"

    def metric(key):
        """Read a numeric metric, treating a missing or null value as 0."""
        return stats.get(key) or 0

    def fmt_num(n):
        """Format a number with thousands separators (2 decimals for floats)."""
        if isinstance(n, float):
            return f"{n:,.2f}"
        return f"{n:,}"

    def fmt_tokens(n):
        """Format a token count compactly (K / M / B suffixes)."""
        if n >= 1_000_000_000:
            return f"{n / 1_000_000_000:.2f}B"
        elif n >= 1_000_000:
            return f"{n / 1_000_000:.2f}M"
        elif n >= 1_000:
            return f"{n / 1_000:.1f}K"
        return str(n)

    # Account status
    total_accounts = metric("total_accounts")
    normal_accounts = metric("normal_accounts")
    error_accounts = metric("error_accounts")
    ratelimit_accounts = metric("ratelimit_accounts")

    # Today
    today_requests = metric("today_requests")
    today_tokens = metric("today_tokens")
    today_cost = metric("today_cost")
    today_input = metric("today_input_tokens")
    today_output = metric("today_output_tokens")
    today_cache_read = metric("today_cache_read_tokens")

    # Totals
    total_requests = metric("total_requests")
    total_tokens = metric("total_tokens")
    total_cost = metric("total_cost")

    # Live status
    rpm = metric("rpm")
    tpm = metric("tpm")
    active_users = metric("active_users")
    avg_duration = metric("average_duration_ms")

    lines = [
        "📊 S2A 仪表盘",
        "",
        "📦 账号状态",
        f" 总计: {total_accounts} | 正常: {normal_accounts}",
        f" 异常: {error_accounts} | 限流: {ratelimit_accounts}",
        "",
        "📅 今日统计",
        f" 请求数: {fmt_num(today_requests)}",
        f" Token: {fmt_tokens(today_tokens)}",
        f" 输入: {fmt_tokens(today_input)} | 输出: {fmt_tokens(today_output)}",
        f" 缓存: {fmt_tokens(today_cache_read)}",
        f" 费用: ${fmt_num(today_cost)}",
        "",
        "📈 累计统计",
        f" 请求数: {fmt_num(total_requests)}",
        f" Token: {fmt_tokens(total_tokens)}",
        f" 费用: ${fmt_num(total_cost)}",
        "",
        "⚡ 实时状态",
        f" RPM: {rpm} | TPM: {fmt_num(tpm)}",
        f" 活跃用户: {active_users}",
        f" 平均延迟: {avg_duration:.0f}ms",
    ]

    return "\n".join(lines)


# ==================== Batch account import ====================

def s2a_import_account_with_token(
    email: str,
    access_token: str,
    password: str = "",
    proxy_id: Optional[int] = None
) -> Tuple[bool, str]:
    """Import an account into S2A directly from an access token.

    Args:
        email: Account email, used as the account name.
        access_token: OpenAI access token (JWT).
        password: Account password. Accepted for interface compatibility but
            currently not sent to the API.
        proxy_id: Proxy ID (optional).

    Returns:
        tuple: (success, message). On a duplicate the message is
        "Already exists".
    """
    if not _is_configured():
        return False, "S2A not configured"

    headers = build_s2a_headers()

    payload = {
        "name": email,
        "platform": "openai",
        "type": "access_token",
        "credentials": {"access_token": access_token},
        "concurrency": S2A_CONCURRENCY,
        "priority": S2A_PRIORITY,
        "auto_pause_on_expired": True,
    }

    if proxy_id is not None:
        payload["proxy_id"] = proxy_id

    group_ids = get_s2a_group_ids()
    if group_ids:
        payload["group_ids"] = group_ids

    try:
        response = http_session.post(
            f"{S2A_API_BASE}/admin/accounts",
            headers=headers,
            json=payload,
            timeout=REQUEST_TIMEOUT,
        )

        if response.status_code != 200:
            return False, f"HTTP {response.status_code}"

        result = response.json()
        if result.get("code") == 0:
            account_data = result.get("data", {})
            account_id = account_data.get("id")
            return True, f"ID: {account_id}"

        # A null message must not break the duplicate detection below.
        error_msg = str(result.get("message") or "Unknown error")
        lowered = error_msg.lower()
        if "exist" in lowered or "duplicate" in lowered:
            return False, "Already exists"
        return False, error_msg

    except Exception as e:
        return False, str(e)


def s2a_batch_import_accounts(
    accounts: List[Dict[str, str]],
    progress_callback: Optional[Callable[[int, int, str, str], None]] = None
) -> Dict[str, Any]:
    """Import a batch of accounts into S2A.

    The existing account list is fetched once up front; accounts imported
    during this run are added to the known set so duplicates inside the batch
    are skipped as well.

    Args:
        accounts: Account records:
            [{"account": "email", "password": "pwd", "token": "jwt"}]
        progress_callback: Optional callable invoked once per record as
            (current, total, email, status), where status is "success",
            "skipped" or "failed".

    Returns:
        dict: {"success": int, "failed": int, "skipped": int, "details": [...]}
        where each detail is {"email", "status", "message"}.
    """
    results = {
        "success": 0,
        "failed": 0,
        "skipped": 0,
        "details": [],
    }

    total = len(accounts)

    def record(position, email, status, message):
        """Count one outcome, store its detail row and report progress."""
        results[status] += 1
        results["details"].append({
            "email": email,
            "status": status,
            "message": message,
        })
        if progress_callback:
            progress_callback(position, total, email, status)

    # Fetch the existing accounts once instead of once per record.
    known = set()
    if accounts:
        for existing in s2a_get_accounts():
            known.update(_account_identity_keys(existing))
        known.discard("")

    for position, acc in enumerate(accounts, start=1):
        email = acc.get("account", "")
        token = acc.get("token", "")
        password = acc.get("password", "")

        if not email or not token:
            record(position, email or "unknown", "skipped", "Missing email or token")
            continue

        # Skip accounts that already exist.
        if email.lower() in known:
            record(position, email, "skipped", "Already exists")
            continue

        success, message = s2a_import_account_with_token(email, token, password)

        if success:
            known.add(email.lower())
            record(position, email, "success", message)
        elif "exist" in message.lower():
            record(position, email, "skipped", message)
        else:
            record(position, email, "failed", message)

    return results


# ==================== API key usage ====================

def s2a_get_all_usage_stats(
    start_date: str,
    end_date: str,
    timezone: str = "Asia/Shanghai"
) -> Optional[Dict[str, Any]]:
    """Fetch aggregated usage statistics for all keys (no api_key_id is sent).

    Args:
        start_date: Start date (YYYY-MM-DD).
        end_date: End date (YYYY-MM-DD).
        timezone: Time zone sent to the API.

    Returns:
        dict: Aggregated statistics, or None when unconfigured or on failure.
    """
    if not _is_configured():
        return None

    headers = build_s2a_headers()

    try:
        response = http_session.get(
            f"{S2A_API_BASE}/admin/usage/stats",
            headers=headers,
            params={
                "start_date": start_date,
                "end_date": end_date,
                "timezone": timezone,
            },
            timeout=REQUEST_TIMEOUT,
        )

        if response.status_code == 200:
            result = response.json()
            if result.get("code") == 0:
                return result.get("data", {})
            log.warning(f"S2A 获取汇总用量失败: {result.get('message', 'Unknown error')}")
        else:
            log.warning(f"S2A 获取汇总用量失败: HTTP {response.status_code}")

    except Exception as e:
        log.warning(f"S2A 获取汇总用量异常: {e}")

    return None


def s2a_get_key_usage_stats(
    api_key_id: int,
    start_date: str,
    end_date: str,
    timezone: str = "Asia/Shanghai"
) -> Optional[Dict[str, Any]]:
    """Fetch detailed usage statistics for a single API key.

    Args:
        api_key_id: Key ID.
        start_date: Start date (YYYY-MM-DD).
        end_date: End date (YYYY-MM-DD).
        timezone: Time zone sent to the API.

    Returns:
        dict: Usage statistics, or None when unconfigured or on failure.
    """
    if not _is_configured():
        return None

    headers = build_s2a_headers()

    try:
        response = http_session.get(
            f"{S2A_API_BASE}/admin/usage/stats",
            headers=headers,
            params={
                "api_key_id": api_key_id,
                "start_date": start_date,
                "end_date": end_date,
                "timezone": timezone,
            },
            timeout=REQUEST_TIMEOUT,
        )

        if response.status_code == 200:
            result = response.json()
            if result.get("code") == 0:
                return result.get("data", {})
            log.warning(f"S2A 获取密钥用量失败: {result.get('message', 'Unknown error')}")
        else:
            log.warning(f"S2A 获取密钥用量失败: HTTP {response.status_code}")

    except Exception as e:
        log.warning(f"S2A 获取密钥用量异常: {e}")

    return None


# Key list configuration (data taken from the /api/v1/keys endpoint).
# SECURITY(review): these are API keys hard-coded in source control. They
# should be rotated and loaded from configuration or the API instead; the data
# is left untouched here so existing behavior does not change.
API_KEYS_LIST = [
    {"id": 11, "name": "bohe", "key": "bohebohebohebohe", "group_name": "codex"},
    {"id": 10, "name": "黑与白", "key": "oyqq114514oyqq114514", "group_name": "codex"},
    {"id": 9, "name": "baozi", "key": "baozibaozibaozibaozi", "group_name": "codex"},
    {"id": 7, "name": "Wong-灭鼠专家", "key": "Wong1234567891011", "group_name": "codex"},
    {"id": 6, "name": "小马-莹佬", "key": "mazhichentaiqiangla", "group_name": "codex"},
    {"id": 5, "name": "猫猫-byteBender", "key": "ByteBender114514", "group_name": "codex"},
    {"id": 4, "name": "KFC", "key": "sk-babf3f6d15582b31e959311d6ceaa9448f2cf951dc23d02386d41f68046e6018", "group_name": "codex"},
    {"id": 3, "name": "zhongruan", "key": "zhongruantaiqiangla", "group_name": "codex"},
]


def s2a_get_keys_with_usage(
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    timezone: str = "Asia/Shanghai"
) -> Optional[List[Dict[str, Any]]]:
    """Return the entries of API_KEYS_LIST merged with their usage data.

    One usage request is made per key; a failed request yields an empty
    ``usage`` dict for that key.

    Args:
        start_date: Start date (YYYY-MM-DD); defaults to today.
        end_date: End date (YYYY-MM-DD); defaults to today.
        timezone: Time zone sent to the API.

    Returns:
        list: Key entries with a ``usage`` field, or None when there are no keys.
    """
    if not start_date or not end_date:
        # Computed once so both defaults agree even across midnight.
        # NOTE(review): this is the local date of the host, not of `timezone`.
        today = datetime.now().strftime("%Y-%m-%d")
        start_date = start_date or today
        end_date = end_date or today

    keys = []
    for key_info in API_KEYS_LIST:
        key_id = key_info["id"]
        usage = s2a_get_key_usage_stats(key_id, start_date, end_date, timezone)

        keys.append({
            "id": key_id,
            "name": key_info["name"],
            "key": key_info["key"],
            "group_name": key_info["group_name"],
            "status": "active",
            "usage": usage if usage else {},
        })

    return keys if keys else None


def format_keys_usage(keys: List[Dict[str, Any]], period_text: str = "今日") -> str:
    """Format per-key usage as human-readable text.

    Missing or null usage metrics are rendered as 0. The key value itself is
    never included in the output.

    Args:
        keys: Key entries, each carrying a ``usage`` dict.
        period_text: Label describing the reporting period.

    Returns:
        str: The formatted multi-line report, or a placeholder when ``keys``
        is empty.
    """
    if not keys:
        return "暂无密钥数据"

    def fmt_cost(n):
        """Format a cost; small amounts get 4 decimals."""
        if n >= 1000:
            return f"${n:,.2f}"
        elif n >= 1:
            return f"${n:.2f}"
        return f"${n:.4f}"

    def fmt_tokens(n):
        """Format a token count compactly (K / M / B suffixes)."""
        if n >= 1_000_000_000:
            return f"{n / 1_000_000_000:.2f}B"
        elif n >= 1_000_000:
            return f"{n / 1_000_000:.2f}M"
        elif n >= 1_000:
            return f"{n / 1_000:.1f}K"
        return str(int(n))

    def fmt_duration(ms):
        """Format a duration given in milliseconds."""
        if ms >= 1000:
            return f"{ms / 1000:.2f}s"
        return f"{ms:.0f}ms"

    lines = [f"🔑 API 密钥用量 ({period_text})", ""]

    total_requests = 0
    total_tokens = 0
    total_cost = 0

    for key in keys:
        name = key.get("name", "未命名")
        status = key.get("status", "active")
        group_name = key.get("group_name", "默认")
        usage = key.get("usage") or {}

        # `or 0` guards against null metrics in the API response.
        request_count = usage.get("total_requests") or 0
        tokens = usage.get("total_tokens") or 0
        input_tokens = usage.get("total_input_tokens") or 0
        output_tokens = usage.get("total_output_tokens") or 0
        cache_tokens = usage.get("total_cache_tokens") or 0
        cost = usage.get("total_actual_cost") or 0
        avg_duration = usage.get("average_duration_ms") or 0

        total_requests += request_count
        total_tokens += tokens
        total_cost += cost

        status_icon = "✅" if status == "active" else "⏸️"

        lines.append(f"{status_icon} {name} ({group_name})")
        lines.append(f" 请求: {request_count:,} | 耗时: {fmt_duration(avg_duration)}")
        lines.append(f" Token: {fmt_tokens(tokens)} (入:{fmt_tokens(input_tokens)} 出:{fmt_tokens(output_tokens)})")
        if cache_tokens > 0:
            lines.append(f" 缓存: {fmt_tokens(cache_tokens)}")
        lines.append(f" 费用: {fmt_cost(cost)}")
        lines.append("")

    # Summary
    lines.append(f"📊 {period_text}汇总")
    lines.append(f" 密钥数: {len(keys)}")
    lines.append(f" 总请求: {total_requests:,}")
    lines.append(f" 总 Token: {fmt_tokens(total_tokens)}")
    lines.append(f" 总费用: {fmt_cost(total_cost)}")

    return "\n".join(lines)