# ==================== S2A (Sub2API) 服务模块 ==================== # 处理 Sub2API 系统相关功能 (OpenAI OAuth 授权、账号入库) # # S2A 与 CPA/CRS 的关键差异: # - 认证方式: S2A 支持 Admin API Key (x-api-key) 或 JWT Token (Bearer) # - 会话标识: S2A 使用 session_id # - 授权流程: S2A 生成授权 URL -> 用户授权 -> 提交 code 换取 token -> 创建账号 # - 账号入库: S2A 可一步完成 (create-from-oauth) 或分步完成 (exchange + add_account) # # 新增: 纯 API 授权模式 (无需浏览器) # - 使用 curl_cffi 模拟浏览器指纹 # - 支持 Sentinel PoW 验证 # - 直接通过 API 完成 OAuth 流程 import requests from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry from urllib.parse import urlparse, parse_qs from typing import Optional, Tuple, Dict, List, Any import json import uuid import time import random import base64 import hashlib from datetime import datetime, timedelta, timezone from config import ( S2A_API_BASE, S2A_ADMIN_KEY, S2A_ADMIN_TOKEN, S2A_CONCURRENCY, S2A_PRIORITY, S2A_GROUP_IDS, S2A_GROUP_NAMES, REQUEST_TIMEOUT, USER_AGENT, get_next_proxy, format_proxy_url, ) from logger import log # ==================== 分组 ID 缓存 ==================== _resolved_group_ids = None # 缓存解析后的 group_ids def create_session_with_retry() -> requests.Session: """创建带重试机制的 HTTP Session""" session = requests.Session() retry_strategy = Retry( total=5, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504], allowed_methods=["HEAD", "GET", "POST", "PUT", "DELETE", "OPTIONS"] ) adapter = HTTPAdapter(max_retries=retry_strategy) session.mount("https://", adapter) session.mount("http://", adapter) return session http_session = create_session_with_retry() # ==================== PoW Solver (Sentinel 验证) ==================== def _fnv1a_32(data: bytes) -> int: """FNV-1a 32-bit hash""" h = 2166136261 for byte in data: h ^= byte h = (h * 16777619) & 0xFFFFFFFF h ^= (h >> 16) h = (h * 2246822507) & 0xFFFFFFFF h ^= (h >> 13) h = (h * 3266489909) & 0xFFFFFFFF h ^= (h >> 16) return h def _get_parse_time() -> str: """生成 JS Date().toString() 格式的时间戳""" now = datetime.now(timezone(timedelta(hours=8))) return now.strftime("%a %b %d %Y %H:%M:%S") + " GMT+0800 (中国标准时间)" def _get_pow_config(user_agent: str, sid: str = None) -> list: """生成 PoW 配置数组""" if not sid: sid = str(uuid.uuid4()) return [ random.randint(2500, 3500), _get_parse_time(), 4294967296, 0, user_agent, "chrome-extension://pgojnojmmhpofjgdmaebadhbocahppod/assets/aW5qZWN0X2hhc2g/aW5qZ", None, "zh-CN", "zh-CN", 0, f"canShare−function canShare() {{ [native code] }}", f"_reactListening{random.randint(1000000, 9999999)}", "onhashchange", time.perf_counter() * 1000, sid, "", 24, int(time.time() * 1000 - random.randint(10000, 50000)) ] def _solve_pow(seed: str, difficulty: str, config: list, max_iterations: int = 5000000) -> Optional[str]: """CPU 求解 PoW""" start_time = time.perf_counter() seed_bytes = seed.encode() for iteration in range(max_iterations): config[3] = iteration config[9] = 0 json_str = json.dumps(config, separators=(',', ':')) encoded = base64.b64encode(json_str.encode()) h = _fnv1a_32(seed_bytes + encoded) hex_hash = f"{h:08x}" if hex_hash[:len(difficulty)] <= difficulty: elapsed = time.perf_counter() - start_time log.debug(f"[PoW] 求解完成: {elapsed:.2f}s (迭代 {iteration:,}, 难度={difficulty})") return f"{encoded.decode()}~S" return None def _get_requirements_token(user_agent: str, sid: str = None) -> str: """生成 requirements token""" if not sid: sid = str(uuid.uuid4()) config = _get_pow_config(user_agent, sid) config[3] = 0 config[9] = 0 json_str = json.dumps(config, separators=(',', ':')) encoded = base64.b64encode(json_str.encode()).decode() return f"gAAAAAC{encoded}~S" # ==================== S2A API 授权器 ==================== class S2AApiAuthorizer: """S2A 纯 API 授权器 - 无需浏览器""" def __init__(self, email: str, password: str, proxy: str = None): self.email = email self.password = password # 尝试导入 curl_cffi,如果失败则使用 requests try: from curl_cffi import requests as cffi_requests self.session = cffi_requests.Session(impersonate="chrome110") self._use_cffi = True except ImportError: log.warning("curl_cffi 未安装,使用 requests (可能被检测)") self.session = requests.Session() self._use_cffi = False if proxy: self.session.proxies = {"http": proxy, "https": proxy} self.ua = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36" self.sid = str(uuid.uuid4()) self.device_id = str(uuid.uuid4()) self.sentinel_token = None self.solved_pow = None self.session.headers.update({ "User-Agent": self.ua, "Accept": "*/*", "Accept-Language": "en-US,en;q=0.9", "sec-ch-ua": '"Not(A:Brand";v="8", "Chromium";v="144", "Microsoft Edge";v="144"', "sec-ch-ua-mobile": "?0", "sec-ch-ua-platform": '"Windows"', "sec-fetch-dest": "empty", "sec-fetch-mode": "cors", "sec-fetch-site": "same-origin", }) def _call_sentinel_req(self, flow: str) -> Optional[dict]: """调用 sentinel 获取 token 和处理 PoW""" init_token = _get_requirements_token(self.ua, self.sid) payload = {"p": init_token, "id": self.device_id, "flow": flow} try: resp = self.session.post( "https://sentinel.openai.com/backend-api/sentinel/req", json=payload, timeout=15 ) if resp.status_code != 200: log.warning(f"Sentinel 请求失败: {resp.status_code}") return None data = resp.json() self.sentinel_token = data.get('token') pow_req = data.get('proofofwork', {}) if pow_req.get('required'): seed = pow_req.get('seed', '') difficulty = pow_req.get('difficulty', '') config = _get_pow_config(self.ua, self.sid) solved = _solve_pow(seed, difficulty, config) if solved: self.solved_pow = f"gAAAAAB{solved}" else: log.error("PoW 求解失败") return None else: self.solved_pow = init_token return data except Exception as e: log.error(f"Sentinel 异常: {e}") return None def _get_sentinel_header(self, header_flow: str) -> str: """构建 sentinel header""" sentinel_obj = {"p": self.solved_pow, "id": self.device_id, "flow": header_flow} if self.sentinel_token: sentinel_obj["c"] = self.sentinel_token return json.dumps(sentinel_obj) def get_authorization_code(self, auth_url: str) -> Optional[str]: """执行 OAuth 流程,返回 authorization code Args: auth_url: S2A 生成的授权 URL Returns: str: 授权码 或 None """ log.step("开始 API 授权流程...") headers = { "Origin": "https://auth.openai.com", "Referer": "https://auth.openai.com/log-in", "Content-Type": "application/json" } try: # 1. 访问授权端点 log.step("访问授权端点...") resp = self.session.get(auth_url, allow_redirects=True) headers["Referer"] = resp.url # 2. 提交邮箱 log.step("提交邮箱...") if not self._call_sentinel_req("login_web_init"): return None auth_headers = headers.copy() auth_headers["OpenAI-Sentinel-Token"] = self._get_sentinel_header("authorize_continue") resp = self.session.post( "https://auth.openai.com/api/accounts/authorize/continue", json={"username": {"kind": "email", "value": self.email}}, headers=auth_headers ) if resp.status_code != 200: log.error(f"邮箱提交失败: {resp.status_code} - {resp.text[:200]}") return None data = resp.json() page_type = data.get("page", {}).get("type", "") # 3. 验证密码 if page_type == "password" or "password" in str(data): log.step("验证密码...") if not self._call_sentinel_req("authorize_continue__auto"): return None verify_headers = headers.copy() verify_headers["OpenAI-Sentinel-Token"] = self._get_sentinel_header("password_verify") resp = self.session.post( "https://auth.openai.com/api/accounts/password/verify", json={"username": self.email, "password": self.password}, headers=verify_headers ) if resp.status_code != 200: log.error(f"密码验证失败: {resp.status_code} - {resp.text[:200]}") return None # 4. 获取 continue_url (无需选择 workspace,S2A 授权链接已包含) data = resp.json() continue_url = data.get("continue_url") # 如果没有 continue_url,可能需要额外的 sentinel 调用 if not continue_url: log.step("获取重定向 URL...") if not self._call_sentinel_req("password_verify__auto"): return None # 尝试再次获取 resp = self.session.post( "https://auth.openai.com/api/accounts/authorize/continue", json={}, headers=auth_headers ) if resp.status_code == 200: data = resp.json() continue_url = data.get("continue_url") if not continue_url: log.error(f"无法获取 continue_url: {data}") return None # 5. 跟踪重定向直到获取 code log.step("跟踪重定向...") for _ in range(10): resp = self.session.get(continue_url, allow_redirects=False) if resp.status_code in (301, 302, 303, 307, 308): location = resp.headers.get('Location', '') if "localhost:1455" in location: parsed = urlparse(location) query = parse_qs(parsed.query) code = query.get('code', [None])[0] if code: log.success("成功获取授权码") return code continue_url = location else: break log.error("无法获取授权码") return None except Exception as e: log.error(f"API 授权异常: {e}") import traceback traceback.print_exc() return None def s2a_api_authorize( email: str, password: str, proxy: str = None ) -> Tuple[bool, Optional[Dict[str, Any]]]: """S2A 纯 API 授权 (无需浏览器) 使用 OpenAI 认证 API 直接完成授权流程,无需浏览器自动化。 Args: email: 账号邮箱 password: 账号密码 proxy: 代理地址 (可选,格式: http://host:port 或 socks5://user:pass@host:port) Returns: tuple: (是否成功, 账号数据或None) """ if not S2A_API_BASE or (not S2A_ADMIN_KEY and not S2A_ADMIN_TOKEN): log.error("S2A 未配置") return False, None # 使用配置的代理 if not proxy: proxy_config = get_next_proxy() if proxy_config: proxy = format_proxy_url(proxy_config) log.info(f"开始 S2A API 授权: {email}", icon="code") if proxy: log.debug(f"使用代理: {proxy[:30]}...") try: # 1. 生成授权 URL auth_url, session_id = s2a_generate_auth_url() if not auth_url or not session_id: log.error("无法获取 S2A 授权 URL") return False, None log.debug(f"授权 URL: {auth_url[:80]}...") log.debug(f"Session ID: {session_id[:16]}...") # 2. 使用 API 授权器获取 code authorizer = S2AApiAuthorizer(email, password, proxy) code = authorizer.get_authorization_code(auth_url) if not code: log.error("API 授权失败,无法获取授权码") return False, None log.debug(f"授权码: {code[:20]}...") # 3. 提交授权码创建账号 log.step("提交授权码到 S2A...") result = s2a_create_account_from_oauth(code, session_id, name=email) if result: log.success(f"S2A API 授权成功: {email}") return True, result else: log.error("S2A 账号入库失败") return False, None except Exception as e: log.error(f"S2A API 授权异常: {e}") return False, None def build_s2a_headers() -> Dict[str, str]: """构建 S2A API 请求的 Headers 优先使用 Admin API Key,如果未配置则使用 JWT Token """ headers = { "accept": "application/json", "content-type": "application/json", "user-agent": USER_AGENT } if S2A_ADMIN_KEY: headers["x-api-key"] = S2A_ADMIN_KEY elif S2A_ADMIN_TOKEN: headers["authorization"] = f"Bearer {S2A_ADMIN_TOKEN}" return headers def get_auth_method() -> Tuple[str, str]: """获取当前使用的认证方式 Returns: tuple: (method_name, credential_preview) """ if S2A_ADMIN_KEY: preview = S2A_ADMIN_KEY[:16] + "..." if len(S2A_ADMIN_KEY) > 16 else S2A_ADMIN_KEY return "Admin API Key", preview elif S2A_ADMIN_TOKEN: preview = S2A_ADMIN_TOKEN[:16] + "..." if len(S2A_ADMIN_TOKEN) > 16 else S2A_ADMIN_TOKEN return "JWT Token", preview return "None", "" # ==================== 分组管理 ==================== def s2a_get_groups() -> List[Dict[str, Any]]: """获取所有分组列表""" headers = build_s2a_headers() try: response = http_session.get( f"{S2A_API_BASE}/admin/groups", headers=headers, params={"page": 1, "page_size": 100}, timeout=REQUEST_TIMEOUT ) if response.status_code == 200: result = response.json() if result.get("code") == 0: data = result.get("data", {}) return data.get("items", []) except Exception as e: log.warning(f"S2A 获取分组列表异常: {e}") return [] def s2a_resolve_group_ids(silent: bool = False) -> List[int]: """解析分组 ID 列表 优先使用 S2A_GROUP_IDS (直接配置的 ID) 如果未配置,则通过 S2A_GROUP_NAMES 查询 API 获取对应的 ID Args: silent: 是否静默模式 (不输出日志) """ global _resolved_group_ids # 使用缓存 if _resolved_group_ids is not None: return _resolved_group_ids # 优先使用直接配置的 group_ids if S2A_GROUP_IDS: _resolved_group_ids = S2A_GROUP_IDS return _resolved_group_ids # 通过 group_names 查询获取 ID if not S2A_GROUP_NAMES: _resolved_group_ids = [] return _resolved_group_ids groups = s2a_get_groups() if not groups: if not silent: log.warning("S2A 无法获取分组列表,group_names 解析失败") _resolved_group_ids = [] return _resolved_group_ids # 构建 name -> id 映射 name_to_id = {g.get("name", "").lower(): g.get("id") for g in groups} resolved = [] not_found = [] for name in S2A_GROUP_NAMES: group_id = name_to_id.get(name.lower()) if group_id is not None: resolved.append(group_id) else: not_found.append(name) if not_found and not silent: log.warning(f"S2A 分组未找到: {', '.join(not_found)}") _resolved_group_ids = resolved return _resolved_group_ids def get_s2a_group_ids() -> List[int]: """获取当前配置的分组 ID 列表 (供外部调用)""" return s2a_resolve_group_ids() # ==================== 连接验证 ==================== def s2a_verify_connection() -> Tuple[bool, str]: """验证 S2A 服务连接和认证有效性 Returns: tuple: (is_valid, message) """ if not S2A_API_BASE: return False, "S2A_API_BASE 未配置" if not S2A_ADMIN_KEY and not S2A_ADMIN_TOKEN: return False, "S2A_ADMIN_KEY 或 S2A_ADMIN_TOKEN 未配置" auth_method, auth_preview = get_auth_method() headers = build_s2a_headers() try: # 使用 /admin/groups 接口验证连接 (支持 x-api-key 认证) response = http_session.get( f"{S2A_API_BASE}/admin/groups", headers=headers, params={"page": 1, "page_size": 1}, timeout=REQUEST_TIMEOUT ) if response.status_code == 200: result = response.json() if result.get("code") == 0: # 解析分组配置 group_ids = s2a_resolve_group_ids(silent=True) group_info = "" if S2A_GROUP_NAMES: group_info = f", 分组: {S2A_GROUP_NAMES} -> {group_ids}" elif S2A_GROUP_IDS: group_info = f", 分组 ID: {group_ids}" return True, f"认证有效 (方式: {auth_method}{group_info})" else: return False, f"API 返回失败: {result.get('message', 'Unknown error')}" elif response.status_code == 401: return False, f"{auth_method} 无效或已过期 (HTTP 401)" elif response.status_code == 403: return False, f"{auth_method} 权限不足 (HTTP 403)" else: return False, f"服务异常 (HTTP {response.status_code})" except requests.exceptions.Timeout: return False, f"服务连接超时 ({S2A_API_BASE})" except requests.exceptions.ConnectionError: return False, f"无法连接到服务 ({S2A_API_BASE})" except Exception as e: return False, f"验证异常: {str(e)}" # ==================== OAuth 授权 ==================== def s2a_generate_auth_url(proxy_id: Optional[int] = None) -> Tuple[Optional[str], Optional[str]]: """生成 OpenAI OAuth 授权 URL Returns: tuple: (auth_url, session_id) 或 (None, None) """ headers = build_s2a_headers() payload = {} if proxy_id is not None: payload["proxy_id"] = proxy_id try: response = http_session.post( f"{S2A_API_BASE}/admin/openai/generate-auth-url", headers=headers, json=payload, timeout=REQUEST_TIMEOUT ) if response.status_code == 200: result = response.json() if result.get("code") == 0: data = result.get("data", {}) auth_url = data.get("auth_url") session_id = data.get("session_id") if auth_url and session_id: log.success(f"生成 S2A 授权链接成功 (会话: {session_id[:16]}...)") return auth_url, session_id log.error(f"生成 S2A 授权链接失败: HTTP {response.status_code}") return None, None except Exception as e: log.error(f"S2A API 异常: {e}") return None, None def s2a_verify_account_in_pool(email: str, timeout: int = 10) -> Tuple[bool, Optional[Dict[str, Any]]]: """验证账号是否已成功入库到 S2A 账号池 通过请求 /admin/accounts 接口,检查第一个账号的 name 是否匹配邮箱 Args: email: 要验证的邮箱地址 timeout: 超时时间 (秒) Returns: tuple: (是否成功, 账号数据或None) """ headers = build_s2a_headers() try: # 使用 search 参数搜索该邮箱 params = { "page": 1, "page_size": 20, "platform": "", "type": "", "status": "", "search": email, "timezone": "Asia/Shanghai" } response = http_session.get( f"{S2A_API_BASE}/admin/accounts", headers=headers, params=params, timeout=timeout ) if response.status_code == 200: result = response.json() if result.get("code") == 0: data = result.get("data", {}) items = data.get("items", []) if items: # 检查第一个账号的 name 是否匹配 first_account = items[0] account_name = first_account.get("name", "") # 邮箱匹配检查 (忽略大小写) if email.lower() in account_name.lower() or account_name.lower() in email.lower(): return True, first_account return False, None else: log.warning(f"S2A 验证账号失败: {result.get('message', '未知错误')}") else: log.warning(f"S2A 验证账号失败: HTTP {response.status_code}") return False, None except Exception as e: log.warning(f"S2A 验证账号异常: {e}") return False, None def s2a_create_account_from_oauth( code: str, session_id: str, name: str = "", proxy_id: Optional[int] = None ) -> Optional[Dict[str, Any]]: """一步完成:用授权码换取 token 并创建账号 Args: code: 授权码 session_id: 会话 ID name: 账号名称 (可选) proxy_id: 代理 ID (可选) Returns: dict: 账号数据 或 None """ headers = build_s2a_headers() payload = { "session_id": session_id, "code": code, "concurrency": S2A_CONCURRENCY, "priority": S2A_PRIORITY, } # 获取完整邮箱用于后续验证 full_email = name if "@" in name else "" if name: payload["name"] = f"team-{name}" if proxy_id is not None: payload["proxy_id"] = proxy_id group_ids = get_s2a_group_ids() if group_ids: payload["group_ids"] = group_ids try: log.step("正在提交授权码到 S2A...") response = http_session.post( f"{S2A_API_BASE}/admin/openai/create-from-oauth", headers=headers, json=payload, timeout=REQUEST_TIMEOUT ) if response.status_code == 200: result = response.json() if result.get("code") == 0: account_data = result.get("data", {}) account_id = account_data.get("id") account_name = account_data.get("name") log.success(f"S2A 授权成功 (ID: {account_id}, 名称: {account_name})") # 验证账号是否成功入库 if full_email or account_name: verify_email = full_email or account_name log.step(f"正在验证账号入库状态...") verified, verified_data = s2a_verify_account_in_pool(verify_email) if verified: verified_id = verified_data.get("id", "") verified_name = verified_data.get("name", "") log.success(f"✅ 账号入库验证成功 (ID: {verified_id}, 名称: {verified_name})") else: log.warning(f"⚠️ 账号入库验证失败,但授权已成功") return account_data else: error_msg = result.get('message', '未知错误') log.error(f"S2A 账号创建失败: {error_msg}") else: log.error(f"S2A 账号创建失败: HTTP {response.status_code}") return None except Exception as e: log.error(f"S2A 创建账号异常: {e}") return None def s2a_add_account( name: str, token_info: Dict[str, Any], proxy_id: Optional[int] = None ) -> Optional[Dict[str, Any]]: """将账号添加到 S2A 账号池 Args: name: 账号名称 (通常是邮箱) token_info: Token 信息 (包含 access_token, refresh_token, expires_at) proxy_id: 代理 ID (可选) Returns: dict: 账号数据 或 None """ headers = build_s2a_headers() credentials = { "access_token": token_info.get("access_token"), "refresh_token": token_info.get("refresh_token"), "expires_at": token_info.get("expires_at"), } if token_info.get("id_token"): credentials["id_token"] = token_info.get("id_token") if token_info.get("email"): credentials["email"] = token_info.get("email") payload = { "name": f"team-{name}", "platform": "openai", "type": "oauth", "credentials": credentials, "concurrency": S2A_CONCURRENCY, "priority": S2A_PRIORITY, "auto_pause_on_expired": True, } if proxy_id is not None: payload["proxy_id"] = proxy_id group_ids = get_s2a_group_ids() if group_ids: payload["group_ids"] = group_ids try: response = http_session.post( f"{S2A_API_BASE}/admin/accounts", headers=headers, json=payload, timeout=REQUEST_TIMEOUT ) if response.status_code == 200: result = response.json() if result.get("code") == 0: account_data = result.get("data", {}) account_id = account_data.get("id") log.success(f"S2A 账号添加成功 (ID: {account_id}, Name: {name})") return account_data else: log.error(f"S2A 添加账号失败: {result.get('message', 'Unknown error')}") else: log.error(f"S2A 添加账号失败: HTTP {response.status_code}") return None except Exception as e: log.error(f"S2A 添加账号异常: {e}") return None # ==================== 账号管理 ==================== def s2a_get_accounts(platform: str = "openai") -> List[Dict[str, Any]]: """获取账号列表""" headers = build_s2a_headers() try: params = {"platform": platform} if platform else {} response = http_session.get( f"{S2A_API_BASE}/admin/accounts", headers=headers, params=params, timeout=REQUEST_TIMEOUT ) if response.status_code == 200: result = response.json() if result.get("code") == 0: data = result.get("data", {}) if isinstance(data, dict) and "items" in data: return data.get("items", []) elif isinstance(data, list): return data return [] except Exception as e: log.warning(f"S2A 获取账号列表异常: {e}") return [] def s2a_get_error_accounts( platform: str = "", page_size: int = 100, timezone: str = "Asia/Shanghai" ) -> Tuple[List[Dict[str, Any]], int]: """获取所有错误状态的账号(支持分页获取全部) Args: platform: 平台筛选 (默认为空,获取所有平台) page_size: 每页数量 timezone: 时区 Returns: tuple: (账号列表, 总数) """ if not S2A_API_BASE or (not S2A_ADMIN_KEY and not S2A_ADMIN_TOKEN): return [], 0 headers = build_s2a_headers() all_accounts = [] total_count = 0 page = 1 try: while True: params = { "page": page, "page_size": page_size, "platform": platform, "type": "", "status": "error", "search": "", "timezone": timezone } response = http_session.get( f"{S2A_API_BASE}/admin/accounts", headers=headers, params=params, timeout=REQUEST_TIMEOUT ) if response.status_code != 200: log.warning(f"S2A 获取错误账号失败: HTTP {response.status_code}") break result = response.json() if result.get("code") != 0: log.warning(f"S2A 获取错误账号失败: {result.get('message', 'Unknown error')}") break data = result.get("data", {}) items = data.get("items", []) total_count = data.get("total", 0) total_pages = data.get("pages", 1) all_accounts.extend(items) # 如果已获取所有页面,退出循环 if page >= total_pages or not items: break page += 1 return all_accounts, total_count except Exception as e: log.warning(f"S2A 获取错误账号异常: {e}") return [], 0 def s2a_delete_account(account_id: int) -> Tuple[bool, str]: """删除单个账号 Args: account_id: 账号 ID Returns: tuple: (是否成功, 消息) """ if not S2A_API_BASE or (not S2A_ADMIN_KEY and not S2A_ADMIN_TOKEN): return False, "S2A not configured" headers = build_s2a_headers() try: response = http_session.delete( f"{S2A_API_BASE}/admin/accounts/{account_id}", headers=headers, timeout=REQUEST_TIMEOUT ) if response.status_code == 200: result = response.json() if result.get("code") == 0: return True, "Deleted" else: return False, result.get("message", "Unknown error") else: return False, f"HTTP {response.status_code}" except Exception as e: return False, str(e) def s2a_batch_delete_error_accounts( progress_callback: Optional[callable] = None ) -> Dict[str, Any]: """批量删除所有错误状态的账号 Args: progress_callback: 进度回调函数 (current, total, account_name, success) Returns: dict: {"success": int, "failed": int, "total": int, "details": [...]} """ results = { "success": 0, "failed": 0, "total": 0, "details": [] } # 获取所有错误账号 error_accounts, total = s2a_get_error_accounts() results["total"] = total if not error_accounts: return results log.info(f"开始批量删除 {len(error_accounts)} 个错误账号...") for i, account in enumerate(error_accounts): account_id = account.get("id") account_name = account.get("name", "") error_message = account.get("error_message", "") if not account_id: results["failed"] += 1 results["details"].append({ "name": account_name, "status": "failed", "message": "Missing account ID" }) continue success, message = s2a_delete_account(account_id) if success: results["success"] += 1 results["details"].append({ "id": account_id, "name": account_name, "error": error_message[:50] if error_message else "", "status": "deleted" }) else: results["failed"] += 1 results["details"].append({ "id": account_id, "name": account_name, "status": "failed", "message": message }) if progress_callback: progress_callback(i + 1, len(error_accounts), account_name, success) log.success(f"批量删除完成: 成功 {results['success']}, 失败 {results['failed']}") return results def s2a_check_account_exists(email: str, platform: str = "openai") -> bool: """检查账号是否已存在""" accounts = s2a_get_accounts(platform) for account in accounts: account_name = account.get("name", "").lower() credentials = account.get("credentials", {}) account_email = credentials.get("email", "").lower() if account_name == email.lower() or account_email == email.lower(): return True return False # ==================== 工具函数 ==================== def extract_code_from_url(url: str) -> Optional[str]: """从回调 URL 中提取授权码""" if not url: return None try: parsed = urlparse(url) params = parse_qs(parsed.query) return params.get("code", [None])[0] except Exception as e: log.error(f"解析 URL 失败: {e}") return None def is_s2a_callback_url(url: str) -> bool: """检查 URL 是否为 S2A 回调 URL""" if not url: return False return "localhost:1455/auth/callback" in url and "code=" in url # ==================== 仪表盘统计 ==================== def s2a_get_dashboard_stats(timezone: str = "Asia/Shanghai") -> Optional[Dict[str, Any]]: """获取 S2A 仪表盘统计数据 Args: timezone: 时区 (默认 Asia/Shanghai) Returns: dict: 仪表盘数据 或 None """ if not S2A_API_BASE or (not S2A_ADMIN_KEY and not S2A_ADMIN_TOKEN): return None headers = build_s2a_headers() try: response = http_session.get( f"{S2A_API_BASE}/admin/dashboard/stats", headers=headers, params={"timezone": timezone}, timeout=REQUEST_TIMEOUT ) if response.status_code == 200: result = response.json() if result.get("code") == 0: return result.get("data", {}) else: log.warning(f"S2A 仪表盘获取失败: {result.get('message', 'Unknown error')}") else: log.warning(f"S2A 仪表盘获取失败: HTTP {response.status_code}") except Exception as e: log.warning(f"S2A 仪表盘获取异常: {e}") return None def format_dashboard_stats(stats: Dict[str, Any]) -> str: """格式化仪表盘统计数据为可读文本 Args: stats: 仪表盘原始数据 Returns: str: 格式化后的文本 """ if not stats: return "暂无数据" def fmt_num(n): """格式化数字 (添加千分位)""" if isinstance(n, float): return f"{n:,.2f}" return f"{n:,}" def fmt_tokens(n): """格式化 Token 数量 (简化显示)""" if n >= 1_000_000_000: return f"{n / 1_000_000_000:.2f}B" elif n >= 1_000_000: return f"{n / 1_000_000:.2f}M" elif n >= 1_000: return f"{n / 1_000:.1f}K" return str(n) # 账号状态 total_accounts = stats.get("total_accounts", 0) normal_accounts = stats.get("normal_accounts", 0) error_accounts = stats.get("error_accounts", 0) ratelimit_accounts = stats.get("ratelimit_accounts", 0) overload_accounts = stats.get("overload_accounts", 0) # 今日统计 today_requests = stats.get("today_requests", 0) today_tokens = stats.get("today_tokens", 0) today_cost = stats.get("today_cost", 0) today_input = stats.get("today_input_tokens", 0) today_output = stats.get("today_output_tokens", 0) today_cache_read = stats.get("today_cache_read_tokens", 0) # 总计 total_requests = stats.get("total_requests", 0) total_tokens = stats.get("total_tokens", 0) total_cost = stats.get("total_cost", 0) # 实时状态 rpm = stats.get("rpm", 0) tpm = stats.get("tpm", 0) active_users = stats.get("active_users", 0) avg_duration = stats.get("average_duration_ms", 0) lines = [ "📊 S2A 仪表盘", "", "📦 账号状态", f" 总计: {total_accounts} | 正常: {normal_accounts}", f" 异常: {error_accounts} | 限流: {ratelimit_accounts}", "", "📅 今日统计", f" 请求数: {fmt_num(today_requests)}", f" Token: {fmt_tokens(today_tokens)}", f" 输入: {fmt_tokens(today_input)} | 输出: {fmt_tokens(today_output)}", f" 缓存: {fmt_tokens(today_cache_read)}", f" 费用: ${fmt_num(today_cost)}", "", "📈 累计统计", f" 请求数: {fmt_num(total_requests)}", f" Token: {fmt_tokens(total_tokens)}", f" 费用: ${fmt_num(total_cost)}", "", "⚡ 实时状态", f" RPM: {rpm} | TPM: {fmt_num(tpm)}", f" 活跃用户: {active_users}", f" 平均延迟: {avg_duration:.0f}ms", ] return "\n".join(lines) # ==================== 批量导入账号 ==================== def s2a_import_account_with_token( email: str, access_token: str, password: str = "", proxy_id: Optional[int] = None ) -> Tuple[bool, str]: """使用 access_token 直接导入账号到 S2A Args: email: 账号邮箱 access_token: OpenAI access token (JWT) password: 账号密码 (可选,用于备注) proxy_id: 代理 ID (可选) Returns: tuple: (success, message) """ if not S2A_API_BASE or (not S2A_ADMIN_KEY and not S2A_ADMIN_TOKEN): return False, "S2A not configured" headers = build_s2a_headers() # 构建账号数据 credentials = { "access_token": access_token, } payload = { "name": email, "platform": "openai", "type": "access_token", "credentials": credentials, "concurrency": S2A_CONCURRENCY, "priority": S2A_PRIORITY, "auto_pause_on_expired": True, } if proxy_id is not None: payload["proxy_id"] = proxy_id group_ids = get_s2a_group_ids() if group_ids: payload["group_ids"] = group_ids try: response = http_session.post( f"{S2A_API_BASE}/admin/accounts", headers=headers, json=payload, timeout=REQUEST_TIMEOUT ) if response.status_code == 200: result = response.json() if result.get("code") == 0: account_data = result.get("data", {}) account_id = account_data.get("id") return True, f"ID: {account_id}" else: error_msg = result.get("message", "Unknown error") # 检查是否已存在 if "exist" in error_msg.lower() or "duplicate" in error_msg.lower(): return False, "Already exists" return False, error_msg else: return False, f"HTTP {response.status_code}" except Exception as e: return False, str(e) def s2a_batch_import_accounts( accounts: List[Dict[str, str]], progress_callback: Optional[callable] = None ) -> Dict[str, Any]: """批量导入账号到 S2A Args: accounts: 账号列表 [{"account": "email", "password": "pwd", "token": "jwt"}] progress_callback: 进度回调函数 (current, total, email, status) Returns: dict: {"success": int, "failed": int, "skipped": int, "results": [...]} """ results = { "success": 0, "failed": 0, "skipped": 0, "details": [] } total = len(accounts) for i, acc in enumerate(accounts): email = acc.get("account", "") token = acc.get("token", "") password = acc.get("password", "") if not email or not token: results["skipped"] += 1 results["details"].append({ "email": email or "unknown", "status": "skipped", "message": "Missing email or token" }) continue # 检查是否已存在 if s2a_check_account_exists(email): results["skipped"] += 1 results["details"].append({ "email": email, "status": "skipped", "message": "Already exists" }) if progress_callback: progress_callback(i + 1, total, email, "skipped") continue # 导入账号 success, message = s2a_import_account_with_token(email, token, password) if success: results["success"] += 1 results["details"].append({ "email": email, "status": "success", "message": message }) else: if "exist" in message.lower(): results["skipped"] += 1 results["details"].append({ "email": email, "status": "skipped", "message": message }) else: results["failed"] += 1 results["details"].append({ "email": email, "status": "failed", "message": message }) if progress_callback: status = "success" if success else ("skipped" if "exist" in message.lower() else "failed") progress_callback(i + 1, total, email, status) return results # ==================== API 密钥用量查询 ==================== def s2a_get_all_usage_stats( start_date: str, end_date: str, timezone: str = "Asia/Shanghai" ) -> Optional[Dict[str, Any]]: """获取所有密钥的汇总用量统计(不传 api_key_id) Returns: dict: 汇总统计数据 或 None """ if not S2A_API_BASE or (not S2A_ADMIN_KEY and not S2A_ADMIN_TOKEN): return None headers = build_s2a_headers() try: response = http_session.get( f"{S2A_API_BASE}/admin/usage/stats", headers=headers, params={ "start_date": start_date, "end_date": end_date, "timezone": timezone }, timeout=REQUEST_TIMEOUT ) if response.status_code == 200: result = response.json() if result.get("code") == 0: return result.get("data", {}) else: log.warning(f"S2A 获取汇总用量失败: HTTP {response.status_code}") except Exception as e: log.warning(f"S2A 获取汇总用量异常: {e}") return None def s2a_get_key_usage_stats( api_key_id: int, start_date: str, end_date: str, timezone: str = "Asia/Shanghai" ) -> Optional[Dict[str, Any]]: """获取单个 API 密钥的详细用量统计 Args: api_key_id: 密钥 ID start_date: 开始日期 (YYYY-MM-DD) end_date: 结束日期 (YYYY-MM-DD) timezone: 时区 Returns: dict: 用量统计数据 或 None """ if not S2A_API_BASE or (not S2A_ADMIN_KEY and not S2A_ADMIN_TOKEN): return None headers = build_s2a_headers() try: response = http_session.get( f"{S2A_API_BASE}/admin/usage/stats", headers=headers, params={ "api_key_id": api_key_id, "start_date": start_date, "end_date": end_date, "timezone": timezone }, timeout=REQUEST_TIMEOUT ) if response.status_code == 200: result = response.json() if result.get("code") == 0: return result.get("data", {}) else: log.warning(f"S2A 获取密钥用量失败: {result.get('message', 'Unknown error')}") else: log.warning(f"S2A 获取密钥用量失败: HTTP {response.status_code}") except Exception as e: log.warning(f"S2A 获取密钥用量异常: {e}") return None # 密钥列表配置 (从 /api/v1/keys 接口获取的数据) API_KEYS_LIST = [ {"id": 11, "name": "bohe", "key": "bohebohebohebohe", "group_name": "codex"}, {"id": 10, "name": "黑与白", "key": "oyqq114514oyqq114514", "group_name": "codex"}, {"id": 9, "name": "baozi", "key": "baozibaozibaozibaozi", "group_name": "codex"}, {"id": 7, "name": "Wong-灭鼠专家", "key": "Wong1234567891011", "group_name": "codex"}, {"id": 6, "name": "小马-莹佬", "key": "mazhichentaiqiangla", "group_name": "codex"}, {"id": 5, "name": "猫猫-byteBender", "key": "ByteBender114514", "group_name": "codex"}, {"id": 4, "name": "KFC", "key": "sk-babf3f6d15582b31e959311d6ceaa9448f2cf951dc23d02386d41f68046e6018", "group_name": "codex"}, {"id": 3, "name": "zhongruan", "key": "zhongruantaiqiangla", "group_name": "codex"}, ] def s2a_get_keys_with_usage(start_date: str = None, end_date: str = None, timezone: str = "Asia/Shanghai") -> Optional[List[Dict[str, Any]]]: """获取密钥列表并合并用量数据 Args: start_date: 开始日期 (YYYY-MM-DD),默认今日 end_date: 结束日期 (YYYY-MM-DD),默认今日 timezone: 时区 Returns: list: 包含用量信息的密钥列表 或 None """ from datetime import datetime # 默认今日 if not start_date: start_date = datetime.now().strftime("%Y-%m-%d") if not end_date: end_date = datetime.now().strftime("%Y-%m-%d") keys = [] for key_info in API_KEYS_LIST: key_id = key_info["id"] usage = s2a_get_key_usage_stats(key_id, start_date, end_date, timezone) keys.append({ "id": key_id, "name": key_info["name"], "key": key_info["key"], "group_name": key_info["group_name"], "status": "active", "usage": usage if usage else {} }) return keys if keys else None def format_keys_usage(keys: List[Dict[str, Any]], period_text: str = "今日") -> str: """格式化密钥用量为可读文本 Args: keys: 密钥列表 (包含 usage 字段) period_text: 时间段描述 Returns: str: 格式化后的文本 """ if not keys: return "暂无密钥数据" def fmt_cost(n): """格式化费用""" if n >= 1000: return f"${n:,.2f}" elif n >= 1: return f"${n:.2f}" return f"${n:.4f}" def fmt_tokens(n): """格式化 Token 数量""" if n >= 1_000_000_000: return f"{n / 1_000_000_000:.2f}B" elif n >= 1_000_000: return f"{n / 1_000_000:.2f}M" elif n >= 1_000: return f"{n / 1_000:.1f}K" return str(int(n)) def fmt_duration(ms): """格式化耗时""" if ms >= 1000: return f"{ms / 1000:.2f}s" return f"{ms:.0f}ms" # 按请求数排序(降序) sorted_keys = sorted(keys, key=lambda k: k.get("usage", {}).get("total_requests", 0), reverse=True) lines = [f"◈ API 密钥用量 ({period_text})", ""] total_requests = 0 total_tokens = 0 total_cost = 0 for i, key in enumerate(sorted_keys, 1): name = key.get("name", "未命名") status = key.get("status", "active") group_name = key.get("group_name", "默认") usage = key.get("usage", {}) requests = usage.get("total_requests", 0) tokens = usage.get("total_tokens", 0) input_tokens = usage.get("total_input_tokens", 0) output_tokens = usage.get("total_output_tokens", 0) cache_tokens = usage.get("total_cache_tokens", 0) cost = usage.get("total_actual_cost", 0) avg_duration = usage.get("average_duration_ms", 0) total_requests += requests total_tokens += tokens total_cost += cost # 排名序号 rank = f"#{i}" lines.append(f"{rank} {name}") lines.append(f" ▸ {requests:,} 请求 · {fmt_duration(avg_duration)}") lines.append(f" ▸ {fmt_tokens(tokens)} (↓{fmt_tokens(input_tokens)} ↑{fmt_tokens(output_tokens)})") if cache_tokens > 0: lines.append(f" ▸ 缓存 {fmt_tokens(cache_tokens)}") lines.append(f" ▸ {fmt_cost(cost)}") lines.append("") # 汇总 lines.append("────────────────────") lines.append(f"◇ {period_text}汇总") lines.append(f" 密钥: {len(keys)} 个") lines.append(f" 请求: {total_requests:,}") lines.append(f" Token: {fmt_tokens(total_tokens)}") lines.append(f" 费用: {fmt_cost(total_cost)}") return "\n".join(lines) # ==================== 批量 API 授权 ==================== def s2a_batch_api_authorize( accounts: List[Dict[str, str]], proxy: str = None, progress_callback: Optional[callable] = None ) -> Dict[str, Any]: """批量使用 API 模式授权账号到 S2A 无需浏览器,直接通过 OpenAI 认证 API 完成授权。 Args: accounts: 账号列表 [{"email": "xxx", "password": "xxx"}, ...] proxy: 代理地址 (可选) progress_callback: 进度回调函数 (current, total, email, status, message) Returns: dict: { "success": int, "failed": int, "total": int, "details": [{"email": "xxx", "status": "success/failed", "message": "xxx"}, ...] } """ results = { "success": 0, "failed": 0, "total": len(accounts), "details": [] } if not S2A_API_BASE or (not S2A_ADMIN_KEY and not S2A_ADMIN_TOKEN): log.error("S2A 未配置") return results # 使用配置的代理 if not proxy: proxy_config = get_next_proxy() if proxy_config: proxy = format_proxy_url(proxy_config) log.info(f"开始批量 API 授权: {len(accounts)} 个账号") for i, acc in enumerate(accounts): email = acc.get("email", "") password = acc.get("password", "") if not email or not password: results["failed"] += 1 results["details"].append({ "email": email or "unknown", "status": "failed", "message": "缺少邮箱或密码" }) if progress_callback: progress_callback(i + 1, len(accounts), email, "failed", "缺少邮箱或密码") continue try: success, result = s2a_api_authorize(email, password, proxy) if success: results["success"] += 1 account_id = result.get("id", "") if result else "" results["details"].append({ "email": email, "status": "success", "message": f"ID: {account_id}" }) if progress_callback: progress_callback(i + 1, len(accounts), email, "success", f"ID: {account_id}") else: results["failed"] += 1 results["details"].append({ "email": email, "status": "failed", "message": "授权失败" }) if progress_callback: progress_callback(i + 1, len(accounts), email, "failed", "授权失败") except Exception as e: results["failed"] += 1 results["details"].append({ "email": email, "status": "failed", "message": str(e) }) if progress_callback: progress_callback(i + 1, len(accounts), email, "failed", str(e)) log.success(f"批量授权完成: 成功 {results['success']}, 失败 {results['failed']}") return results def s2a_api_authorize_single( email: str, password: str, proxy: str = None ) -> Tuple[bool, str]: """单个账号 API 授权 (简化返回值) Args: email: 账号邮箱 password: 账号密码 proxy: 代理地址 (可选) Returns: tuple: (是否成功, 消息) """ success, result = s2a_api_authorize(email, password, proxy) if success: account_id = result.get("id", "") if result else "" return True, f"授权成功 (ID: {account_id})" else: return False, "授权失败"