- Add S2A_API_MODE configuration option to enable browser-less authorization - Implement S2AApiAuthorizer class using curl_cffi for browser fingerprint simulation - Add Sentinel PoW (Proof of Work) solver with FNV-1a hashing algorithm - Implement OAuth flow via direct API calls instead of browser automation - Add s2a_api_authorize() function to handle email/password authentication - Support proxy configuration for API requests - Add requirements token generation for API authentication - Update browser_automation.py to check S2A_API_MODE and route to API or browser flow - Update config.py to load S2A_API_MODE from configuration - Add api_mode option to config.toml.example with documentation - Improves performance and stability by eliminating browser overhead while maintaining compatibility with existing browser-based flow
1690 lines
53 KiB
Python
1690 lines
53 KiB
Python
# ==================== S2A (Sub2API) 服务模块 ====================
|
||
# 处理 Sub2API 系统相关功能 (OpenAI OAuth 授权、账号入库)
|
||
#
|
||
# S2A 与 CPA/CRS 的关键差异:
|
||
# - 认证方式: S2A 支持 Admin API Key (x-api-key) 或 JWT Token (Bearer)
|
||
# - 会话标识: S2A 使用 session_id
|
||
# - 授权流程: S2A 生成授权 URL -> 用户授权 -> 提交 code 换取 token -> 创建账号
|
||
# - 账号入库: S2A 可一步完成 (create-from-oauth) 或分步完成 (exchange + add_account)
|
||
#
|
||
# 新增: 纯 API 授权模式 (无需浏览器)
|
||
# - 使用 curl_cffi 模拟浏览器指纹
|
||
# - 支持 Sentinel PoW 验证
|
||
# - 直接通过 API 完成 OAuth 流程
|
||
|
||
import requests
|
||
from requests.adapters import HTTPAdapter
|
||
from urllib3.util.retry import Retry
|
||
from urllib.parse import urlparse, parse_qs
|
||
from typing import Optional, Tuple, Dict, List, Any
|
||
import json
|
||
import uuid
|
||
import time
|
||
import random
|
||
import base64
|
||
import hashlib
|
||
from datetime import datetime, timedelta, timezone
|
||
|
||
from config import (
|
||
S2A_API_BASE,
|
||
S2A_ADMIN_KEY,
|
||
S2A_ADMIN_TOKEN,
|
||
S2A_CONCURRENCY,
|
||
S2A_PRIORITY,
|
||
S2A_GROUP_IDS,
|
||
S2A_GROUP_NAMES,
|
||
REQUEST_TIMEOUT,
|
||
USER_AGENT,
|
||
get_next_proxy,
|
||
format_proxy_url,
|
||
)
|
||
from logger import log
|
||
|
||
|
||
# ==================== 分组 ID 缓存 ====================
|
||
_resolved_group_ids = None  # Cache of resolved group IDs; None means "not resolved yet"
|
||
|
||
|
||
def create_session_with_retry() -> requests.Session:
    """Build a ``requests.Session`` whose adapters retry failed requests.

    The retry policy allows up to 5 retries with a backoff factor of 1 for
    responses with status 429/500/502/503/504, and applies to every listed
    HTTP method (POST included).

    Returns:
        requests.Session: session with the retrying adapter mounted for both
        ``https://`` and ``http://`` URLs.
    """
    retrying_adapter = HTTPAdapter(
        max_retries=Retry(
            total=5,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["HEAD", "GET", "POST", "PUT", "DELETE", "OPTIONS"],
        )
    )
    sess = requests.Session()
    # One adapter instance serves both schemes.
    for scheme_prefix in ("https://", "http://"):
        sess.mount(scheme_prefix, retrying_adapter)
    return sess
|
||
|
||
|
||
# Module-wide session (with the retry adapter) shared by the S2A admin API helpers below.
http_session = create_session_with_retry()
|
||
|
||
|
||
# ==================== PoW Solver (Sentinel 验证) ====================
|
||
|
||
def _fnv1a_32(data: bytes) -> int:
    """Hash *data* with 32-bit FNV-1a, then apply a final mixing step.

    Each byte is XOR-ed into the state and the state is multiplied by the FNV
    prime (mod 2**32). After the last byte, two xor-shift/multiply rounds and
    one closing xor-shift are applied once.

    Args:
        data: bytes to hash.

    Returns:
        int: unsigned 32-bit hash value.
    """
    mask = 0xFFFFFFFF
    acc = 2166136261  # FNV-1a 32-bit offset basis
    for octet in data:
        acc = ((acc ^ octet) * 16777619) & mask  # XOR first, then multiply by the FNV prime
    # Final mixing, run once after all bytes were absorbed.
    for shift, multiplier in ((16, 2246822507), (13, 3266489909)):
        acc ^= acc >> shift
        acc = (acc * multiplier) & mask
    acc ^= acc >> 16
    return acc
|
||
|
||
|
||
def _get_parse_time() -> str:
    """Return the current time formatted like JavaScript's ``Date().toString()``.

    The timestamp is always rendered at a fixed UTC+8 offset with the Chinese
    zone label, regardless of the host's local time zone.
    """
    utc_plus_8 = timezone(timedelta(hours=8))
    stamp = datetime.now(utc_plus_8).strftime("%a %b %d %Y %H:%M:%S")
    return stamp + " GMT+0800 (中国标准时间)"
|
||
|
||
|
||
def _get_pow_config(user_agent: str, sid: str = None) -> list:
    """Assemble the 18-element array that gets serialized into a PoW token.

    Args:
        user_agent: User-Agent string stored at index 4.
        sid: session id stored at index 14; a random UUID4 is used when falsy.

    Returns:
        list: the configuration array. Indexes 3 and 9 start at 0 and are
        overwritten by ``_solve_pow`` / ``_get_requirements_token``.
    """
    session_id = sid if sid else str(uuid.uuid4())

    # Dynamic fields are evaluated in list order so the sequence of random
    # draws and clock reads stays the same.
    leading_number = random.randint(2500, 3500)
    parse_time = _get_parse_time()
    react_marker = f"_reactListening{random.randint(1000000, 9999999)}"
    perf_now_ms = time.perf_counter() * 1000
    past_epoch_ms = int(time.time() * 1000 - random.randint(10000, 50000))

    return [
        leading_number,
        parse_time,
        4294967296,
        0,  # index 3: iteration counter slot
        user_agent,
        "chrome-extension://pgojnojmmhpofjgdmaebadhbocahppod/assets/aW5qZWN0X2hhc2g/aW5qZ",
        None,
        "zh-CN",
        "zh-CN",
        0,  # index 9: reset to 0 by the solver on every attempt
        f"canShare−function canShare() {{ [native code] }}",
        react_marker,
        "onhashchange",
        perf_now_ms,
        session_id,
        "",
        24,
        past_epoch_ms,
    ]
|
||
|
||
|
||
def _solve_pow(seed: str, difficulty: str, config: list, max_iterations: int = 5000000) -> Optional[str]:
    """Brute-force a proof-of-work answer on the CPU.

    For each attempt the counter is written into ``config[3]`` (and
    ``config[9]`` reset to 0), the array is serialized to compact JSON and
    base64-encoded, and ``seed + encoded`` is hashed with ``_fnv1a_32``. An
    attempt succeeds when the leading ``len(difficulty)`` hex digits of the
    hash compare ``<=`` to ``difficulty``.

    Note: ``config`` is modified in place.

    Args:
        seed: seed string prepended (as bytes) to every candidate.
        difficulty: hex-string threshold to compare the hash prefix against.
        config: configuration array from ``_get_pow_config``.
        max_iterations: maximum number of attempts.

    Returns:
        Optional[str]: ``"<base64>~S"`` for the first winning candidate, or
        None when no attempt succeeds.
    """
    started = time.perf_counter()
    prefix = seed.encode()

    for attempt in range(max_iterations):
        config[3], config[9] = attempt, 0

        candidate = base64.b64encode(
            json.dumps(config, separators=(',', ':')).encode()
        )
        digest_hex = format(_fnv1a_32(prefix + candidate), "08x")

        if digest_hex[:len(difficulty)] > difficulty:
            continue  # hash prefix above the threshold: try the next counter

        took = time.perf_counter() - started
        log.debug(f"[PoW] 求解完成: {took:.2f}s (迭代 {attempt:,}, 难度={difficulty})")
        return f"{candidate.decode()}~S"

    return None
|
||
|
||
|
||
def _get_requirements_token(user_agent: str, sid: str = None) -> str:
    """Build the initial "requirements" token sent to the sentinel endpoint.

    The PoW configuration array (with indexes 3 and 9 forced to 0) is
    serialized to compact JSON, base64-encoded, and wrapped as
    ``gAAAAAC<base64>~S``.

    Args:
        user_agent: User-Agent string embedded in the configuration.
        sid: session id to embed; a random UUID4 is used when falsy.

    Returns:
        str: the requirements token.
    """
    session_id = sid or str(uuid.uuid4())
    fields = _get_pow_config(user_agent, session_id)
    fields[3] = fields[9] = 0
    serialized = json.dumps(fields, separators=(',', ':')).encode()
    return "gAAAAAC" + base64.b64encode(serialized).decode() + "~S"
|
||
|
||
|
||
# ==================== S2A API 授权器 ====================
|
||
|
||
class S2AApiAuthorizer:
    """Browser-less authorizer that walks the OpenAI OAuth login over HTTP.

    The instance owns one HTTP session (``curl_cffi`` impersonating Chrome 110
    when available, plain ``requests`` otherwise) plus the sentinel state
    (token and proof-of-work answer) that goes into the
    ``OpenAI-Sentinel-Token`` header.
    """

    # Timeout (seconds) for the sentinel request.
    SENTINEL_TIMEOUT = 15
    # Timeout (seconds) for the auth.openai.com requests. Without a timeout a
    # stalled proxy or connection would block the caller indefinitely.
    AUTH_TIMEOUT = 30
    # Maximum number of redirect hops followed while looking for the callback.
    MAX_REDIRECTS = 10

    def __init__(self, email: str, password: str, proxy: Optional[str] = None):
        """Create the HTTP session and the per-login identifiers.

        Args:
            email: account e-mail, submitted as the username.
            password: account password.
            proxy: optional proxy URL used for both http and https traffic.
        """
        self.email = email
        self.password = password

        # Prefer curl_cffi (browser TLS impersonation); fall back to requests.
        try:
            from curl_cffi import requests as cffi_requests
            self.session = cffi_requests.Session(impersonate="chrome110")
            self._use_cffi = True
        except ImportError:
            log.warning("curl_cffi 未安装,使用 requests (可能被检测)")
            self.session = requests.Session()
            self._use_cffi = False

        if proxy:
            self.session.proxies = {"http": proxy, "https": proxy}

        self.ua = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36"
        self.sid = str(uuid.uuid4())
        self.device_id = str(uuid.uuid4())
        self.sentinel_token = None  # "token" field of the last sentinel response
        self.solved_pow = None      # value sent as "p" in the sentinel header

        # NOTE(review): the User-Agent advertises Chrome/110 while sec-ch-ua
        # advertises Chromium/Edge 144 - confirm this mismatch is intended.
        self.session.headers.update({
            "User-Agent": self.ua,
            "Accept": "*/*",
            "Accept-Language": "en-US,en;q=0.9",
            "sec-ch-ua": '"Not(A:Brand";v="8", "Chromium";v="144", "Microsoft Edge";v="144"',
            "sec-ch-ua-mobile": "?0",
            "sec-ch-ua-platform": '"Windows"',
            "sec-fetch-dest": "empty",
            "sec-fetch-mode": "cors",
            "sec-fetch-site": "same-origin",
        })

    def _call_sentinel_req(self, flow: str) -> Optional[dict]:
        """Call the sentinel endpoint and store the token / PoW answer on self.

        Args:
            flow: flow name sent to the sentinel endpoint.

        Returns:
            Optional[dict]: the decoded sentinel response, or None on a
            non-200 status, an unsolved PoW, or any exception.
        """
        init_token = _get_requirements_token(self.ua, self.sid)
        payload = {"p": init_token, "id": self.device_id, "flow": flow}

        try:
            resp = self.session.post(
                "https://sentinel.openai.com/backend-api/sentinel/req",
                json=payload,
                timeout=self.SENTINEL_TIMEOUT
            )
            if resp.status_code != 200:
                log.warning(f"Sentinel 请求失败: {resp.status_code}")
                return None

            data = resp.json()
            self.sentinel_token = data.get('token')
            # "or {}" tolerates an explicit null for "proofofwork".
            pow_req = data.get('proofofwork') or {}

            if pow_req.get('required'):
                seed = pow_req.get('seed', '')
                difficulty = pow_req.get('difficulty', '')
                config = _get_pow_config(self.ua, self.sid)
                solved = _solve_pow(seed, difficulty, config)
                if solved:
                    self.solved_pow = f"gAAAAAB{solved}"
                else:
                    log.error("PoW 求解失败")
                    return None
            else:
                # No PoW demanded: the requirements token doubles as the answer.
                self.solved_pow = init_token
            return data
        except Exception as e:
            log.error(f"Sentinel 异常: {e}")
            return None

    def _get_sentinel_header(self, header_flow: str) -> str:
        """Serialize the sentinel state into the header value (a JSON string).

        Args:
            header_flow: flow name placed in the ``flow`` field.

        Returns:
            str: JSON with ``p``, ``id``, ``flow`` and, when a sentinel token
            is known, ``c``.
        """
        sentinel_obj = {"p": self.solved_pow, "id": self.device_id, "flow": header_flow}
        if self.sentinel_token:
            sentinel_obj["c"] = self.sentinel_token
        return json.dumps(sentinel_obj)

    def _follow_redirects_for_code(self, start_url: str) -> Optional[str]:
        """Follow redirects manually until the localhost callback carries a code.

        Relative ``Location`` values are resolved against the URL that produced
        them. The walk stops on a non-redirect response, on a missing
        ``Location``, after ``MAX_REDIRECTS`` hops, or when the callback URL is
        reached (nothing is ever requested from the localhost callback itself).

        Args:
            start_url: first URL to request.

        Returns:
            Optional[str]: the ``code`` query parameter of the callback
            redirect, or None when it could not be obtained.
        """
        # Function-scope import: urljoin is not among the module-level imports.
        from urllib.parse import urljoin

        current_url = start_url
        for _ in range(self.MAX_REDIRECTS):
            resp = self.session.get(
                current_url,
                allow_redirects=False,
                timeout=self.AUTH_TIMEOUT
            )
            if resp.status_code not in (301, 302, 303, 307, 308):
                break

            location = resp.headers.get('Location', '')
            if not location:
                break

            if "localhost:1455" in location:
                query = parse_qs(urlparse(location).query)
                # The callback is terminal: return the code if present,
                # otherwise give up instead of contacting localhost.
                return query.get('code', [None])[0] or None

            current_url = urljoin(current_url, location)

        return None

    def get_authorization_code(self, auth_url: str) -> Optional[str]:
        """Run the OAuth login flow and return the authorization code.

        Steps: open the authorization URL, submit the e-mail, verify the
        password when the server asks for it, obtain ``continue_url`` and
        follow its redirects to the localhost callback.

        Args:
            auth_url: authorization URL generated by S2A.

        Returns:
            Optional[str]: the authorization code, or None on any failure
            (failures are logged, never raised).
        """
        log.step("开始 API 授权流程...")

        headers = {
            "Origin": "https://auth.openai.com",
            "Referer": "https://auth.openai.com/log-in",
            "Content-Type": "application/json"
        }

        try:
            # 1. Open the authorization endpoint (redirects followed).
            log.step("访问授权端点...")
            resp = self.session.get(
                auth_url,
                allow_redirects=True,
                timeout=self.AUTH_TIMEOUT
            )
            headers["Referer"] = resp.url

            # 2. Submit the e-mail address.
            log.step("提交邮箱...")
            if not self._call_sentinel_req("login_web_init"):
                return None

            auth_headers = headers.copy()
            auth_headers["OpenAI-Sentinel-Token"] = self._get_sentinel_header("authorize_continue")
            resp = self.session.post(
                "https://auth.openai.com/api/accounts/authorize/continue",
                json={"username": {"kind": "email", "value": self.email}},
                headers=auth_headers,
                timeout=self.AUTH_TIMEOUT
            )
            if resp.status_code != 200:
                log.error(f"邮箱提交失败: {resp.status_code} - {resp.text[:200]}")
                return None

            data = resp.json()
            page_type = data.get("page", {}).get("type", "")

            # 3. Verify the password when the response asks for it.
            if page_type == "password" or "password" in str(data):
                log.step("验证密码...")
                if not self._call_sentinel_req("authorize_continue__auto"):
                    return None

                verify_headers = headers.copy()
                verify_headers["OpenAI-Sentinel-Token"] = self._get_sentinel_header("password_verify")
                resp = self.session.post(
                    "https://auth.openai.com/api/accounts/password/verify",
                    json={"username": self.email, "password": self.password},
                    headers=verify_headers,
                    timeout=self.AUTH_TIMEOUT
                )
                if resp.status_code != 200:
                    log.error(f"密码验证失败: {resp.status_code} - {resp.text[:200]}")
                    return None

            # 4. Read continue_url (no workspace selection step is performed).
            data = resp.json()
            continue_url = data.get("continue_url")

            # Without a continue_url, refresh the sentinel state and ask again.
            if not continue_url:
                log.step("获取重定向 URL...")
                if not self._call_sentinel_req("password_verify__auto"):
                    return None

                # NOTE(review): auth_headers still carries the sentinel header
                # built in step 2, not the state refreshed just above -
                # confirm whether the header should be rebuilt here.
                resp = self.session.post(
                    "https://auth.openai.com/api/accounts/authorize/continue",
                    json={},
                    headers=auth_headers,
                    timeout=self.AUTH_TIMEOUT
                )
                if resp.status_code == 200:
                    data = resp.json()
                    continue_url = data.get("continue_url")

            if not continue_url:
                log.error(f"无法获取 continue_url: {data}")
                return None

            # 5. Follow redirects until the callback yields the code.
            log.step("跟踪重定向...")
            code = self._follow_redirects_for_code(continue_url)
            if code:
                log.success("成功获取授权码")
                return code

            log.error("无法获取授权码")
            return None

        except Exception as e:
            log.error(f"API 授权异常: {e}")
            import traceback
            traceback.print_exc()
            return None
|
||
|
||
|
||
def _redact_proxy_for_log(proxy: str) -> str:
    """Return *proxy* as ``scheme://host:port`` with any ``user:pass@`` part removed."""
    try:
        parsed = urlparse(proxy)
        host = parsed.hostname
        port = parsed.port  # raises ValueError for a malformed port
    except ValueError:
        return "<redacted>"
    if not host:
        # No recognizable host (e.g. scheme missing): print nothing rather
        # than risk echoing credentials.
        return "<redacted>"
    scheme = f"{parsed.scheme}://" if parsed.scheme else ""
    suffix = f":{port}" if port else ""
    return f"{scheme}{host}{suffix}"


def s2a_api_authorize(
    email: str,
    password: str,
    proxy: str = None
) -> Tuple[bool, Optional[Dict[str, Any]]]:
    """Authorize an account against S2A purely over HTTP (no browser).

    Generates an S2A authorization URL, obtains the authorization code with
    ``S2AApiAuthorizer`` and submits the code to S2A to create the account.

    Args:
        email: account e-mail.
        password: account password.
        proxy: optional proxy URL (e.g. ``http://host:port`` or
            ``socks5://user:pass@host:port``). When omitted, the next
            configured proxy from ``get_next_proxy()`` is used, if any.

    Returns:
        tuple: ``(success, account_data_or_None)``.
    """
    if not S2A_API_BASE or (not S2A_ADMIN_KEY and not S2A_ADMIN_TOKEN):
        log.error("S2A 未配置")
        return False, None

    # Fall back to the configured proxy rotation.
    if not proxy:
        proxy_config = get_next_proxy()
        if proxy_config:
            proxy = format_proxy_url(proxy_config)

    log.info(f"开始 S2A API 授权: {email}", icon="code")
    if proxy:
        # Log host/port only: the proxy URL may embed user:pass credentials.
        log.debug(f"使用代理: {_redact_proxy_for_log(proxy)}")

    try:
        # 1. Generate the authorization URL.
        auth_url, session_id = s2a_generate_auth_url()
        if not auth_url or not session_id:
            log.error("无法获取 S2A 授权 URL")
            return False, None

        log.debug(f"授权 URL: {auth_url[:80]}...")
        log.debug(f"Session ID: {session_id[:16]}...")

        # 2. Obtain the authorization code through the API authorizer.
        authorizer = S2AApiAuthorizer(email, password, proxy)
        code = authorizer.get_authorization_code(auth_url)

        if not code:
            log.error("API 授权失败,无法获取授权码")
            return False, None

        log.debug(f"授权码: {code[:20]}...")

        # 3. Submit the code to create the account.
        log.step("提交授权码到 S2A...")
        result = s2a_create_account_from_oauth(code, session_id, name=email)

        if result:
            log.success(f"S2A API 授权成功: {email}")
            return True, result
        else:
            log.error("S2A 账号入库失败")
            return False, None

    except Exception as e:
        log.error(f"S2A API 授权异常: {e}")
        return False, None
|
||
|
||
|
||
def build_s2a_headers() -> Dict[str, str]:
    """Return the HTTP headers for S2A admin API requests.

    The Admin API key (``x-api-key``) takes precedence; the JWT token
    (``authorization: Bearer ...``) is used only when no key is configured.
    With neither configured, no credential header is added.
    """
    if S2A_ADMIN_KEY:
        credential = {"x-api-key": S2A_ADMIN_KEY}
    elif S2A_ADMIN_TOKEN:
        credential = {"authorization": f"Bearer {S2A_ADMIN_TOKEN}"}
    else:
        credential = {}

    return {
        "accept": "application/json",
        "content-type": "application/json",
        "user-agent": USER_AGENT,
        **credential,
    }
|
||
|
||
|
||
def get_auth_method() -> Tuple[str, str]:
    """Report which credential is in use, with a shortened preview of it.

    Returns:
        tuple: ``(method_name, credential_preview)``. The preview is the
        first 16 characters followed by ``...`` for credentials longer than 16
        characters, otherwise the credential itself. ``("None", "")`` when
        nothing is configured.
    """
    candidates = (
        ("Admin API Key", S2A_ADMIN_KEY),
        ("JWT Token", S2A_ADMIN_TOKEN),
    )
    for label, secret in candidates:
        if secret:
            shown = secret if len(secret) <= 16 else secret[:16] + "..."
            return label, shown
    return "None", ""
|
||
|
||
|
||
# ==================== 分组管理 ====================
|
||
def s2a_get_groups() -> List[Dict[str, Any]]:
    """Fetch the group list (first page, up to 100 entries) from S2A.

    Returns:
        list: the ``items`` of the response, or an empty list on a non-200
        status, a non-zero API ``code``, or any exception (logged as warning).
    """
    request_headers = build_s2a_headers()

    try:
        resp = http_session.get(
            f"{S2A_API_BASE}/admin/groups",
            headers=request_headers,
            params={"page": 1, "page_size": 100},
            timeout=REQUEST_TIMEOUT,
        )
        if resp.status_code != 200:
            return []

        body = resp.json()
        if body.get("code") != 0:
            return []

        return body.get("data", {}).get("items", [])
    except Exception as e:
        log.warning(f"S2A 获取分组列表异常: {e}")

    return []
|
||
|
||
|
||
def s2a_resolve_group_ids(silent: bool = False) -> List[int]:
    """Resolve the configured groups to a list of group IDs (cached).

    ``S2A_GROUP_IDS`` (IDs configured directly) takes precedence. Otherwise
    ``S2A_GROUP_NAMES`` is resolved case-insensitively through the groups API.

    A successful resolution is cached in ``_resolved_group_ids``. A failed or
    empty group fetch is NOT cached, so a transient API failure does not
    permanently disable group assignment; the next call retries.

    Args:
        silent: when True, suppress the warning logs.

    Returns:
        list: resolved group IDs (possibly empty).
    """
    global _resolved_group_ids

    # Serve from cache.
    if _resolved_group_ids is not None:
        return _resolved_group_ids

    # Directly configured IDs win.
    if S2A_GROUP_IDS:
        _resolved_group_ids = S2A_GROUP_IDS
        return _resolved_group_ids

    # Nothing configured at all: cache the empty result.
    if not S2A_GROUP_NAMES:
        _resolved_group_ids = []
        return _resolved_group_ids

    groups = s2a_get_groups()
    if not groups:
        if not silent:
            log.warning("S2A 无法获取分组列表,group_names 解析失败")
        # Deliberately not cached: s2a_get_groups() returns [] for network or
        # API errors too, and those may be transient.
        return []

    # name -> id map; "or ''" tolerates a null name in the API response.
    name_to_id = {(g.get("name") or "").lower(): g.get("id") for g in groups}

    resolved = []
    not_found = []
    for name in S2A_GROUP_NAMES:
        group_id = name_to_id.get(name.lower())
        if group_id is not None:
            resolved.append(group_id)
        else:
            not_found.append(name)

    if not_found and not silent:
        log.warning(f"S2A 分组未找到: {', '.join(not_found)}")

    _resolved_group_ids = resolved
    return _resolved_group_ids
|
||
|
||
|
||
def get_s2a_group_ids() -> List[int]:
    """Public accessor for the configured group IDs.

    Delegates to ``s2a_resolve_group_ids()`` with its default (non-silent)
    logging.
    """
    group_ids = s2a_resolve_group_ids()
    return group_ids
|
||
|
||
|
||
# ==================== 连接验证 ====================
|
||
def s2a_verify_connection() -> Tuple[bool, str]:
    """Check that the S2A service is reachable and the credential is accepted.

    Probes ``/admin/groups`` with a one-item page and, on success, also
    resolves the configured groups (silently) so they can be reported.

    Returns:
        tuple: ``(is_valid, message)``.
    """
    if not S2A_API_BASE:
        return False, "S2A_API_BASE 未配置"
    if not (S2A_ADMIN_KEY or S2A_ADMIN_TOKEN):
        return False, "S2A_ADMIN_KEY 或 S2A_ADMIN_TOKEN 未配置"

    auth_method, _ = get_auth_method()
    headers = build_s2a_headers()

    try:
        # /admin/groups is used as the probe endpoint.
        response = http_session.get(
            f"{S2A_API_BASE}/admin/groups",
            headers=headers,
            params={"page": 1, "page_size": 1},
            timeout=REQUEST_TIMEOUT,
        )
        status = response.status_code

        if status == 401:
            return False, f"{auth_method} 无效或已过期 (HTTP 401)"
        if status == 403:
            return False, f"{auth_method} 权限不足 (HTTP 403)"
        if status != 200:
            return False, f"服务异常 (HTTP {response.status_code})"

        result = response.json()
        if result.get("code") != 0:
            return False, f"API 返回失败: {result.get('message', 'Unknown error')}"

        # Resolve the group configuration for the status message.
        group_ids = s2a_resolve_group_ids(silent=True)
        if S2A_GROUP_NAMES:
            group_info = f", 分组: {S2A_GROUP_NAMES} -> {group_ids}"
        elif S2A_GROUP_IDS:
            group_info = f", 分组 ID: {group_ids}"
        else:
            group_info = ""

        return True, f"认证有效 (方式: {auth_method}{group_info})"

    except requests.exceptions.Timeout:
        return False, f"服务连接超时 ({S2A_API_BASE})"
    except requests.exceptions.ConnectionError:
        return False, f"无法连接到服务 ({S2A_API_BASE})"
    except Exception as e:
        return False, f"验证异常: {str(e)}"
|
||
|
||
|
||
# ==================== OAuth 授权 ====================
|
||
def s2a_generate_auth_url(proxy_id: Optional[int] = None) -> Tuple[Optional[str], Optional[str]]:
    """Ask S2A to generate an OpenAI OAuth authorization URL.

    Args:
        proxy_id: optional proxy ID forwarded to S2A in the request body.

    Returns:
        tuple: ``(auth_url, session_id)`` on success, ``(None, None)``
        otherwise. Every failure path logs the actual cause (HTTP status, API
        message, or missing fields).
    """
    headers = build_s2a_headers()
    payload = {}

    if proxy_id is not None:
        payload["proxy_id"] = proxy_id

    try:
        response = http_session.post(
            f"{S2A_API_BASE}/admin/openai/generate-auth-url",
            headers=headers,
            json=payload,
            timeout=REQUEST_TIMEOUT
        )

        if response.status_code != 200:
            log.error(f"生成 S2A 授权链接失败: HTTP {response.status_code}")
            return None, None

        result = response.json()
        if result.get("code") != 0:
            # HTTP succeeded but the API rejected the request: report the API
            # message instead of a misleading "HTTP 200".
            log.error(f"生成 S2A 授权链接失败: {result.get('message', '未知错误')}")
            return None, None

        data = result.get("data") or {}
        auth_url = data.get("auth_url")
        session_id = data.get("session_id")

        if auth_url and session_id:
            log.success(f"生成 S2A 授权链接成功 (会话: {session_id[:16]}...)")
            return auth_url, session_id

        log.error("生成 S2A 授权链接失败: 响应缺少 auth_url 或 session_id")
        return None, None

    except Exception as e:
        log.error(f"S2A API 异常: {e}")
        return None, None
|
||
|
||
|
||
def s2a_verify_account_in_pool(email: str, timeout: int = 10) -> Tuple[bool, Optional[Dict[str, Any]]]:
    """Check whether an account for *email* is present in the S2A account pool.

    Searches ``/admin/accounts`` for the e-mail and returns the first result
    whose non-empty ``name`` contains the e-mail or is contained in it
    (case-insensitive). Accounts with an empty or null name are skipped, since
    an empty string is a substring of everything and would always "match".

    Args:
        email: e-mail address to look for.
        timeout: request timeout in seconds.

    Returns:
        tuple: ``(found, account_data_or_None)``.
    """
    wanted = (email or "").lower()
    if not wanted:
        # An empty search term would list (and match) arbitrary accounts.
        return False, None

    headers = build_s2a_headers()

    try:
        # Let the server narrow the list down with its search parameter.
        params = {
            "page": 1,
            "page_size": 20,
            "platform": "",
            "type": "",
            "status": "",
            "search": email,
            "timezone": "Asia/Shanghai"
        }

        response = http_session.get(
            f"{S2A_API_BASE}/admin/accounts",
            headers=headers,
            params=params,
            timeout=timeout
        )

        if response.status_code != 200:
            log.warning(f"S2A 验证账号失败: HTTP {response.status_code}")
            return False, None

        result = response.json()
        if result.get("code") != 0:
            log.warning(f"S2A 验证账号失败: {result.get('message', '未知错误')}")
            return False, None

        data = result.get("data") or {}
        for account in data.get("items") or []:
            account_name = (account.get("name") or "").lower()
            if not account_name:
                continue
            # Loose match in both directions (the name may be only part of
            # the e-mail, or may embed it).
            if wanted in account_name or account_name in wanted:
                return True, account

        return False, None

    except Exception as e:
        log.warning(f"S2A 验证账号异常: {e}")
        return False, None
|
||
|
||
|
||
def s2a_create_account_from_oauth(
    code: str,
    session_id: str,
    name: str = "",
    proxy_id: Optional[int] = None
) -> Optional[Dict[str, Any]]:
    """Exchange an authorization code and create the account in one call.

    After a successful creation, the account's presence in the pool is
    checked; a failed check only produces a warning.

    Args:
        code: authorization code.
        session_id: session ID returned together with the authorization URL.
        name: optional account name.
        proxy_id: optional proxy ID.

    Returns:
        dict: the created account's data, or None on failure.
    """
    request_headers = build_s2a_headers()
    body = {
        "session_id": session_id,
        "code": code,
        "concurrency": S2A_CONCURRENCY,
        "priority": S2A_PRIORITY,
    }

    # A name that looks like an e-mail is reused for the pool check below.
    full_email = name if "@" in name else ""

    if name:
        body["name"] = name
    if proxy_id is not None:
        body["proxy_id"] = proxy_id

    group_ids = get_s2a_group_ids()
    if group_ids:
        body["group_ids"] = group_ids

    try:
        log.step("正在提交授权码到 S2A...")
        response = http_session.post(
            f"{S2A_API_BASE}/admin/openai/create-from-oauth",
            headers=request_headers,
            json=body,
            timeout=REQUEST_TIMEOUT,
        )

        if response.status_code != 200:
            log.error(f"S2A 账号创建失败: HTTP {response.status_code}")
            return None

        result = response.json()
        if result.get("code") != 0:
            error_msg = result.get('message', '未知错误')
            log.error(f"S2A 账号创建失败: {error_msg}")
            return None

        account_data = result.get("data", {})
        account_id = account_data.get("id")
        account_name = account_data.get("name")
        log.success(f"S2A 授权成功 (ID: {account_id}, 名称: {account_name})")

        # Confirm the account actually landed in the pool.
        verify_email = full_email or account_name
        if verify_email:
            log.step(f"正在验证账号入库状态...")
            verified, verified_data = s2a_verify_account_in_pool(verify_email)

            if verified:
                verified_id = verified_data.get("id", "")
                verified_name = verified_data.get("name", "")
                log.success(f"✅ 账号入库验证成功 (ID: {verified_id}, 名称: {verified_name})")
            else:
                log.warning(f"⚠️ 账号入库验证失败,但授权已成功")

        return account_data

    except Exception as e:
        log.error(f"S2A 创建账号异常: {e}")
        return None
|
||
|
||
|
||
def s2a_add_account(
    name: str,
    token_info: Dict[str, Any],
    proxy_id: Optional[int] = None
) -> Optional[Dict[str, Any]]:
    """Add an OpenAI OAuth account to the S2A pool from existing tokens.

    Args:
        name: account name.
        token_info: token data; ``access_token``, ``refresh_token`` and
            ``expires_at`` are always forwarded, ``id_token`` and ``email``
            only when truthy.
        proxy_id: optional proxy ID.

    Returns:
        dict: the created account's data, or None on failure.
    """
    request_headers = build_s2a_headers()

    credentials = {
        key: token_info.get(key)
        for key in ("access_token", "refresh_token", "expires_at")
    }
    for optional_key in ("id_token", "email"):
        optional_value = token_info.get(optional_key)
        if optional_value:
            credentials[optional_key] = optional_value

    body = {
        "name": name,
        "platform": "openai",
        "type": "oauth",
        "credentials": credentials,
        "concurrency": S2A_CONCURRENCY,
        "priority": S2A_PRIORITY,
        "auto_pause_on_expired": True,
    }

    if proxy_id is not None:
        body["proxy_id"] = proxy_id

    group_ids = get_s2a_group_ids()
    if group_ids:
        body["group_ids"] = group_ids

    try:
        response = http_session.post(
            f"{S2A_API_BASE}/admin/accounts",
            headers=request_headers,
            json=body,
            timeout=REQUEST_TIMEOUT,
        )

        if response.status_code != 200:
            log.error(f"S2A 添加账号失败: HTTP {response.status_code}")
            return None

        result = response.json()
        if result.get("code") != 0:
            log.error(f"S2A 添加账号失败: {result.get('message', 'Unknown error')}")
            return None

        account_data = result.get("data", {})
        account_id = account_data.get("id")
        log.success(f"S2A 账号添加成功 (ID: {account_id}, Name: {name})")
        return account_data

    except Exception as e:
        log.error(f"S2A 添加账号异常: {e}")
        return None
|
||
|
||
|
||
# ==================== 账号管理 ====================
|
||
def s2a_get_accounts(platform: str = "openai") -> List[Dict[str, Any]]:
    """Fetch the account list from S2A.

    Args:
        platform: platform filter; no filter is sent when falsy.

    Returns:
        list: the accounts. Both a ``{"items": [...]}`` payload and a bare
        list payload are accepted; anything else, or any failure, yields an
        empty list.
    """
    request_headers = build_s2a_headers()

    try:
        query = {"platform": platform} if platform else {}
        resp = http_session.get(
            f"{S2A_API_BASE}/admin/accounts",
            headers=request_headers,
            params=query,
            timeout=REQUEST_TIMEOUT,
        )

        if resp.status_code == 200:
            body = resp.json()
            if body.get("code") == 0:
                payload = body.get("data", {})
                if isinstance(payload, list):
                    return payload
                if isinstance(payload, dict) and "items" in payload:
                    return payload.get("items", [])

    except Exception as e:
        log.warning(f"S2A 获取账号列表异常: {e}")

    return []
|
||
|
||
|
||
def s2a_get_error_accounts(
    platform: str = "",
    page_size: int = 100,
    timezone: str = "Asia/Shanghai"
) -> Tuple[List[Dict[str, Any]], int]:
    """Collect every account in ``error`` status, walking all result pages.

    Args:
        platform: platform filter (empty string: all platforms).
        page_size: number of accounts requested per page.
        timezone: time zone name forwarded to the API.

    Returns:
        tuple: ``(accounts, total)``, where ``total`` is the count reported by
        the last page fetched. ``([], 0)`` when S2A is not configured or an
        exception occurs; a failed page stops the walk and returns what was
        collected so far.
    """
    if not (S2A_API_BASE and (S2A_ADMIN_KEY or S2A_ADMIN_TOKEN)):
        return [], 0

    request_headers = build_s2a_headers()
    collected = []
    reported_total = 0
    page_no = 1

    try:
        while True:
            resp = http_session.get(
                f"{S2A_API_BASE}/admin/accounts",
                headers=request_headers,
                params={
                    "page": page_no,
                    "page_size": page_size,
                    "platform": platform,
                    "type": "",
                    "status": "error",
                    "search": "",
                    "timezone": timezone,
                },
                timeout=REQUEST_TIMEOUT,
            )

            if resp.status_code != 200:
                log.warning(f"S2A 获取错误账号失败: HTTP {resp.status_code}")
                break

            body = resp.json()
            if body.get("code") != 0:
                log.warning(f"S2A 获取错误账号失败: {body.get('message', 'Unknown error')}")
                break

            data = body.get("data", {})
            batch = data.get("items", [])
            reported_total = data.get("total", 0)
            page_count = data.get("pages", 1)

            collected.extend(batch)

            # Stop after the last page, or when a page comes back empty.
            if page_no >= page_count or not batch:
                break

            page_no += 1

        return collected, reported_total

    except Exception as e:
        log.warning(f"S2A 获取错误账号异常: {e}")
        return [], 0
|
||
|
||
|
||
def s2a_delete_account(account_id: int) -> Tuple[bool, str]:
    """Delete one account from S2A.

    Args:
        account_id: ID of the account to delete.

    Returns:
        tuple: ``(success, message)``; the message is ``"Deleted"`` on
        success, otherwise the API message, the HTTP status, or the exception
        text.
    """
    if not (S2A_API_BASE and (S2A_ADMIN_KEY or S2A_ADMIN_TOKEN)):
        return False, "S2A not configured"

    request_headers = build_s2a_headers()

    try:
        resp = http_session.delete(
            f"{S2A_API_BASE}/admin/accounts/{account_id}",
            headers=request_headers,
            timeout=REQUEST_TIMEOUT,
        )

        if resp.status_code != 200:
            return False, f"HTTP {resp.status_code}"

        body = resp.json()
        if body.get("code") == 0:
            return True, "Deleted"
        return False, body.get("message", "Unknown error")

    except Exception as e:
        return False, str(e)
|
||
|
||
|
||
def s2a_batch_delete_error_accounts(
    progress_callback: Optional[callable] = None
) -> Dict[str, Any]:
    """Delete every account that is currently in ``error`` status.

    Args:
        progress_callback: optional callable invoked after each deletion
            attempt as ``(current, total, account_name, success)``. It is not
            invoked for accounts skipped because they have no ID.

    Returns:
        dict: ``{"success": int, "failed": int, "total": int, "details": [...]}``.
    """
    error_accounts, total = s2a_get_error_accounts()
    results = {
        "success": 0,
        "failed": 0,
        "total": total,
        "details": []
    }

    if not error_accounts:
        return results

    log.info(f"开始批量删除 {len(error_accounts)} 个错误账号...")

    for position, account in enumerate(error_accounts, start=1):
        account_id = account.get("id")
        account_name = account.get("name", "")
        error_message = account.get("error_message", "")

        if not account_id:
            # Nothing to address the delete call with: count as failed.
            results["failed"] += 1
            results["details"].append({
                "name": account_name,
                "status": "failed",
                "message": "Missing account ID"
            })
            continue

        success, message = s2a_delete_account(account_id)

        if success:
            counter_key = "success"
            detail = {
                "id": account_id,
                "name": account_name,
                "error": error_message[:50] if error_message else "",
                "status": "deleted"
            }
        else:
            counter_key = "failed"
            detail = {
                "id": account_id,
                "name": account_name,
                "status": "failed",
                "message": message
            }
        results[counter_key] += 1
        results["details"].append(detail)

        if progress_callback:
            progress_callback(position, len(error_accounts), account_name, success)

    log.success(f"批量删除完成: 成功 {results['success']}, 失败 {results['failed']}")
    return results
|
||
|
||
|
||
def s2a_check_account_exists(email: str, platform: str = "openai") -> bool:
    """Return True when an account for *email* already exists in S2A.

    An account matches when its ``name`` or its ``credentials.email`` equals
    the e-mail, case-insensitively. Null or missing fields in the API data
    are treated as empty, and an empty *email* never matches (otherwise it
    would equal every account lacking a name or e-mail).

    Args:
        email: e-mail address to look for.
        platform: platform filter passed to ``s2a_get_accounts``.

    Returns:
        bool: whether a matching account was found.
    """
    wanted = (email or "").lower()
    if not wanted:
        return False

    for account in s2a_get_accounts(platform):
        # "or" guards cover keys that are present but null in the JSON.
        account_name = (account.get("name") or "").lower()
        credentials = account.get("credentials") or {}
        account_email = (credentials.get("email") or "").lower()

        if wanted in (account_name, account_email):
            return True

    return False
|
||
|
||
|
||
# ==================== 工具函数 ====================
|
||
def extract_code_from_url(url: str) -> Optional[str]:
    """Extract the ``code`` query parameter from a callback URL.

    Args:
        url: callback URL.

    Returns:
        Optional[str]: the first ``code`` value, or None when the URL is
        empty, has no ``code`` parameter, or cannot be parsed (logged).
    """
    if not url:
        return None

    try:
        query = parse_qs(urlparse(url).query)
        codes = query.get("code", [None])
        return codes[0]
    except Exception as e:
        log.error(f"解析 URL 失败: {e}")
        return None
|
||
|
||
|
||
def is_s2a_callback_url(url: str) -> bool:
    """Tell whether *url* is an S2A OAuth callback that carries a code.

    True only when the URL contains both ``localhost:1455/auth/callback`` and
    ``code=``; empty or None input yields False.
    """
    if url:
        required_markers = ("localhost:1455/auth/callback", "code=")
        return all(marker in url for marker in required_markers)
    return False
|
||
|
||
|
||
# ==================== 仪表盘统计 ====================
|
||
def s2a_get_dashboard_stats(timezone: str = "Asia/Shanghai") -> Optional[Dict[str, Any]]:
    """Fetch the S2A dashboard statistics.

    Args:
        timezone: time zone name forwarded to the API (default Asia/Shanghai).

    Returns:
        dict: the ``data`` payload of the response, or None when S2A is not
        configured or the request fails (failures are logged as warnings).
    """
    if not (S2A_API_BASE and (S2A_ADMIN_KEY or S2A_ADMIN_TOKEN)):
        return None

    request_headers = build_s2a_headers()

    try:
        resp = http_session.get(
            f"{S2A_API_BASE}/admin/dashboard/stats",
            headers=request_headers,
            params={"timezone": timezone},
            timeout=REQUEST_TIMEOUT,
        )

        if resp.status_code != 200:
            log.warning(f"S2A 仪表盘获取失败: HTTP {resp.status_code}")
            return None

        body = resp.json()
        if body.get("code") == 0:
            return body.get("data", {})
        log.warning(f"S2A 仪表盘获取失败: {body.get('message', 'Unknown error')}")

    except Exception as e:
        log.warning(f"S2A 仪表盘获取异常: {e}")

    return None
|
||
|
||
|
||
def format_dashboard_stats(stats: Dict[str, Any]) -> str:
    """Render dashboard statistics as human-readable text with ``<b>`` titles.

    A key that is missing from ``stats`` and a key whose value is None are
    both displayed as 0, so a partially populated API response does not
    raise while formatting.

    Args:
        stats: Raw dashboard data (account counters, today's and cumulative
            request/token/cost figures, and live rpm/tpm values).

    Returns:
        Multi-line text, or "暂无数据" when ``stats`` is empty or None.
    """
    if not stats:
        return "暂无数据"

    def value(key):
        """Return stats[key], treating a missing or None entry as 0."""
        found = stats.get(key)
        return 0 if found is None else found

    def fmt_num(n):
        """Format a number with thousands separators (floats get 2 decimals)."""
        if isinstance(n, float):
            return f"{n:,.2f}"
        return f"{n:,}"

    def fmt_tokens(n):
        """Abbreviate a token count with a K / M / B suffix."""
        if n >= 1_000_000_000:
            return f"{n / 1_000_000_000:.2f}B"
        elif n >= 1_000_000:
            return f"{n / 1_000_000:.2f}M"
        elif n >= 1_000:
            return f"{n / 1_000:.1f}K"
        return str(n)

    # Account status
    total_accounts = value("total_accounts")
    normal_accounts = value("normal_accounts")
    error_accounts = value("error_accounts")
    ratelimit_accounts = value("ratelimit_accounts")

    # Today's statistics
    today_requests = value("today_requests")
    today_tokens = value("today_tokens")
    today_cost = value("today_cost")
    today_input = value("today_input_tokens")
    today_output = value("today_output_tokens")
    today_cache_read = value("today_cache_read_tokens")

    # Cumulative totals
    total_requests = value("total_requests")
    total_tokens = value("total_tokens")
    total_cost = value("total_cost")

    # Live status
    rpm = value("rpm")
    tpm = value("tpm")
    active_users = value("active_users")
    avg_duration = value("average_duration_ms")

    lines = [
        "<b>📊 S2A 仪表盘</b>",
        "",
        "<b>📦 账号状态</b>",
        f" 总计: {total_accounts} | 正常: {normal_accounts}",
        f" 异常: {error_accounts} | 限流: {ratelimit_accounts}",
        "",
        "<b>📅 今日统计</b>",
        f" 请求数: {fmt_num(today_requests)}",
        f" Token: {fmt_tokens(today_tokens)}",
        f" 输入: {fmt_tokens(today_input)} | 输出: {fmt_tokens(today_output)}",
        f" 缓存: {fmt_tokens(today_cache_read)}",
        f" 费用: ${fmt_num(today_cost)}",
        "",
        "<b>📈 累计统计</b>",
        f" 请求数: {fmt_num(total_requests)}",
        f" Token: {fmt_tokens(total_tokens)}",
        f" 费用: ${fmt_num(total_cost)}",
        "",
        "<b>⚡ 实时状态</b>",
        f" RPM: {rpm} | TPM: {fmt_num(tpm)}",
        f" 活跃用户: {active_users}",
        f" 平均延迟: {avg_duration:.0f}ms",
    ]

    return "\n".join(lines)
|
||
|
||
|
||
# ==================== 批量导入账号 ====================
|
||
def s2a_import_account_with_token(
    email: str,
    access_token: str,
    password: str = "",
    proxy_id: Optional[int] = None
) -> Tuple[bool, str]:
    """Create an S2A account directly from an existing access token.

    Args:
        email: Account e-mail; used as the account name.
        access_token: OpenAI access token stored as the account credential.
        password: Account password. Accepted for interface compatibility;
            this function does not read it.
        proxy_id: Optional proxy ID to attach to the account.

    Returns:
        ``(True, "ID: <id>")`` on success; otherwise ``(False, reason)`` where
        reason is "S2A not configured", "Already exists", the API message,
        ``"HTTP <status>"``, or the text of a raised exception.
    """
    if not (S2A_API_BASE and (S2A_ADMIN_KEY or S2A_ADMIN_TOKEN)):
        return False, "S2A not configured"

    request_headers = build_s2a_headers()

    # Account definition sent to the admin endpoint.
    body = {
        "name": email,
        "platform": "openai",
        "type": "access_token",
        "credentials": {"access_token": access_token},
        "concurrency": S2A_CONCURRENCY,
        "priority": S2A_PRIORITY,
        "auto_pause_on_expired": True,
    }

    if proxy_id is not None:
        body["proxy_id"] = proxy_id

    groups = get_s2a_group_ids()
    if groups:
        body["group_ids"] = groups

    try:
        resp = http_session.post(
            f"{S2A_API_BASE}/admin/accounts",
            headers=request_headers,
            json=body,
            timeout=REQUEST_TIMEOUT
        )

        if resp.status_code != 200:
            return False, f"HTTP {resp.status_code}"

        reply = resp.json()
        if reply.get("code") == 0:
            created = reply.get("data", {})
            return True, f"ID: {created.get('id')}"

        reason = reply.get("message", "Unknown error")
        lowered = reason.lower()
        # Normalise the various "duplicate" wordings into one marker.
        if "exist" in lowered or "duplicate" in lowered:
            return False, "Already exists"
        return False, reason

    except Exception as e:
        return False, str(e)
|
||
|
||
|
||
def s2a_batch_import_accounts(
    accounts: List[Dict[str, str]],
    progress_callback: Optional[callable] = None
) -> Dict[str, Any]:
    """Import a batch of accounts into S2A using their access tokens.

    Every entry produces exactly one detail record and, when a callback is
    supplied, exactly one progress notification, including entries that are
    skipped because the e-mail or token is missing.

    Args:
        accounts: Account list, e.g.
            ``[{"account": "email", "password": "pwd", "token": "jwt"}]``.
        progress_callback: Optional callable invoked as
            ``(current, total, email, status)`` after each entry.

    Returns:
        ``{"success": int, "failed": int, "skipped": int, "details": [...]}``
        where each detail is ``{"email", "status", "message"}`` and status is
        one of "success", "failed" or "skipped".
    """
    results = {
        "success": 0,
        "failed": 0,
        "skipped": 0,
        "details": []
    }
    total = len(accounts)

    def record(position, email, status, message):
        """Count one outcome, store its detail, and report progress."""
        results[status] += 1
        results["details"].append({
            "email": email,
            "status": status,
            "message": message
        })
        if progress_callback:
            progress_callback(position, total, email, status)

    for position, acc in enumerate(accounts, 1):
        email = acc.get("account", "")
        token = acc.get("token", "")
        password = acc.get("password", "")

        if not email or not token:
            record(position, email or "unknown", "skipped", "Missing email or token")
            continue

        # Skip accounts that are already present.
        if s2a_check_account_exists(email):
            record(position, email, "skipped", "Already exists")
            continue

        success, message = s2a_import_account_with_token(email, token, password)

        if success:
            status = "success"
        elif "exist" in message.lower():
            # The API reported a duplicate: treat as skipped, not failed.
            status = "skipped"
        else:
            status = "failed"
        record(position, email, status, message)

    return results
|
||
|
||
|
||
# ==================== API 密钥用量查询 ====================
|
||
def s2a_get_all_usage_stats(
    start_date: str,
    end_date: str,
    timezone: str = "Asia/Shanghai"
) -> Optional[Dict[str, Any]]:
    """Fetch usage statistics aggregated over all API keys.

    Same endpoint as the per-key query, but no ``api_key_id`` is sent.

    Args:
        start_date: Start date sent as the ``start_date`` query parameter.
        end_date: End date sent as the ``end_date`` query parameter.
        timezone: Time zone name sent as the ``timezone`` query parameter.

    Returns:
        The ``data`` member of the response when the request answers with
        HTTP 200 and ``code == 0``; None when S2A is not configured, the
        request fails, or the API reports an error.
    """
    if not S2A_API_BASE or (not S2A_ADMIN_KEY and not S2A_ADMIN_TOKEN):
        return None

    headers = build_s2a_headers()

    try:
        response = http_session.get(
            f"{S2A_API_BASE}/admin/usage/stats",
            headers=headers,
            params={
                "start_date": start_date,
                "end_date": end_date,
                "timezone": timezone
            },
            timeout=REQUEST_TIMEOUT
        )

        if response.status_code == 200:
            result = response.json()
            if result.get("code") == 0:
                return result.get("data", {})
            else:
                # Report API-level errors instead of dropping them silently.
                log.warning(f"S2A 获取汇总用量失败: {result.get('message', 'Unknown error')}")
        else:
            log.warning(f"S2A 获取汇总用量失败: HTTP {response.status_code}")

    except Exception as e:
        log.warning(f"S2A 获取汇总用量异常: {e}")

    return None
|
||
|
||
|
||
def s2a_get_key_usage_stats(
    api_key_id: int,
    start_date: str,
    end_date: str,
    timezone: str = "Asia/Shanghai"
) -> Optional[Dict[str, Any]]:
    """Fetch detailed usage statistics for one API key.

    Args:
        api_key_id: ID of the key to query.
        start_date: Start date (YYYY-MM-DD).
        end_date: End date (YYYY-MM-DD).
        timezone: Time zone name sent with the query.

    Returns:
        The ``data`` member of the response when the request answers with
        HTTP 200 and ``code == 0``; otherwise None.
    """
    if not (S2A_API_BASE and (S2A_ADMIN_KEY or S2A_ADMIN_TOKEN)):
        return None

    request_headers = build_s2a_headers()
    query = {
        "api_key_id": api_key_id,
        "start_date": start_date,
        "end_date": end_date,
        "timezone": timezone
    }

    try:
        resp = http_session.get(
            f"{S2A_API_BASE}/admin/usage/stats",
            headers=request_headers,
            params=query,
            timeout=REQUEST_TIMEOUT
        )

        if resp.status_code != 200:
            log.warning(f"S2A 获取密钥用量失败: HTTP {resp.status_code}")
            return None

        body = resp.json()
        if body.get("code") != 0:
            failure = body.get("message", "Unknown error")
            log.warning(f"S2A 获取密钥用量失败: {failure}")
            return None

        return body.get("data", {})

    except Exception as e:
        log.warning(f"S2A 获取密钥用量异常: {e}")

    return None
|
||
|
||
|
||
# Key list configuration (data obtained from the /api/v1/keys endpoint).
# Each entry carries the key's numeric id, display name, key string and group.
# NOTE(review): the "key" values are committed in source and at least one
# looks like a real "sk-" secret; consider loading this list from
# configuration or the API instead. Confirm with the owner before changing.
API_KEYS_LIST = [
    {"id": 11, "name": "bohe", "key": "bohebohebohebohe", "group_name": "codex"},
    {"id": 10, "name": "黑与白", "key": "oyqq114514oyqq114514", "group_name": "codex"},
    {"id": 9, "name": "baozi", "key": "baozibaozibaozibaozi", "group_name": "codex"},
    {"id": 7, "name": "Wong-灭鼠专家", "key": "Wong1234567891011", "group_name": "codex"},
    {"id": 6, "name": "小马-莹佬", "key": "mazhichentaiqiangla", "group_name": "codex"},
    {"id": 5, "name": "猫猫-byteBender", "key": "ByteBender114514", "group_name": "codex"},
    {"id": 4, "name": "KFC", "key": "sk-babf3f6d15582b31e959311d6ceaa9448f2cf951dc23d02386d41f68046e6018", "group_name": "codex"},
    {"id": 3, "name": "zhongruan", "key": "zhongruantaiqiangla", "group_name": "codex"},
]
|
||
|
||
|
||
def s2a_get_keys_with_usage(start_date: str = None, end_date: str = None, timezone: str = "Asia/Shanghai") -> Optional[List[Dict[str, Any]]]:
    """Combine the configured key list with per-key usage statistics.

    Args:
        start_date: Start date (YYYY-MM-DD); defaults to today.
        end_date: End date (YYYY-MM-DD); defaults to today.
        timezone: Time zone name forwarded to the usage query.

    Returns:
        One dict per entry of ``API_KEYS_LIST`` with ``id``, ``name``,
        ``key``, ``group_name``, a fixed ``status`` of "active" and a
        ``usage`` dict (empty when the lookup returned nothing); None when
        the key list is empty.
    """
    from datetime import datetime

    # Fall back to the current local date for any bound that was not given.
    start_date = start_date or datetime.now().strftime("%Y-%m-%d")
    end_date = end_date or datetime.now().strftime("%Y-%m-%d")

    def with_usage(entry):
        """Build the merged record for one configured key."""
        key_id = entry["id"]
        stats = s2a_get_key_usage_stats(key_id, start_date, end_date, timezone)
        return {
            "id": key_id,
            "name": entry["name"],
            "key": entry["key"],
            "group_name": entry["group_name"],
            "status": "active",
            "usage": stats or {}
        }

    merged = [with_usage(entry) for entry in API_KEYS_LIST]
    return merged or None
|
||
|
||
|
||
def format_keys_usage(keys: List[Dict[str, Any]], period_text: str = "今日") -> str:
    """Render per-key usage as a ranked, human-readable report.

    Keys are listed by request count, highest first, followed by a summary
    of the totals. A key whose ``usage`` is missing or None, and any usage
    counter that is missing or None, is treated as 0 instead of raising.

    Args:
        keys: Key list; each item may carry ``name`` and a ``usage`` dict.
        period_text: Label for the reporting period shown in the headings.

    Returns:
        Multi-line text with ``<b>`` headings, or "暂无密钥数据" when
        ``keys`` is empty.
    """
    if not keys:
        return "暂无密钥数据"

    def fmt_cost(n):
        """Format a cost; amounts below 1 keep four decimals."""
        if n >= 1000:
            return f"${n:,.2f}"
        elif n >= 1:
            return f"${n:.2f}"
        return f"${n:.4f}"

    def fmt_tokens(n):
        """Abbreviate a token count with a K / M / B suffix."""
        if n >= 1_000_000_000:
            return f"{n / 1_000_000_000:.2f}B"
        elif n >= 1_000_000:
            return f"{n / 1_000_000:.2f}M"
        elif n >= 1_000:
            return f"{n / 1_000:.1f}K"
        return str(int(n))

    def fmt_duration(ms):
        """Format milliseconds, switching to seconds from 1000 ms up."""
        if ms >= 1000:
            return f"{ms / 1000:.2f}s"
        return f"{ms:.0f}ms"

    def usage_of(key):
        """Return the key's usage dict, or an empty dict when absent/None."""
        return key.get("usage") or {}

    def stat(usage, field):
        """Return usage[field], treating a missing or None entry as 0."""
        found = usage.get(field)
        return 0 if found is None else found

    # Rank by request count, descending; ties keep their input order.
    sorted_keys = sorted(
        keys,
        key=lambda k: stat(usage_of(k), "total_requests"),
        reverse=True
    )

    lines = [f"<b>◈ API 密钥用量 ({period_text})</b>", ""]

    total_requests = 0
    total_tokens = 0
    total_cost = 0

    for i, key in enumerate(sorted_keys, 1):
        name = key.get("name", "未命名")
        usage = usage_of(key)

        # Named request_count so the imported `requests` module is not shadowed.
        request_count = stat(usage, "total_requests")
        tokens = stat(usage, "total_tokens")
        input_tokens = stat(usage, "total_input_tokens")
        output_tokens = stat(usage, "total_output_tokens")
        cache_tokens = stat(usage, "total_cache_tokens")
        cost = stat(usage, "total_actual_cost")
        avg_duration = stat(usage, "average_duration_ms")

        total_requests += request_count
        total_tokens += tokens
        total_cost += cost

        rank = f"#{i}"

        lines.append(f"{rank} <b>{name}</b>")
        lines.append(f" ▸ {request_count:,} 请求 · {fmt_duration(avg_duration)}")
        lines.append(f" ▸ {fmt_tokens(tokens)} (↓{fmt_tokens(input_tokens)} ↑{fmt_tokens(output_tokens)})")
        if cache_tokens > 0:
            lines.append(f" ▸ 缓存 {fmt_tokens(cache_tokens)}")
        lines.append(f" ▸ {fmt_cost(cost)}")
        lines.append("")

    # Summary
    lines.append("────────────────────")
    lines.append(f"<b>◇ {period_text}汇总</b>")
    lines.append(f" 密钥: {len(keys)} 个")
    lines.append(f" 请求: {total_requests:,}")
    lines.append(f" Token: {fmt_tokens(total_tokens)}")
    lines.append(f" 费用: {fmt_cost(total_cost)}")

    return "\n".join(lines)
|
||
|
||
|
||
# ==================== 批量 API 授权 ====================
|
||
|
||
def s2a_batch_api_authorize(
    accounts: List[Dict[str, str]],
    proxy: str = None,
    progress_callback: Optional[callable] = None
) -> Dict[str, Any]:
    """Authorize a batch of accounts into S2A through the API flow.

    Each account is authorized with ``s2a_api_authorize``. Every account
    yields exactly one counter increment, one detail record and (when a
    callback is given) one progress notification. The callback runs outside
    the error handler, so an exception raised by the callback itself is not
    recorded as an authorization failure of an account that succeeded.

    Args:
        accounts: Account list ``[{"email": "xxx", "password": "xxx"}, ...]``.
        proxy: Proxy URL (optional); when omitted, the next configured proxy
            is used if one is available.
        progress_callback: Optional callable invoked as
            ``(current, total, email, status, message)`` after each account.

    Returns:
        ``{"success": int, "failed": int, "total": int, "details": [...]}``
        where each detail is ``{"email", "status", "message"}`` and status is
        "success" or "failed". When S2A is not configured the counters stay
        at 0 and no account is processed.
    """
    total = len(accounts)
    results = {
        "success": 0,
        "failed": 0,
        "total": total,
        "details": []
    }

    if not S2A_API_BASE or (not S2A_ADMIN_KEY and not S2A_ADMIN_TOKEN):
        log.error("S2A 未配置")
        return results

    # Fall back to the configured proxy pool when no proxy was passed in.
    if not proxy:
        proxy_config = get_next_proxy()
        if proxy_config:
            proxy = format_proxy_url(proxy_config)

    log.info(f"开始批量 API 授权: {total} 个账号")

    for position, acc in enumerate(accounts, 1):
        email = acc.get("email", "")
        password = acc.get("password", "")

        if not email or not password:
            status, message = "failed", "缺少邮箱或密码"
        else:
            try:
                success, result = s2a_api_authorize(email, password, proxy)
                if success:
                    account_id = result.get("id", "") if result else ""
                    status, message = "success", f"ID: {account_id}"
                else:
                    status, message = "failed", "授权失败"
            except Exception as e:
                status, message = "failed", str(e)

        # Record the outcome once, after it is fully decided.
        results[status] += 1
        results["details"].append({
            "email": email or "unknown",
            "status": status,
            "message": message
        })
        if progress_callback:
            progress_callback(position, total, email, status, message)

    log.success(f"批量授权完成: 成功 {results['success']}, 失败 {results['failed']}")
    return results
|
||
|
||
|
||
def s2a_api_authorize_single(
    email: str,
    password: str,
    proxy: str = None
) -> Tuple[bool, str]:
    """Authorize one account through the API flow and return a short message.

    Thin wrapper around ``s2a_api_authorize`` that reduces its result to a
    success flag plus display text.

    Args:
        email: Account e-mail.
        password: Account password.
        proxy: Proxy URL (optional).

    Returns:
        ``(True, "授权成功 (ID: <id>)")`` on success, otherwise
        ``(False, "授权失败")``.
    """
    ok, payload = s2a_api_authorize(email, password, proxy)

    if not ok:
        return False, "授权失败"

    # The ID is left blank when the call succeeded without returning data.
    account_id = payload.get("id", "") if payload else ""
    return True, f"授权成功 (ID: {account_id})"
|