Files
codexTool/s2a_service.py

1690 lines
53 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# ==================== S2A (Sub2API) 服务模块 ====================
# 处理 Sub2API 系统相关功能 (OpenAI OAuth 授权、账号入库)
#
# S2A 与 CPA/CRS 的关键差异:
# - 认证方式: S2A 支持 Admin API Key (x-api-key) 或 JWT Token (Bearer)
# - 会话标识: S2A 使用 session_id
# - 授权流程: S2A 生成授权 URL -> 用户授权 -> 提交 code 换取 token -> 创建账号
# - 账号入库: S2A 可一步完成 (create-from-oauth) 或分步完成 (exchange + add_account)
#
# 新增: 纯 API 授权模式 (无需浏览器)
# - 使用 curl_cffi 模拟浏览器指纹
# - 支持 Sentinel PoW 验证
# - 直接通过 API 完成 OAuth 流程
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from urllib.parse import urlparse, parse_qs
from typing import Optional, Tuple, Dict, List, Any
import json
import uuid
import time
import random
import base64
import hashlib
from datetime import datetime, timedelta, timezone
from config import (
S2A_API_BASE,
S2A_ADMIN_KEY,
S2A_ADMIN_TOKEN,
S2A_CONCURRENCY,
S2A_PRIORITY,
S2A_GROUP_IDS,
S2A_GROUP_NAMES,
REQUEST_TIMEOUT,
USER_AGENT,
get_next_proxy,
format_proxy_url,
)
from logger import log
# ==================== Group ID cache ====================
# Module-level cache of the resolved group IDs. ``None`` means "not resolved
# yet"; s2a_resolve_group_ids() assigns a list to it and returns that list on
# later calls.
_resolved_group_ids = None # cached result of the group_ids resolution
def create_session_with_retry() -> requests.Session:
    """Build a ``requests.Session`` whose adapters retry failed requests.

    The retry policy allows up to 5 retries with a backoff factor of 1 for
    responses with status 429, 500, 502, 503 or 504, for all of the listed
    HTTP methods.

    Returns:
        requests.Session: a session with the retrying adapter mounted for
        both ``https://`` and ``http://`` URLs.
    """
    # NOTE(review): POST and DELETE are in the retry list, so non-idempotent
    # calls may be replayed on a 5xx response - confirm this is intended.
    retrying_adapter = HTTPAdapter(
        max_retries=Retry(
            total=5,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["HEAD", "GET", "POST", "PUT", "DELETE", "OPTIONS"],
        )
    )
    session = requests.Session()
    for scheme in ("https://", "http://"):
        session.mount(scheme, retrying_adapter)
    return session
# Shared retrying session used by the S2A admin API helpers in this module.
http_session = create_session_with_retry()
# ==================== PoW Solver (Sentinel 验证) ====================
def _fnv1a_32(data: bytes) -> int:
"""FNV-1a 32-bit hash"""
h = 2166136261
for byte in data:
h ^= byte
h = (h * 16777619) & 0xFFFFFFFF
h ^= (h >> 16)
h = (h * 2246822507) & 0xFFFFFFFF
h ^= (h >> 13)
h = (h * 3266489909) & 0xFFFFFFFF
h ^= (h >> 16)
return h
def _get_parse_time() -> str:
"""生成 JS Date().toString() 格式的时间戳"""
now = datetime.now(timezone(timedelta(hours=8)))
return now.strftime("%a %b %d %Y %H:%M:%S") + " GMT+0800 (中国标准时间)"
def _get_pow_config(user_agent: str, sid: str = None) -> list:
    """Build the 18-element config list used for the PoW payloads.

    Slots 3 and 9 start at 0; ``_solve_pow`` and ``_get_requirements_token``
    overwrite them before serialising the list.

    Args:
        user_agent: User-Agent string placed at index 4.
        sid: session identifier placed at index 14; a random UUID4 string is
            generated when it is empty or None.

    Returns:
        list: the freshly built config list (a new list on every call).
    """
    session_id = sid or str(uuid.uuid4())

    # The random/time values are computed in the same order in which they
    # appear in the resulting list.
    first_value = random.randint(2500, 3500)
    parse_time = _get_parse_time()
    react_marker = f"_reactListening{random.randint(1000000, 9999999)}"
    perf_now_ms = time.perf_counter() * 1000
    shifted_epoch_ms = int(time.time() * 1000 - random.randint(10000, 50000))

    return [
        first_value,
        parse_time,
        4294967296,
        0,
        user_agent,
        "chrome-extension://pgojnojmmhpofjgdmaebadhbocahppod/assets/aW5qZWN0X2hhc2g/aW5qZ",
        None,
        "zh-CN",
        "zh-CN",
        0,
        "canSharefunction canShare() { [native code] }",
        react_marker,
        "onhashchange",
        perf_now_ms,
        session_id,
        "",
        24,
        shifted_epoch_ms,
    ]
def _solve_pow(seed: str, difficulty: str, config: list, max_iterations: int = 5000000) -> Optional[str]:
    """Brute-force a PoW answer on the CPU.

    For each iteration the counter is written into ``config[3]`` (and
    ``config[9]`` is reset to 0), the list is serialised to compact JSON and
    base64-encoded, and ``_fnv1a_32(seed + encoded)`` is rendered as 8 hex
    digits. The attempt succeeds when the leading ``len(difficulty)`` hex
    digits compare ``<=`` to *difficulty* as strings.

    Args:
        seed: seed string prepended (as bytes) to every candidate.
        difficulty: hex-string threshold.
        config: config list; it is mutated in place.
        max_iterations: upper bound on the number of attempts.

    Returns:
        Optional[str]: ``"<base64 payload>~S"`` on success, otherwise None.
    """
    started = time.perf_counter()
    seed_prefix = seed.encode()
    prefix_len = len(difficulty)

    for attempt in range(max_iterations):
        config[3] = attempt
        config[9] = 0
        serialised = json.dumps(config, separators=(',', ':'))
        candidate = base64.b64encode(serialised.encode())
        digest_hex = format(_fnv1a_32(seed_prefix + candidate), "08x")
        if digest_hex[:prefix_len] > difficulty:
            continue

        elapsed = time.perf_counter() - started
        log.debug(f"[PoW] 求解完成: {elapsed:.2f}s (迭代 {attempt:,}, 难度={difficulty})")
        return f"{candidate.decode()}~S"

    return None
def _get_requirements_token(user_agent: str, sid: str = None) -> str:
    """Build the initial "requirements" token.

    A fresh config list is created, slots 3 and 9 are zeroed, and the compact
    JSON form is base64-encoded and wrapped as ``"gAAAAAC" + payload + "~S"``.

    Args:
        user_agent: User-Agent string forwarded to ``_get_pow_config``.
        sid: session identifier; a random UUID4 string is used when empty.

    Returns:
        str: the requirements token.
    """
    config = _get_pow_config(user_agent, sid or str(uuid.uuid4()))
    config[3] = 0
    config[9] = 0
    raw = json.dumps(config, separators=(',', ':')).encode()
    return "gAAAAAC" + base64.b64encode(raw).decode() + "~S"
# ==================== S2A API 授权器 ====================
class S2AApiAuthorizer:
    """Run the OAuth login flow over plain HTTP calls, without a browser.

    The instance owns a single HTTP session (``curl_cffi`` when the package
    can be imported, plain ``requests`` otherwise) together with the
    identifiers and tokens exchanged with the Sentinel endpoint.
    """

    # Seconds to wait for each auth.openai.com request. Without an explicit
    # timeout the plain ``requests`` fallback would block indefinitely on a
    # stalled connection.
    _HTTP_TIMEOUT = 30

    def __init__(self, email: str, password: str, proxy: str = None):
        """Create the HTTP session and the per-login identifiers.

        Args:
            email: account email submitted in the login flow.
            password: account password submitted in the login flow.
            proxy: optional proxy URL used for both http and https traffic.
        """
        self.email = email
        self.password = password

        # Prefer curl_cffi; fall back to requests when it is not installed.
        try:
            from curl_cffi import requests as cffi_requests
            self.session = cffi_requests.Session(impersonate="chrome110")
            self._use_cffi = True
        except ImportError:
            log.warning("curl_cffi 未安装,使用 requests (可能被检测)")
            self.session = requests.Session()
            self._use_cffi = False

        if proxy:
            self.session.proxies = {"http": proxy, "https": proxy}

        self.ua = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36"
        self.sid = str(uuid.uuid4())
        self.device_id = str(uuid.uuid4())
        self.sentinel_token = None  # "token" field of the last sentinel response
        self.solved_pow = None  # value sent as "p" in the sentinel header

        self.session.headers.update({
            "User-Agent": self.ua,
            "Accept": "*/*",
            "Accept-Language": "en-US,en;q=0.9",
            "sec-ch-ua": '"Not(A:Brand";v="8", "Chromium";v="144", "Microsoft Edge";v="144"',
            "sec-ch-ua-mobile": "?0",
            "sec-ch-ua-platform": '"Windows"',
            "sec-fetch-dest": "empty",
            "sec-fetch-mode": "cors",
            "sec-fetch-site": "same-origin",
        })

    def _call_sentinel_req(self, flow: str) -> Optional[dict]:
        """Call the sentinel endpoint and store its token / PoW answer.

        Args:
            flow: flow name sent in the request payload.

        Returns:
            Optional[dict]: the decoded JSON response, or None when the
            request fails, returns a non-200 status, or a required PoW cannot
            be solved.
        """
        init_token = _get_requirements_token(self.ua, self.sid)
        payload = {"p": init_token, "id": self.device_id, "flow": flow}

        try:
            resp = self.session.post(
                "https://sentinel.openai.com/backend-api/sentinel/req",
                json=payload,
                timeout=15
            )
            if resp.status_code != 200:
                log.warning(f"Sentinel 请求失败: {resp.status_code}")
                return None

            data = resp.json()
            self.sentinel_token = data.get('token')

            pow_req = data.get('proofofwork', {})
            if pow_req.get('required'):
                seed = pow_req.get('seed', '')
                difficulty = pow_req.get('difficulty', '')
                config = _get_pow_config(self.ua, self.sid)
                solved = _solve_pow(seed, difficulty, config)
                if solved:
                    self.solved_pow = f"gAAAAAB{solved}"
                else:
                    log.error("PoW 求解失败")
                    return None
            else:
                # No PoW requested: the initial token is reused as the answer.
                self.solved_pow = init_token
            return data
        except Exception as e:
            log.error(f"Sentinel 异常: {e}")
            return None

    def _get_sentinel_header(self, header_flow: str) -> str:
        """Serialise the sentinel header value as a JSON string.

        Args:
            header_flow: flow name placed in the header object.

        Returns:
            str: JSON with ``p``, ``id``, ``flow`` and, when a sentinel token
            is stored, ``c``.
        """
        sentinel_obj = {"p": self.solved_pow, "id": self.device_id, "flow": header_flow}
        if self.sentinel_token:
            sentinel_obj["c"] = self.sentinel_token
        return json.dumps(sentinel_obj)

    def get_authorization_code(self, auth_url: str) -> Optional[str]:
        """Run the OAuth flow and return the authorization code.

        Steps: open the authorization URL, submit the email, verify the
        password when the response asks for it, obtain ``continue_url`` and
        follow up to 10 redirects until one points at ``localhost:1455`` with
        a ``code`` query parameter.

        Args:
            auth_url: authorization URL generated by S2A.

        Returns:
            Optional[str]: the authorization code, or None on any failure
            (failures are logged, never raised).
        """
        # Local import: resolves relative redirect targets in step 5.
        from urllib.parse import urljoin

        log.step("开始 API 授权流程...")
        timeout = self._HTTP_TIMEOUT
        headers = {
            "Origin": "https://auth.openai.com",
            "Referer": "https://auth.openai.com/log-in",
            "Content-Type": "application/json"
        }

        try:
            # 1. Open the authorization endpoint.
            log.step("访问授权端点...")
            resp = self.session.get(auth_url, allow_redirects=True, timeout=timeout)
            headers["Referer"] = resp.url

            # 2. Submit the email.
            log.step("提交邮箱...")
            if not self._call_sentinel_req("login_web_init"):
                return None

            auth_headers = headers.copy()
            auth_headers["OpenAI-Sentinel-Token"] = self._get_sentinel_header("authorize_continue")
            resp = self.session.post(
                "https://auth.openai.com/api/accounts/authorize/continue",
                json={"username": {"kind": "email", "value": self.email}},
                headers=auth_headers,
                timeout=timeout
            )
            if resp.status_code != 200:
                log.error(f"邮箱提交失败: {resp.status_code} - {resp.text[:200]}")
                return None

            data = resp.json()
            page_type = data.get("page", {}).get("type", "")

            # 3. Verify the password.
            if page_type == "password" or "password" in str(data):
                log.step("验证密码...")
                if not self._call_sentinel_req("authorize_continue__auto"):
                    return None

                verify_headers = headers.copy()
                verify_headers["OpenAI-Sentinel-Token"] = self._get_sentinel_header("password_verify")
                resp = self.session.post(
                    "https://auth.openai.com/api/accounts/password/verify",
                    json={"username": self.email, "password": self.password},
                    headers=verify_headers,
                    timeout=timeout
                )
                if resp.status_code != 200:
                    log.error(f"密码验证失败: {resp.status_code} - {resp.text[:200]}")
                    return None

            # 4. Read continue_url from the latest response.
            data = resp.json()
            continue_url = data.get("continue_url")

            # Without a continue_url, do one more sentinel call and retry.
            if not continue_url:
                log.step("获取重定向 URL...")
                if not self._call_sentinel_req("password_verify__auto"):
                    return None

                resp = self.session.post(
                    "https://auth.openai.com/api/accounts/authorize/continue",
                    json={},
                    headers=auth_headers,
                    timeout=timeout
                )
                if resp.status_code == 200:
                    data = resp.json()
                    continue_url = data.get("continue_url")

            if not continue_url:
                log.error(f"无法获取 continue_url: {data}")
                return None

            # 5. Follow redirects manually until the callback carries a code.
            log.step("跟踪重定向...")
            for _ in range(10):
                resp = self.session.get(continue_url, allow_redirects=False, timeout=timeout)
                if resp.status_code not in (301, 302, 303, 307, 308):
                    break

                location = resp.headers.get('Location', '')
                if not location:
                    # A redirect without a target cannot be followed.
                    break

                if "localhost:1455" in location:
                    query = parse_qs(urlparse(location).query)
                    code = query.get('code', [None])[0]
                    if code:
                        log.success("成功获取授权码")
                        return code

                # Location may be relative; resolve it against the current URL.
                continue_url = urljoin(continue_url, location)

            log.error("无法获取授权码")
            return None
        except Exception as e:
            log.error(f"API 授权异常: {e}")
            import traceback
            traceback.print_exc()
            return None
def s2a_api_authorize(
    email: str,
    password: str,
    proxy: str = None
) -> Tuple[bool, Optional[Dict[str, Any]]]:
    """Authorize an account through the API-only flow and store it in S2A.

    The function generates an S2A authorization URL, obtains the OAuth code
    with ``S2AApiAuthorizer`` and submits the code to S2A to create the
    account.

    Args:
        email: account email.
        password: account password.
        proxy: optional proxy URL (e.g. ``http://host:port`` or
            ``socks5://user:pass@host:port``). When omitted, the next proxy
            from the configuration is used if there is one.

    Returns:
        tuple: ``(True, account_data)`` on success, ``(False, None)``
        otherwise. Exceptions are logged and reported as failure.
    """
    has_credentials = bool(S2A_ADMIN_KEY or S2A_ADMIN_TOKEN)
    if not (S2A_API_BASE and has_credentials):
        log.error("S2A 未配置")
        return False, None

    # Fall back to the configured proxy pool.
    if not proxy:
        proxy_config = get_next_proxy()
        if proxy_config:
            proxy = format_proxy_url(proxy_config)

    log.info(f"开始 S2A API 授权: {email}", icon="code")
    if proxy:
        log.debug(f"使用代理: {proxy[:30]}...")

    try:
        # 1. Generate the authorization URL.
        auth_url, session_id = s2a_generate_auth_url()
        if not (auth_url and session_id):
            log.error("无法获取 S2A 授权 URL")
            return False, None
        log.debug(f"授权 URL: {auth_url[:80]}...")
        log.debug(f"Session ID: {session_id[:16]}...")

        # 2. Obtain the authorization code.
        code = S2AApiAuthorizer(email, password, proxy).get_authorization_code(auth_url)
        if not code:
            log.error("API 授权失败,无法获取授权码")
            return False, None
        log.debug(f"授权码: {code[:20]}...")

        # 3. Submit the code to create the account.
        log.step("提交授权码到 S2A...")
        account = s2a_create_account_from_oauth(code, session_id, name=email)
        if not account:
            log.error("S2A 账号入库失败")
            return False, None

        log.success(f"S2A API 授权成功: {email}")
        return True, account
    except Exception as e:
        log.error(f"S2A API 授权异常: {e}")
        return False, None
def build_s2a_headers() -> Dict[str, str]:
    """Build the headers sent with S2A API requests.

    The Admin API key (``x-api-key``) takes precedence; the JWT token
    (``authorization: Bearer ...``) is used only when no key is configured.
    With neither configured, no authentication header is added.

    Returns:
        dict: header name to header value.
    """
    if S2A_ADMIN_KEY:
        auth_header = {"x-api-key": S2A_ADMIN_KEY}
    elif S2A_ADMIN_TOKEN:
        auth_header = {"authorization": f"Bearer {S2A_ADMIN_TOKEN}"}
    else:
        auth_header = {}

    return {
        "accept": "application/json",
        "content-type": "application/json",
        "user-agent": USER_AGENT,
        **auth_header,
    }
def get_auth_method() -> Tuple[str, str]:
    """Report which credential is in use, with a shortened preview of it.

    Returns:
        tuple: ``(method_name, credential_preview)``. The preview is the
        first 16 characters followed by ``"..."``, or the whole credential
        when it has 16 characters or fewer. ``("None", "")`` when nothing is
        configured.
    """
    def _shorten(secret: str) -> str:
        return secret[:16] + "..." if len(secret) > 16 else secret

    # Same precedence as build_s2a_headers(): API key first, then JWT token.
    for label, secret in (("Admin API Key", S2A_ADMIN_KEY), ("JWT Token", S2A_ADMIN_TOKEN)):
        if secret:
            return label, _shorten(secret)
    return "None", ""
# ==================== 分组管理 ====================
def s2a_get_groups() -> List[Dict[str, Any]]:
    """Fetch the group list (first page, up to 100 entries).

    Returns:
        list: the ``data.items`` list of the response, or an empty list when
        the request fails, the status is not 200, or the API ``code`` is not 0.
    """
    headers = build_s2a_headers()
    try:
        response = http_session.get(
            f"{S2A_API_BASE}/admin/groups",
            headers=headers,
            params={"page": 1, "page_size": 100},
            timeout=REQUEST_TIMEOUT
        )
        if response.status_code != 200:
            return []
        body = response.json()
        if body.get("code") != 0:
            return []
        return body.get("data", {}).get("items", [])
    except Exception as e:
        log.warning(f"S2A 获取分组列表异常: {e}")
        return []
def s2a_resolve_group_ids(silent: bool = False) -> List[int]:
    """Resolve the configured groups to a list of group IDs.

    ``S2A_GROUP_IDS`` (IDs configured directly) wins. Otherwise the names in
    ``S2A_GROUP_NAMES`` are looked up, case-insensitively, in the group list
    returned by the API. The result is cached in ``_resolved_group_ids``.

    A failed or empty group-list fetch is NOT cached, so a transient API
    failure does not disable group assignment for the rest of the process;
    the next call simply retries the lookup.

    Args:
        silent: suppress the warning logs when True.

    Returns:
        list: resolved group IDs (possibly empty).
    """
    global _resolved_group_ids

    # Serve from the cache when a previous call resolved the IDs.
    if _resolved_group_ids is not None:
        return _resolved_group_ids

    # Directly configured IDs take precedence.
    if S2A_GROUP_IDS:
        _resolved_group_ids = S2A_GROUP_IDS
        return _resolved_group_ids

    # Nothing configured at all: cache the empty answer.
    if not S2A_GROUP_NAMES:
        _resolved_group_ids = []
        return _resolved_group_ids

    groups = s2a_get_groups()
    if not groups:
        if not silent:
            log.warning("S2A 无法获取分组列表group_names 解析失败")
        # Deliberately not cached: the lookup may succeed on a later call.
        return []

    # name -> id map; ``or ""`` guards against groups whose name is null.
    name_to_id = {(g.get("name") or "").lower(): g.get("id") for g in groups}

    resolved = []
    not_found = []
    for name in S2A_GROUP_NAMES:
        group_id = name_to_id.get(name.lower())
        if group_id is not None:
            resolved.append(group_id)
        else:
            not_found.append(name)

    if not_found and not silent:
        log.warning(f"S2A 分组未找到: {', '.join(not_found)}")

    _resolved_group_ids = resolved
    return _resolved_group_ids
def get_s2a_group_ids() -> List[int]:
    """Return the configured group IDs (public wrapper for other modules).

    Returns:
        list: the result of ``s2a_resolve_group_ids()`` with logging enabled.
    """
    group_ids = s2a_resolve_group_ids()
    return group_ids
# ==================== 连接验证 ====================
def s2a_verify_connection() -> Tuple[bool, str]:
    """Check that the S2A service is reachable and the credential is accepted.

    The check is a one-item request to ``/admin/groups``.

    Returns:
        tuple: ``(is_valid, message)``; the message describes the
        authentication method and groups on success, or the failure reason.
    """
    if not S2A_API_BASE:
        return False, "S2A_API_BASE 未配置"
    if not (S2A_ADMIN_KEY or S2A_ADMIN_TOKEN):
        return False, "S2A_ADMIN_KEY 或 S2A_ADMIN_TOKEN 未配置"

    auth_method, _ = get_auth_method()
    headers = build_s2a_headers()

    try:
        response = http_session.get(
            f"{S2A_API_BASE}/admin/groups",
            headers=headers,
            params={"page": 1, "page_size": 1},
            timeout=REQUEST_TIMEOUT
        )
        status = response.status_code
        if status != 200:
            known_failures = {
                401: f"{auth_method} 无效或已过期 (HTTP 401)",
                403: f"{auth_method} 权限不足 (HTTP 403)",
            }
            return False, known_failures.get(status, f"服务异常 (HTTP {status})")

        result = response.json()
        if result.get("code") != 0:
            return False, f"API 返回失败: {result.get('message', 'Unknown error')}"

        # Resolve the group configuration quietly for the status message.
        group_ids = s2a_resolve_group_ids(silent=True)
        if S2A_GROUP_NAMES:
            group_info = f", 分组: {S2A_GROUP_NAMES} -> {group_ids}"
        elif S2A_GROUP_IDS:
            group_info = f", 分组 ID: {group_ids}"
        else:
            group_info = ""
        return True, f"认证有效 (方式: {auth_method}{group_info})"
    except requests.exceptions.Timeout:
        return False, f"服务连接超时 ({S2A_API_BASE})"
    except requests.exceptions.ConnectionError:
        return False, f"无法连接到服务 ({S2A_API_BASE})"
    except Exception as e:
        return False, f"验证异常: {str(e)}"
# ==================== OAuth 授权 ====================
def s2a_generate_auth_url(proxy_id: Optional[int] = None) -> Tuple[Optional[str], Optional[str]]:
    """Ask S2A to generate an OpenAI OAuth authorization URL.

    Args:
        proxy_id: optional proxy ID included in the request payload.

    Returns:
        tuple: ``(auth_url, session_id)`` on success, ``(None, None)`` when
        the request fails or the response lacks either value.
    """
    headers = build_s2a_headers()
    payload = {}
    if proxy_id is not None:
        payload["proxy_id"] = proxy_id

    try:
        response = http_session.post(
            f"{S2A_API_BASE}/admin/openai/generate-auth-url",
            headers=headers,
            json=payload,
            timeout=REQUEST_TIMEOUT
        )
        if response.status_code != 200:
            log.error(f"生成 S2A 授权链接失败: HTTP {response.status_code}")
            return None, None

        result = response.json()
        if result.get("code") == 0:
            data = result.get("data", {})
            auth_url = data.get("auth_url")
            session_id = data.get("session_id")
            if auth_url and session_id:
                log.success(f"生成 S2A 授权链接成功 (会话: {session_id[:16]}...)")
                return auth_url, session_id

        # HTTP 200 but an API-level failure: report the API message instead
        # of the (misleading) HTTP status.
        log.error(f"生成 S2A 授权链接失败: {result.get('message', '未知错误')}")
        return None, None
    except Exception as e:
        log.error(f"S2A API 异常: {e}")
        return None, None
def s2a_verify_account_in_pool(email: str, timeout: int = 10) -> Tuple[bool, Optional[Dict[str, Any]]]:
    """Check whether an account for *email* is present in the S2A pool.

    ``/admin/accounts`` is queried with ``search=email``. A returned account
    matches when its non-empty name contains the email or is contained in it
    (case-insensitive). All returned items are examined, in order, and the
    first match is returned.

    Args:
        email: email address to look for.
        timeout: request timeout in seconds.

    Returns:
        tuple: ``(True, account)`` for the first matching account,
        ``(False, None)`` otherwise (including request failures).
    """
    headers = build_s2a_headers()
    wanted = (email or "").lower()

    try:
        params = {
            "page": 1,
            "page_size": 20,
            "platform": "",
            "type": "",
            "status": "",
            "search": email,
            "timezone": "Asia/Shanghai"
        }
        response = http_session.get(
            f"{S2A_API_BASE}/admin/accounts",
            headers=headers,
            params=params,
            timeout=timeout
        )
        if response.status_code != 200:
            log.warning(f"S2A 验证账号失败: HTTP {response.status_code}")
            return False, None

        result = response.json()
        if result.get("code") != 0:
            log.warning(f"S2A 验证账号失败: {result.get('message', '未知错误')}")
            return False, None

        items = result.get("data", {}).get("items", [])
        for account in items:
            account_name = (account.get("name") or "").lower()
            # An empty string is a substring of everything, so empty names
            # (or an empty email) must never count as a match.
            if not account_name or not wanted:
                continue
            if wanted in account_name or account_name in wanted:
                return True, account
        return False, None
    except Exception as e:
        log.warning(f"S2A 验证账号异常: {e}")
        return False, None
def s2a_create_account_from_oauth(
    code: str,
    session_id: str,
    name: str = "",
    proxy_id: Optional[int] = None
) -> Optional[Dict[str, Any]]:
    """Exchange an authorization code and create the account in one call.

    After a successful creation the function also checks, via
    ``s2a_verify_account_in_pool``, that the account shows up in the pool;
    the outcome of that check is only logged and does not change the result.

    Args:
        code: OAuth authorization code.
        session_id: session ID returned with the authorization URL.
        name: optional account name; stored as ``team-{name}``.
        proxy_id: optional proxy ID.

    Returns:
        dict: the created account data, or None on failure.
    """
    headers = build_s2a_headers()

    # Keep the full email (when *name* is one) for the pool check below.
    full_email = name if "@" in name else ""

    payload = {
        "session_id": session_id,
        "code": code,
        "concurrency": S2A_CONCURRENCY,
        "priority": S2A_PRIORITY,
    }
    if name:
        payload["name"] = f"team-{name}"
    if proxy_id is not None:
        payload["proxy_id"] = proxy_id
    group_ids = get_s2a_group_ids()
    if group_ids:
        payload["group_ids"] = group_ids

    try:
        log.step("正在提交授权码到 S2A...")
        response = http_session.post(
            f"{S2A_API_BASE}/admin/openai/create-from-oauth",
            headers=headers,
            json=payload,
            timeout=REQUEST_TIMEOUT
        )
        if response.status_code != 200:
            log.error(f"S2A 账号创建失败: HTTP {response.status_code}")
            return None

        result = response.json()
        if result.get("code") != 0:
            error_msg = result.get('message', '未知错误')
            log.error(f"S2A 账号创建失败: {error_msg}")
            return None

        account_data = result.get("data", {})
        account_id = account_data.get("id")
        account_name = account_data.get("name")
        log.success(f"S2A 授权成功 (ID: {account_id}, 名称: {account_name})")

        # Confirm that the account is visible in the pool.
        verify_email = full_email or account_name
        if verify_email:
            log.step("正在验证账号入库状态...")
            verified, verified_data = s2a_verify_account_in_pool(verify_email)
            if verified:
                verified_id = verified_data.get("id", "")
                verified_name = verified_data.get("name", "")
                log.success(f"✅ 账号入库验证成功 (ID: {verified_id}, 名称: {verified_name})")
            else:
                log.warning("⚠️ 账号入库验证失败,但授权已成功")

        return account_data
    except Exception as e:
        log.error(f"S2A 创建账号异常: {e}")
        return None
def s2a_add_account(
    name: str,
    token_info: Dict[str, Any],
    proxy_id: Optional[int] = None
) -> Optional[Dict[str, Any]]:
    """Add an OAuth account to the S2A pool from already-obtained tokens.

    Args:
        name: account name (usually the email); stored as ``team-{name}``.
        token_info: token data; ``access_token``, ``refresh_token`` and
            ``expires_at`` are always forwarded, ``id_token`` and ``email``
            only when they are truthy.
        proxy_id: optional proxy ID.

    Returns:
        dict: the created account data, or None on failure.
    """
    headers = build_s2a_headers()

    credentials = {
        field: token_info.get(field)
        for field in ("access_token", "refresh_token", "expires_at")
    }
    for optional_field in ("id_token", "email"):
        value = token_info.get(optional_field)
        if value:
            credentials[optional_field] = value

    payload = {
        "name": f"team-{name}",
        "platform": "openai",
        "type": "oauth",
        "credentials": credentials,
        "concurrency": S2A_CONCURRENCY,
        "priority": S2A_PRIORITY,
        "auto_pause_on_expired": True,
    }
    if proxy_id is not None:
        payload["proxy_id"] = proxy_id
    group_ids = get_s2a_group_ids()
    if group_ids:
        payload["group_ids"] = group_ids

    try:
        response = http_session.post(
            f"{S2A_API_BASE}/admin/accounts",
            headers=headers,
            json=payload,
            timeout=REQUEST_TIMEOUT
        )
        if response.status_code != 200:
            log.error(f"S2A 添加账号失败: HTTP {response.status_code}")
            return None

        result = response.json()
        if result.get("code") != 0:
            log.error(f"S2A 添加账号失败: {result.get('message', 'Unknown error')}")
            return None

        account_data = result.get("data", {})
        account_id = account_data.get("id")
        log.success(f"S2A 账号添加成功 (ID: {account_id}, Name: {name})")
        return account_data
    except Exception as e:
        log.error(f"S2A 添加账号异常: {e}")
        return None
# ==================== 账号管理 ====================
def s2a_get_accounts(platform: str = "openai") -> List[Dict[str, Any]]:
    """Fetch the account list, optionally filtered by platform.

    A single request is made with no paging parameters.
    NOTE(review): if the endpoint pages by default this returns only the
    first page - confirm against the API.

    Args:
        platform: platform filter; no filter is sent when it is empty.

    Returns:
        list: ``data.items`` when ``data`` is a dict containing ``items``,
        ``data`` itself when it is a list, otherwise an empty list.
    """
    headers = build_s2a_headers()
    try:
        query = {"platform": platform} if platform else {}
        response = http_session.get(
            f"{S2A_API_BASE}/admin/accounts",
            headers=headers,
            params=query,
            timeout=REQUEST_TIMEOUT
        )
        if response.status_code == 200:
            body = response.json()
            if body.get("code") == 0:
                data = body.get("data", {})
                if isinstance(data, dict) and "items" in data:
                    return data.get("items", [])
                if isinstance(data, list):
                    return data
        return []
    except Exception as e:
        log.warning(f"S2A 获取账号列表异常: {e}")
        return []
def s2a_get_error_accounts(
    platform: str = "",
    page_size: int = 100,
    timezone: str = "Asia/Shanghai"
) -> Tuple[List[Dict[str, Any]], int]:
    """Fetch every account in ``error`` status, following pagination.

    Args:
        platform: platform filter (empty string means all platforms).
        page_size: number of accounts requested per page.
        timezone: timezone name forwarded to the API.

    Returns:
        tuple: ``(accounts, total)``. When a page request fails with a
        non-200 status or a non-zero API code, the pages collected so far are
        returned. ``([], 0)`` when S2A is not configured or an exception is
        raised.
    """
    if not (S2A_API_BASE and (S2A_ADMIN_KEY or S2A_ADMIN_TOKEN)):
        return [], 0

    headers = build_s2a_headers()
    collected = []
    total_count = 0
    page = 0

    try:
        while True:
            page += 1
            response = http_session.get(
                f"{S2A_API_BASE}/admin/accounts",
                headers=headers,
                params={
                    "page": page,
                    "page_size": page_size,
                    "platform": platform,
                    "type": "",
                    "status": "error",
                    "search": "",
                    "timezone": timezone
                },
                timeout=REQUEST_TIMEOUT
            )
            if response.status_code != 200:
                log.warning(f"S2A 获取错误账号失败: HTTP {response.status_code}")
                break

            result = response.json()
            if result.get("code") != 0:
                log.warning(f"S2A 获取错误账号失败: {result.get('message', 'Unknown error')}")
                break

            data = result.get("data", {})
            items = data.get("items", [])
            total_count = data.get("total", 0)
            total_pages = data.get("pages", 1)
            collected.extend(items)

            # Stop after the last page, or when a page comes back empty.
            if not items or page >= total_pages:
                break

        return collected, total_count
    except Exception as e:
        log.warning(f"S2A 获取错误账号异常: {e}")
        return [], 0
def s2a_delete_account(account_id: int) -> Tuple[bool, str]:
    """Delete a single account.

    Args:
        account_id: ID of the account to delete.

    Returns:
        tuple: ``(True, "Deleted")`` on success, otherwise ``(False, reason)``
        where the reason is the API message, the HTTP status, or the
        exception text.
    """
    if not (S2A_API_BASE and (S2A_ADMIN_KEY or S2A_ADMIN_TOKEN)):
        return False, "S2A not configured"

    headers = build_s2a_headers()
    try:
        response = http_session.delete(
            f"{S2A_API_BASE}/admin/accounts/{account_id}",
            headers=headers,
            timeout=REQUEST_TIMEOUT
        )
        if response.status_code != 200:
            return False, f"HTTP {response.status_code}"

        result = response.json()
        if result.get("code") != 0:
            return False, result.get("message", "Unknown error")
        return True, "Deleted"
    except Exception as e:
        return False, str(e)
def s2a_batch_delete_error_accounts(
    progress_callback: Optional[callable] = None
) -> Dict[str, Any]:
    """Delete every account that is currently in ``error`` status.

    Args:
        progress_callback: optional callable invoked after each delete
            attempt as ``(current, total, account_name, success)``. It is not
            invoked for entries that have no account ID.

    Returns:
        dict: ``{"success": int, "failed": int, "total": int, "details": [...]}``
        where ``total`` is the total reported by the API.
    """
    summary = {"success": 0, "failed": 0, "total": 0, "details": []}

    error_accounts, total = s2a_get_error_accounts()
    summary["total"] = total
    if not error_accounts:
        return summary

    batch_size = len(error_accounts)
    log.info(f"开始批量删除 {batch_size} 个错误账号...")

    for position, account in enumerate(error_accounts, start=1):
        account_id = account.get("id")
        account_name = account.get("name", "")
        error_message = account.get("error_message", "")

        if not account_id:
            summary["failed"] += 1
            summary["details"].append({
                "name": account_name,
                "status": "failed",
                "message": "Missing account ID"
            })
            continue

        success, message = s2a_delete_account(account_id)
        if success:
            summary["success"] += 1
            entry = {
                "id": account_id,
                "name": account_name,
                "error": error_message[:50] if error_message else "",
                "status": "deleted"
            }
        else:
            summary["failed"] += 1
            entry = {
                "id": account_id,
                "name": account_name,
                "status": "failed",
                "message": message
            }
        summary["details"].append(entry)

        if progress_callback:
            progress_callback(position, batch_size, account_name, success)

    log.success(f"批量删除完成: 成功 {summary['success']}, 失败 {summary['failed']}")
    return summary
def s2a_check_account_exists(email: str, platform: str = "openai") -> bool:
    """Check whether an account for *email* already exists.

    An account matches when its name equals the email or ``team-{email}``
    (the name format used by ``s2a_create_account_from_oauth`` and
    ``s2a_add_account``), or when its ``credentials.email`` equals the email.
    Comparisons are case-insensitive.

    Args:
        email: email address to look for; an empty value never matches.
        platform: platform filter passed to ``s2a_get_accounts``.

    Returns:
        bool: True when a matching account is found.
    """
    wanted = (email or "").lower()
    if not wanted:
        # Otherwise "" would equal the missing email of any account.
        return False

    matching_names = {wanted, f"team-{wanted}"}
    for account in s2a_get_accounts(platform):
        # ``or`` guards against explicit nulls in the API response.
        account_name = (account.get("name") or "").lower()
        credentials = account.get("credentials") or {}
        account_email = (credentials.get("email") or "").lower()
        if account_name in matching_names or account_email == wanted:
            return True
    return False
# ==================== 工具函数 ====================
def extract_code_from_url(url: str) -> Optional[str]:
    """Extract the ``code`` query parameter from a callback URL.

    Args:
        url: callback URL; may be empty or None.

    Returns:
        Optional[str]: the first ``code`` value, or None when the URL is
        empty, has no ``code`` parameter, or cannot be parsed.
    """
    if not url:
        return None
    try:
        query = parse_qs(urlparse(url).query)
        codes = query.get("code", [None])
        return codes[0]
    except Exception as e:
        log.error(f"解析 URL 失败: {e}")
        return None
def is_s2a_callback_url(url: str) -> bool:
    """Tell whether *url* looks like the S2A OAuth callback URL.

    Args:
        url: URL to inspect; may be empty or None.

    Returns:
        bool: True when the URL contains both
        ``localhost:1455/auth/callback`` and ``code=``.
    """
    if not url:
        return False
    required_parts = ("localhost:1455/auth/callback", "code=")
    return all(part in url for part in required_parts)
# ==================== 仪表盘统计 ====================
def s2a_get_dashboard_stats(timezone: str = "Asia/Shanghai") -> Optional[Dict[str, Any]]:
    """Fetch the S2A dashboard statistics.

    Args:
        timezone: timezone name forwarded to the API.

    Returns:
        dict: the ``data`` object of the response, or None when S2A is not
        configured or the request fails (failures are logged as warnings).
    """
    if not (S2A_API_BASE and (S2A_ADMIN_KEY or S2A_ADMIN_TOKEN)):
        return None

    headers = build_s2a_headers()
    try:
        response = http_session.get(
            f"{S2A_API_BASE}/admin/dashboard/stats",
            headers=headers,
            params={"timezone": timezone},
            timeout=REQUEST_TIMEOUT
        )
        if response.status_code != 200:
            log.warning(f"S2A 仪表盘获取失败: HTTP {response.status_code}")
            return None

        result = response.json()
        if result.get("code") == 0:
            return result.get("data", {})
        log.warning(f"S2A 仪表盘获取失败: {result.get('message', 'Unknown error')}")
    except Exception as e:
        log.warning(f"S2A 仪表盘获取异常: {e}")
    return None
def format_dashboard_stats(stats: Dict[str, Any]) -> str:
    """Render the dashboard statistics as readable, HTML-tagged text.

    Missing keys and explicit ``None`` values are both treated as 0, so a
    response containing JSON nulls is formatted instead of raising.

    Args:
        stats: raw dashboard data (as returned by the dashboard API).

    Returns:
        str: the formatted multi-line text, or ``"暂无数据"`` when *stats* is
        empty or None.
    """
    if not stats:
        return "暂无数据"

    def value(key):
        """Read a metric, mapping a missing key or null to 0."""
        raw = stats.get(key, 0)
        return 0 if raw is None else raw

    def fmt_num(n):
        """Format a number with thousands separators (2 decimals for floats)."""
        if isinstance(n, float):
            return f"{n:,.2f}"
        return f"{n:,}"

    def fmt_tokens(n):
        """Format a token count compactly (K / M / B suffixes)."""
        if n >= 1_000_000_000:
            return f"{n / 1_000_000_000:.2f}B"
        elif n >= 1_000_000:
            return f"{n / 1_000_000:.2f}M"
        elif n >= 1_000:
            return f"{n / 1_000:.1f}K"
        return str(n)

    # Account status
    total_accounts = value("total_accounts")
    normal_accounts = value("normal_accounts")
    error_accounts = value("error_accounts")
    ratelimit_accounts = value("ratelimit_accounts")

    # Today's statistics
    today_requests = value("today_requests")
    today_tokens = value("today_tokens")
    today_cost = value("today_cost")
    today_input = value("today_input_tokens")
    today_output = value("today_output_tokens")
    today_cache_read = value("today_cache_read_tokens")

    # Totals
    total_requests = value("total_requests")
    total_tokens = value("total_tokens")
    total_cost = value("total_cost")

    # Live status
    rpm = value("rpm")
    tpm = value("tpm")
    active_users = value("active_users")
    avg_duration = value("average_duration_ms")

    lines = [
        "<b>📊 S2A 仪表盘</b>",
        "",
        "<b>📦 账号状态</b>",
        f" 总计: {total_accounts} | 正常: {normal_accounts}",
        f" 异常: {error_accounts} | 限流: {ratelimit_accounts}",
        "",
        "<b>📅 今日统计</b>",
        f" 请求数: {fmt_num(today_requests)}",
        f" Token: {fmt_tokens(today_tokens)}",
        f" 输入: {fmt_tokens(today_input)} | 输出: {fmt_tokens(today_output)}",
        f" 缓存: {fmt_tokens(today_cache_read)}",
        f" 费用: ${fmt_num(today_cost)}",
        "",
        "<b>📈 累计统计</b>",
        f" 请求数: {fmt_num(total_requests)}",
        f" Token: {fmt_tokens(total_tokens)}",
        f" 费用: ${fmt_num(total_cost)}",
        "",
        "<b>⚡ 实时状态</b>",
        f" RPM: {rpm} | TPM: {fmt_num(tpm)}",
        f" 活跃用户: {active_users}",
        f" 平均延迟: {avg_duration:.0f}ms",
    ]
    return "\n".join(lines)
# ==================== 批量导入账号 ====================
def s2a_import_account_with_token(
    email: str,
    access_token: str,
    password: str = "",
    proxy_id: Optional[int] = None
) -> Tuple[bool, str]:
    """Import an account into S2A directly from an access token.

    Args:
        email: account email; used as the account name.
        access_token: OpenAI access token (JWT).
        password: account password; accepted but not sent in the request.
        proxy_id: optional proxy ID.

    Returns:
        tuple: ``(True, "ID: <id>")`` on success; otherwise ``(False, reason)``
        where the reason is ``"Already exists"`` when the API message mentions
        "exist" or "duplicate", the API message, the HTTP status, or the
        exception text.
    """
    if not (S2A_API_BASE and (S2A_ADMIN_KEY or S2A_ADMIN_TOKEN)):
        return False, "S2A not configured"

    headers = build_s2a_headers()
    payload = {
        "name": email,
        "platform": "openai",
        "type": "access_token",
        "credentials": {"access_token": access_token},
        "concurrency": S2A_CONCURRENCY,
        "priority": S2A_PRIORITY,
        "auto_pause_on_expired": True,
    }
    if proxy_id is not None:
        payload["proxy_id"] = proxy_id
    group_ids = get_s2a_group_ids()
    if group_ids:
        payload["group_ids"] = group_ids

    try:
        response = http_session.post(
            f"{S2A_API_BASE}/admin/accounts",
            headers=headers,
            json=payload,
            timeout=REQUEST_TIMEOUT
        )
        if response.status_code != 200:
            return False, f"HTTP {response.status_code}"

        result = response.json()
        if result.get("code") == 0:
            account_id = result.get("data", {}).get("id")
            return True, f"ID: {account_id}"

        error_msg = result.get("message", "Unknown error")
        lowered = error_msg.lower()
        # Normalise "already there" style errors to a single message.
        if "exist" in lowered or "duplicate" in lowered:
            return False, "Already exists"
        return False, error_msg
    except Exception as e:
        return False, str(e)
def s2a_batch_import_accounts(
    accounts: List[Dict[str, str]],
    progress_callback: Optional[callable] = None
) -> Dict[str, Any]:
    """Import a batch of accounts into S2A.

    Entries without an email or token are skipped, as are accounts that
    already exist (either found up front or reported by the import call with
    a message containing "exist").

    Args:
        accounts: entries shaped like
            ``{"account": "email", "password": "pwd", "token": "jwt"}``.
        progress_callback: optional callable invoked as
            ``(current, total, email, status)``. It is not invoked for
            entries skipped because the email or token is missing.

    Returns:
        dict: ``{"success": int, "failed": int, "skipped": int, "details": [...]}``.
    """
    summary = {
        "success": 0,
        "failed": 0,
        "skipped": 0,
        "details": []
    }

    def record(status: str, email: str, message: str) -> None:
        """Bump the counter for *status* and append a detail entry."""
        summary[status] += 1
        summary["details"].append({
            "email": email,
            "status": status,
            "message": message
        })

    total = len(accounts)
    for position, entry in enumerate(accounts, start=1):
        email = entry.get("account", "")
        token = entry.get("token", "")
        password = entry.get("password", "")

        if not (email and token):
            record("skipped", email or "unknown", "Missing email or token")
            continue

        # Skip accounts that are already in the pool.
        if s2a_check_account_exists(email):
            record("skipped", email, "Already exists")
            if progress_callback:
                progress_callback(position, total, email, "skipped")
            continue

        success, message = s2a_import_account_with_token(email, token, password)
        if success:
            status = "success"
        elif "exist" in message.lower():
            status = "skipped"
        else:
            status = "failed"
        record(status, email, message)

        if progress_callback:
            progress_callback(position, total, email, status)

    return summary
# ==================== API 密钥用量查询 ====================
def s2a_get_all_usage_stats(
    start_date: str,
    end_date: str,
    timezone: str = "Asia/Shanghai"
) -> Optional[Dict[str, Any]]:
    """Fetch usage statistics aggregated over all keys (no ``api_key_id``).

    Args:
        start_date: start date string forwarded to the API.
        end_date: end date string forwarded to the API.
        timezone: timezone name forwarded to the API.

    Returns:
        dict: the ``data`` object of the response, or None when S2A is not
        configured or the request fails (failures are logged as warnings).
    """
    if not S2A_API_BASE or (not S2A_ADMIN_KEY and not S2A_ADMIN_TOKEN):
        return None

    headers = build_s2a_headers()
    try:
        response = http_session.get(
            f"{S2A_API_BASE}/admin/usage/stats",
            headers=headers,
            params={
                "start_date": start_date,
                "end_date": end_date,
                "timezone": timezone
            },
            timeout=REQUEST_TIMEOUT
        )
        if response.status_code == 200:
            result = response.json()
            if result.get("code") == 0:
                return result.get("data", {})
            else:
                # Report API-level failures too, as s2a_get_key_usage_stats does.
                log.warning(f"S2A 获取汇总用量失败: {result.get('message', 'Unknown error')}")
        else:
            log.warning(f"S2A 获取汇总用量失败: HTTP {response.status_code}")
    except Exception as e:
        log.warning(f"S2A 获取汇总用量异常: {e}")
    return None
def s2a_get_key_usage_stats(
    api_key_id: int,
    start_date: str,
    end_date: str,
    timezone: str = "Asia/Shanghai"
) -> Optional[Dict[str, Any]]:
    """Fetch the usage statistics of a single API key.

    Args:
        api_key_id: key ID.
        start_date: start date (YYYY-MM-DD).
        end_date: end date (YYYY-MM-DD).
        timezone: timezone name forwarded to the API.

    Returns:
        dict: the ``data`` object of the response, or None when S2A is not
        configured or the request fails (failures are logged as warnings).
    """
    if not (S2A_API_BASE and (S2A_ADMIN_KEY or S2A_ADMIN_TOKEN)):
        return None

    headers = build_s2a_headers()
    query = {
        "api_key_id": api_key_id,
        "start_date": start_date,
        "end_date": end_date,
        "timezone": timezone
    }
    try:
        response = http_session.get(
            f"{S2A_API_BASE}/admin/usage/stats",
            headers=headers,
            params=query,
            timeout=REQUEST_TIMEOUT
        )
        if response.status_code != 200:
            log.warning(f"S2A 获取密钥用量失败: HTTP {response.status_code}")
            return None

        result = response.json()
        if result.get("code") == 0:
            return result.get("data", {})
        log.warning(f"S2A 获取密钥用量失败: {result.get('message', 'Unknown error')}")
    except Exception as e:
        log.warning(f"S2A 获取密钥用量异常: {e}")
    return None
# Key list configuration (data taken from the /api/v1/keys endpoint).
# Each entry holds the key's numeric id, display name, key string and group
# name; s2a_get_keys_with_usage() iterates this list.
# NOTE(review): the "key" values are credentials hard-coded in source control.
# Consider loading them from configuration or the API and rotating any key
# that has been committed.
API_KEYS_LIST = [
{"id": 11, "name": "bohe", "key": "bohebohebohebohe", "group_name": "codex"},
{"id": 10, "name": "黑与白", "key": "oyqq114514oyqq114514", "group_name": "codex"},
{"id": 9, "name": "baozi", "key": "baozibaozibaozibaozi", "group_name": "codex"},
{"id": 7, "name": "Wong-灭鼠专家", "key": "Wong1234567891011", "group_name": "codex"},
{"id": 6, "name": "小马-莹佬", "key": "mazhichentaiqiangla", "group_name": "codex"},
{"id": 5, "name": "猫猫-byteBender", "key": "ByteBender114514", "group_name": "codex"},
{"id": 4, "name": "KFC", "key": "sk-babf3f6d15582b31e959311d6ceaa9448f2cf951dc23d02386d41f68046e6018", "group_name": "codex"},
{"id": 3, "name": "zhongruan", "key": "zhongruantaiqiangla", "group_name": "codex"},
]
def s2a_get_keys_with_usage(start_date: str = None, end_date: str = None, timezone: str = "Asia/Shanghai") -> Optional[List[Dict[str, Any]]]:
    """Return the configured key list with each key's usage statistics merged in.

    Iterates ``API_KEYS_LIST`` and queries ``s2a_get_key_usage_stats`` once
    per key for the requested date range.

    Args:
        start_date: Start date (YYYY-MM-DD). Defaults to today.
        end_date: End date (YYYY-MM-DD). Defaults to today.
        timezone: Timezone name. It is forwarded to the usage query and is
            also used to decide what "today" is when a date is omitted.

    Returns:
        A list of dicts with ``id``, ``name``, ``key``, ``group_name``,
        ``status`` (always ``"active"``) and ``usage`` (``{}`` when the usage
        query returned nothing), or ``None`` when the key list is empty.
    """
    from datetime import datetime

    if not start_date or not end_date:
        # The dates are sent together with `timezone`, so "today" is computed
        # in that same zone; a naive local now() can be one day off when the
        # host runs in a different zone. It is computed once so both defaults
        # agree even if the call straddles midnight.
        try:
            from zoneinfo import ZoneInfo  # stdlib on Python 3.9+
            now = datetime.now(ZoneInfo(timezone))
        except (ImportError, KeyError, ValueError, TypeError, OSError):
            # zoneinfo missing, or zone name unknown/invalid on this host:
            # fall back to the host's local clock.
            now = datetime.now()
        today = now.strftime("%Y-%m-%d")
        start_date = start_date or today
        end_date = end_date or today

    keys = []
    for key_info in API_KEYS_LIST:
        key_id = key_info["id"]
        usage = s2a_get_key_usage_stats(key_id, start_date, end_date, timezone)
        keys.append({
            "id": key_id,
            "name": key_info["name"],
            "key": key_info["key"],
            "group_name": key_info["group_name"],
            "status": "active",
            "usage": usage if usage else {},
        })

    return keys if keys else None
def format_keys_usage(keys: List[Dict[str, Any]], period_text: str = "今日") -> str:
    """Render per-key usage statistics as a readable multi-line text report.

    Keys are listed in descending order of ``total_requests``; a summary
    section with the totals follows. Headings are wrapped in ``<b>`` tags.

    Args:
        keys: Key dicts, each optionally carrying a ``usage`` dict with
            ``total_requests``, ``total_tokens``, ``total_input_tokens``,
            ``total_output_tokens``, ``total_cache_tokens``,
            ``total_actual_cost`` and ``average_duration_ms``. A missing or
            ``None`` usage dict, or missing/``None`` fields, count as zero.
        period_text: Label for the reporting period used in the headings.

    Returns:
        The formatted report, or a fixed placeholder message when ``keys``
        is empty.
    """
    if not keys:
        return "暂无密钥数据"

    def usage_of(entry):
        """Return the entry's usage dict, treating missing/None as empty."""
        return entry.get("usage") or {}

    def metric(usage, field):
        """Read a numeric usage field, treating missing/None as 0."""
        value = usage.get(field)
        return 0 if value is None else value

    def fmt_cost(n):
        """Format a cost: thousands separators and 2 decimals from 1 up, 4 decimals below."""
        if n >= 1:
            return f"${n:,.2f}"
        return f"${n:.4f}"

    def fmt_tokens(n):
        """Format a token count with a K/M/B suffix."""
        if n >= 1_000_000_000:
            return f"{n / 1_000_000_000:.2f}B"
        if n >= 1_000_000:
            return f"{n / 1_000_000:.2f}M"
        if n >= 1_000:
            return f"{n / 1_000:.1f}K"
        return str(int(n))

    def fmt_duration(ms):
        """Format a duration given in milliseconds as ms or seconds."""
        if ms >= 1000:
            return f"{ms / 1000:.2f}s"
        return f"{ms:.0f}ms"

    # Busiest keys first.
    sorted_keys = sorted(
        keys,
        key=lambda k: metric(usage_of(k), "total_requests"),
        reverse=True,
    )

    lines = [f"<b>◈ API 密钥用量 ({period_text})</b>", ""]
    total_requests = 0
    total_tokens = 0
    total_cost = 0

    for i, key in enumerate(sorted_keys, 1):
        name = key.get("name", "未命名")
        usage = usage_of(key)

        # Named request_count so the module-level `requests` import is not shadowed.
        request_count = metric(usage, "total_requests")
        tokens = metric(usage, "total_tokens")
        input_tokens = metric(usage, "total_input_tokens")
        output_tokens = metric(usage, "total_output_tokens")
        cache_tokens = metric(usage, "total_cache_tokens")
        cost = metric(usage, "total_actual_cost")
        avg_duration = metric(usage, "average_duration_ms")

        total_requests += request_count
        total_tokens += tokens
        total_cost += cost

        # Rank label
        rank = f"#{i}"
        lines.append(f"{rank} <b>{name}</b>")
        lines.append(f"{request_count:,} 请求 · {fmt_duration(avg_duration)}")
        # NOTE(review): input and output token counts are emitted back-to-back
        # with no separator between them — confirm whether a marker was lost.
        lines.append(f"{fmt_tokens(tokens)} (↓{fmt_tokens(input_tokens)}{fmt_tokens(output_tokens)})")
        if cache_tokens > 0:
            lines.append(f" ▸ 缓存 {fmt_tokens(cache_tokens)}")
        lines.append(f"{fmt_cost(cost)}")
        lines.append("")

    # Summary
    lines.append("────────────────────")
    lines.append(f"<b>◇ {period_text}汇总</b>")
    lines.append(f" 密钥: {len(keys)}")
    lines.append(f" 请求: {total_requests:,}")
    lines.append(f" Token: {fmt_tokens(total_tokens)}")
    lines.append(f" 费用: {fmt_cost(total_cost)}")
    return "\n".join(lines)
# ==================== 批量 API 授权 ====================
def s2a_batch_api_authorize(
    accounts: List[Dict[str, str]],
    proxy: str = None,
    progress_callback: Optional[callable] = None
) -> Dict[str, Any]:
    """Authorize a batch of accounts into S2A using the API mode.

    Each account is passed to ``s2a_api_authorize``; no browser is involved.

    Args:
        accounts: Account list, ``[{"email": "xxx", "password": "xxx"}, ...]``.
        proxy: Proxy URL (optional). When omitted, the next configured proxy
            (``get_next_proxy``) is used if one is available.
        progress_callback: Optional callable invoked once per account as
            ``(current, total, email, status, message)``. Exceptions it
            raises propagate to the caller.

    Returns:
        dict: {
            "success": int,
            "failed": int,
            "total": int,
            "details": [{"email": "xxx", "status": "success/failed", "message": "xxx"}, ...]
        }
        When S2A is not configured, the dict is returned immediately with
        zero counts and no details.
    """
    total = len(accounts)
    results = {
        "success": 0,
        "failed": 0,
        "total": total,
        "details": [],
    }

    if not S2A_API_BASE or (not S2A_ADMIN_KEY and not S2A_ADMIN_TOKEN):
        log.error("S2A 未配置")
        return results

    # Fall back to the configured proxy pool when no proxy was given.
    if not proxy:
        proxy_config = get_next_proxy()
        if proxy_config:
            proxy = format_proxy_url(proxy_config)

    def record(position, email, status, message):
        """Count one outcome, store its detail row and notify the callback."""
        results[status] += 1
        results["details"].append({
            "email": email or "unknown",
            "status": status,
            "message": message,
        })
        if progress_callback:
            progress_callback(position, total, email, status, message)

    log.info(f"开始批量 API 授权: {len(accounts)} 个账号")

    for position, acc in enumerate(accounts, 1):
        email = acc.get("email", "")
        password = acc.get("password", "")

        if not email or not password:
            record(position, email, "failed", "缺少邮箱或密码")
            continue

        # Only the authorization itself is guarded. Counting and the progress
        # callback run outside the try so that a callback error cannot make
        # an already-recorded success be recorded a second time as a failure.
        try:
            success, result = s2a_api_authorize(email, password, proxy)
            if success:
                account_id = result.get("id", "") if result else ""
                status, message = "success", f"ID: {account_id}"
            else:
                status, message = "failed", "授权失败"
        except Exception as e:
            # Best-effort batch: one failing account must not stop the rest.
            status, message = "failed", str(e)

        record(position, email, status, message)

    log.success(f"批量授权完成: 成功 {results['success']}, 失败 {results['failed']}")
    return results
def s2a_api_authorize_single(
    email: str,
    password: str,
    proxy: str = None
) -> Tuple[bool, str]:
    """Authorize one account via the API mode and return a simplified result.

    Thin wrapper around ``s2a_api_authorize`` that reduces its result to a
    success flag plus a display message.

    Args:
        email: Account email.
        password: Account password.
        proxy: Proxy URL (optional).

    Returns:
        tuple: ``(succeeded, message)``. On success the message includes the
        ``id`` field of the returned result (empty when no result is given).
    """
    ok, account = s2a_api_authorize(email, password, proxy)
    if not ok:
        return False, "授权失败"

    account_id = account.get("id", "") if account else ""
    return True, f"授权成功 (ID: {account_id})"