From a7867ae4062086827d80ac63e1680826b393ce2f Mon Sep 17 00:00:00 2001 From: kyx236 Date: Mon, 2 Feb 2026 09:26:57 +0800 Subject: [PATCH] feat(s2a_service): Add pure API authorization mode without browser - Add S2A_API_MODE configuration option to enable browser-less authorization - Implement S2AApiAuthorizer class using curl_cffi for browser fingerprint simulation - Add Sentinel PoW (Proof of Work) solver with FNV-1a hashing algorithm - Implement OAuth flow via direct API calls instead of browser automation - Add s2a_api_authorize() function to handle email/password authentication - Support proxy configuration for API requests - Add requirements token generation for API authentication - Update browser_automation.py to check S2A_API_MODE and route to API or browser flow - Update config.py to load S2A_API_MODE from configuration - Add api_mode option to config.toml.example with documentation - Improves performance and stability by eliminating browser overhead while maintaining compatibility with existing browser-based flow --- browser_automation.py | 15 +- config.py | 4 +- config.toml.example | 5 + s2a_service.py | 476 ++++++++++++++++++++++++++++++++++++++++++ telegram_bot.py | 93 ++++++++- 5 files changed, 588 insertions(+), 5 deletions(-) diff --git a/browser_automation.py b/browser_automation.py index 8da40fb..9dae842 100644 --- a/browser_automation.py +++ b/browser_automation.py @@ -17,6 +17,7 @@ from config import ( BROWSER_HEADLESS, BROWSER_RANDOM_FINGERPRINT, AUTH_PROVIDER, + S2A_API_MODE, get_random_name, get_random_birthday, get_random_fingerprint @@ -31,7 +32,8 @@ from cpa_service import ( ) from s2a_service import ( s2a_generate_auth_url, - s2a_create_account_from_oauth + s2a_create_account_from_oauth, + s2a_api_authorize, ) from logger import log @@ -2939,14 +2941,21 @@ def perform_s2a_authorization(page, email: str, password: str) -> bool: """执行 S2A 授权流程 (密码登录) Args: - page: 浏览器实例 + page: 浏览器实例 (API 模式下可为 None) email: 邮箱地址 password: 密码 Returns: bool: 授权是否成功 """ - log.info(f"开始 S2A 授权: {email}", icon="code") + # 检查是否使用 API 模式 + if S2A_API_MODE: + log.info(f"使用 S2A API 模式授权: {email}", icon="code") + success, result = s2a_api_authorize(email, password) + return success + + # 以下是浏览器模式 + log.info(f"开始 S2A 浏览器授权: {email}", icon="code") progress_update(phase="授权", step="开始 S2A 授权...") # 注意: 授权回调锁已在上层函数 (register_and_authorize/authorize_only) 中获取 diff --git a/config.py b/config.py index 60c8ae2..2e2240e 100644 --- a/config.py +++ b/config.py @@ -312,7 +312,7 @@ def reload_config() -> dict: global GPTMAIL_API_KEYS, GPTMAIL_DOMAINS, GPTMAIL_PREFIX global PROXY_ENABLED, PROXIES global S2A_API_BASE, S2A_ADMIN_KEY, S2A_ADMIN_TOKEN - global S2A_CONCURRENCY, S2A_PRIORITY, S2A_GROUP_NAMES, S2A_GROUP_IDS + global S2A_CONCURRENCY, S2A_PRIORITY, S2A_GROUP_NAMES, S2A_GROUP_IDS, S2A_API_MODE global CONCURRENT_ENABLED, CONCURRENT_WORKERS result = { @@ -387,6 +387,7 @@ def reload_config() -> dict: S2A_PRIORITY = _s2a.get("priority", 50) S2A_GROUP_NAMES = _s2a.get("group_names", []) S2A_GROUP_IDS = _s2a.get("group_ids", []) + S2A_API_MODE = _s2a.get("api_mode", False) except Exception as e: errors.append(f"config.toml: {e}") @@ -619,6 +620,7 @@ S2A_CONCURRENCY = _s2a.get("concurrency", 10) S2A_PRIORITY = _s2a.get("priority", 50) S2A_GROUP_NAMES = _s2a.get("group_names", []) S2A_GROUP_IDS = _s2a.get("group_ids", []) +S2A_API_MODE = _s2a.get("api_mode", False) # 是否使用纯 API 授权模式 (无需浏览器) # 账号 _account = _cfg.get("account", {}) diff --git a/config.toml.example b/config.toml.example index b0b5974..e4c7ce0 100644 --- a/config.toml.example +++ b/config.toml.example @@ -154,6 +154,11 @@ priority = 50 group_ids = [] # 分组名称列表 (优先使用 group_ids,如果未配置则通过名称查询 ID) group_names = [] +# 是否使用纯 API 授权模式 (无需浏览器) +# 开启后使用 curl_cffi 直接调用 OpenAI 认证 API 完成授权 +# 优点: 更快、更稳定、无需浏览器 +# 缺点: 需要安装 curl_cffi (pip install curl_cffi) +api_mode = false # ==================== 账号配置 ==================== [account] diff --git a/s2a_service.py b/s2a_service.py index 8a5df71..fcda13c 100644 --- a/s2a_service.py +++ b/s2a_service.py @@ -6,12 +6,24 @@ # - 会话标识: S2A 使用 session_id # - 授权流程: S2A 生成授权 URL -> 用户授权 -> 提交 code 换取 token -> 创建账号 # - 账号入库: S2A 可一步完成 (create-from-oauth) 或分步完成 (exchange + add_account) +# +# 新增: 纯 API 授权模式 (无需浏览器) +# - 使用 curl_cffi 模拟浏览器指纹 +# - 支持 Sentinel PoW 验证 +# - 直接通过 API 完成 OAuth 流程 import requests from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry from urllib.parse import urlparse, parse_qs from typing import Optional, Tuple, Dict, List, Any +import json +import uuid +import time +import random +import base64 +import hashlib +from datetime import datetime, timedelta, timezone from config import ( S2A_API_BASE, @@ -23,6 +35,8 @@ from config import ( S2A_GROUP_NAMES, REQUEST_TIMEOUT, USER_AGENT, + get_next_proxy, + format_proxy_url, ) from logger import log @@ -49,6 +63,349 @@ def create_session_with_retry() -> requests.Session: http_session = create_session_with_retry() +# ==================== PoW Solver (Sentinel 验证) ==================== + +def _fnv1a_32(data: bytes) -> int: + """FNV-1a 32-bit hash""" + h = 2166136261 + for byte in data: + h ^= byte + h = (h * 16777619) & 0xFFFFFFFF + h ^= (h >> 16) + h = (h * 2246822507) & 0xFFFFFFFF + h ^= (h >> 13) + h = (h * 3266489909) & 0xFFFFFFFF + h ^= (h >> 16) + return h + + +def _get_parse_time() -> str: + """生成 JS Date().toString() 格式的时间戳""" + now = datetime.now(timezone(timedelta(hours=8))) + return now.strftime("%a %b %d %Y %H:%M:%S") + " GMT+0800 (中国标准时间)" + + +def _get_pow_config(user_agent: str, sid: str = None) -> list: + """生成 PoW 配置数组""" + if not sid: + sid = str(uuid.uuid4()) + return [ + random.randint(2500, 3500), + _get_parse_time(), + 4294967296, + 0, + user_agent, + "chrome-extension://pgojnojmmhpofjgdmaebadhbocahppod/assets/aW5qZWN0X2hhc2g/aW5qZ", + None, + "zh-CN", + "zh-CN", + 0, + f"canShare−function canShare() {{ [native code] }}", + f"_reactListening{random.randint(1000000, 9999999)}", + "onhashchange", + time.perf_counter() * 1000, + sid, + "", + 24, + int(time.time() * 1000 - random.randint(10000, 50000)) + ] + + +def _solve_pow(seed: str, difficulty: str, config: list, max_iterations: int = 5000000) -> Optional[str]: + """CPU 求解 PoW""" + start_time = time.perf_counter() + seed_bytes = seed.encode() + + for iteration in range(max_iterations): + config[3] = iteration + config[9] = 0 + + json_str = json.dumps(config, separators=(',', ':')) + encoded = base64.b64encode(json_str.encode()) + + h = _fnv1a_32(seed_bytes + encoded) + hex_hash = f"{h:08x}" + + if hex_hash[:len(difficulty)] <= difficulty: + elapsed = time.perf_counter() - start_time + log.debug(f"[PoW] 求解完成: {elapsed:.2f}s (迭代 {iteration:,}, 难度={difficulty})") + return f"{encoded.decode()}~S" + + return None + + +def _get_requirements_token(user_agent: str, sid: str = None) -> str: + """生成 requirements token""" + if not sid: + sid = str(uuid.uuid4()) + config = _get_pow_config(user_agent, sid) + config[3] = 0 + config[9] = 0 + json_str = json.dumps(config, separators=(',', ':')) + encoded = base64.b64encode(json_str.encode()).decode() + return f"gAAAAAC{encoded}~S" + + +# ==================== S2A API 授权器 ==================== + +class S2AApiAuthorizer: + """S2A 纯 API 授权器 - 无需浏览器""" + + def __init__(self, email: str, password: str, proxy: str = None): + self.email = email + self.password = password + + # 尝试导入 curl_cffi,如果失败则使用 requests + try: + from curl_cffi import requests as cffi_requests + self.session = cffi_requests.Session(impersonate="chrome110") + self._use_cffi = True + except ImportError: + log.warning("curl_cffi 未安装,使用 requests (可能被检测)") + self.session = requests.Session() + self._use_cffi = False + + if proxy: + self.session.proxies = {"http": proxy, "https": proxy} + + self.ua = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36" + self.sid = str(uuid.uuid4()) + self.device_id = str(uuid.uuid4()) + self.sentinel_token = None + self.solved_pow = None + + self.session.headers.update({ + "User-Agent": self.ua, + "Accept": "*/*", + "Accept-Language": "en-US,en;q=0.9", + "sec-ch-ua": '"Not(A:Brand";v="8", "Chromium";v="144", "Microsoft Edge";v="144"', + "sec-ch-ua-mobile": "?0", + "sec-ch-ua-platform": '"Windows"', + "sec-fetch-dest": "empty", + "sec-fetch-mode": "cors", + "sec-fetch-site": "same-origin", + }) + + def _call_sentinel_req(self, flow: str) -> Optional[dict]: + """调用 sentinel 获取 token 和处理 PoW""" + init_token = _get_requirements_token(self.ua, self.sid) + payload = {"p": init_token, "id": self.device_id, "flow": flow} + + try: + resp = self.session.post( + "https://sentinel.openai.com/backend-api/sentinel/req", + json=payload, + timeout=15 + ) + if resp.status_code != 200: + log.warning(f"Sentinel 请求失败: {resp.status_code}") + return None + + data = resp.json() + self.sentinel_token = data.get('token') + pow_req = data.get('proofofwork', {}) + + if pow_req.get('required'): + seed = pow_req.get('seed', '') + difficulty = pow_req.get('difficulty', '') + config = _get_pow_config(self.ua, self.sid) + solved = _solve_pow(seed, difficulty, config) + if solved: + self.solved_pow = f"gAAAAAB{solved}" + else: + log.error("PoW 求解失败") + return None + else: + self.solved_pow = init_token + return data + except Exception as e: + log.error(f"Sentinel 异常: {e}") + return None + + def _get_sentinel_header(self, header_flow: str) -> str: + """构建 sentinel header""" + sentinel_obj = {"p": self.solved_pow, "id": self.device_id, "flow": header_flow} + if self.sentinel_token: + sentinel_obj["c"] = self.sentinel_token + return json.dumps(sentinel_obj) + + def get_authorization_code(self, auth_url: str) -> Optional[str]: + """执行 OAuth 流程,返回 authorization code + + Args: + auth_url: S2A 生成的授权 URL + + Returns: + str: 授权码 或 None + """ + log.step("开始 API 授权流程...") + + headers = { + "Origin": "https://auth.openai.com", + "Referer": "https://auth.openai.com/log-in", + "Content-Type": "application/json" + } + + try: + # 1. 访问授权端点 + log.step("访问授权端点...") + resp = self.session.get(auth_url, allow_redirects=True) + headers["Referer"] = resp.url + + # 2. 提交邮箱 + log.step("提交邮箱...") + if not self._call_sentinel_req("login_web_init"): + return None + + auth_headers = headers.copy() + auth_headers["OpenAI-Sentinel-Token"] = self._get_sentinel_header("authorize_continue") + resp = self.session.post( + "https://auth.openai.com/api/accounts/authorize/continue", + json={"username": {"kind": "email", "value": self.email}}, + headers=auth_headers + ) + if resp.status_code != 200: + log.error(f"邮箱提交失败: {resp.status_code} - {resp.text[:200]}") + return None + + data = resp.json() + page_type = data.get("page", {}).get("type", "") + + # 3. 验证密码 + if page_type == "password" or "password" in str(data): + log.step("验证密码...") + if not self._call_sentinel_req("authorize_continue__auto"): + return None + + verify_headers = headers.copy() + verify_headers["OpenAI-Sentinel-Token"] = self._get_sentinel_header("password_verify") + resp = self.session.post( + "https://auth.openai.com/api/accounts/password/verify", + json={"username": self.email, "password": self.password}, + headers=verify_headers + ) + if resp.status_code != 200: + log.error(f"密码验证失败: {resp.status_code} - {resp.text[:200]}") + return None + + # 4. 获取 continue_url (无需选择 workspace,S2A 授权链接已包含) + data = resp.json() + continue_url = data.get("continue_url") + + # 如果没有 continue_url,可能需要额外的 sentinel 调用 + if not continue_url: + log.step("获取重定向 URL...") + if not self._call_sentinel_req("password_verify__auto"): + return None + + # 尝试再次获取 + resp = self.session.post( + "https://auth.openai.com/api/accounts/authorize/continue", + json={}, + headers=auth_headers + ) + if resp.status_code == 200: + data = resp.json() + continue_url = data.get("continue_url") + + if not continue_url: + log.error(f"无法获取 continue_url: {data}") + return None + + # 5. 跟踪重定向直到获取 code + log.step("跟踪重定向...") + for _ in range(10): + resp = self.session.get(continue_url, allow_redirects=False) + if resp.status_code in (301, 302, 303, 307, 308): + location = resp.headers.get('Location', '') + if "localhost:1455" in location: + parsed = urlparse(location) + query = parse_qs(parsed.query) + code = query.get('code', [None])[0] + if code: + log.success("成功获取授权码") + return code + continue_url = location + else: + break + + log.error("无法获取授权码") + return None + + except Exception as e: + log.error(f"API 授权异常: {e}") + import traceback + traceback.print_exc() + return None + + +def s2a_api_authorize( + email: str, + password: str, + proxy: str = None +) -> Tuple[bool, Optional[Dict[str, Any]]]: + """S2A 纯 API 授权 (无需浏览器) + + 使用 OpenAI 认证 API 直接完成授权流程,无需浏览器自动化。 + + Args: + email: 账号邮箱 + password: 账号密码 + proxy: 代理地址 (可选,格式: http://host:port 或 socks5://user:pass@host:port) + + Returns: + tuple: (是否成功, 账号数据或None) + """ + if not S2A_API_BASE or (not S2A_ADMIN_KEY and not S2A_ADMIN_TOKEN): + log.error("S2A 未配置") + return False, None + + # 使用配置的代理 + if not proxy: + proxy_config = get_next_proxy() + if proxy_config: + proxy = format_proxy_url(proxy_config) + + log.info(f"开始 S2A API 授权: {email}", icon="code") + if proxy: + log.debug(f"使用代理: {proxy[:30]}...") + + try: + # 1. 生成授权 URL + auth_url, session_id = s2a_generate_auth_url() + if not auth_url or not session_id: + log.error("无法获取 S2A 授权 URL") + return False, None + + log.debug(f"授权 URL: {auth_url[:80]}...") + log.debug(f"Session ID: {session_id[:16]}...") + + # 2. 使用 API 授权器获取 code + authorizer = S2AApiAuthorizer(email, password, proxy) + code = authorizer.get_authorization_code(auth_url) + + if not code: + log.error("API 授权失败,无法获取授权码") + return False, None + + log.debug(f"授权码: {code[:20]}...") + + # 3. 提交授权码创建账号 + log.step("提交授权码到 S2A...") + result = s2a_create_account_from_oauth(code, session_id, name=email) + + if result: + log.success(f"S2A API 授权成功: {email}") + return True, result + else: + log.error("S2A 账号入库失败") + return False, None + + except Exception as e: + log.error(f"S2A API 授权异常: {e}") + return False, None + + def build_s2a_headers() -> Dict[str, str]: """构建 S2A API 请求的 Headers @@ -1211,3 +1568,122 @@ def format_keys_usage(keys: List[Dict[str, Any]], period_text: str = "今日") - lines.append(f" 费用: {fmt_cost(total_cost)}") return "\n".join(lines) + + +# ==================== 批量 API 授权 ==================== + +def s2a_batch_api_authorize( + accounts: List[Dict[str, str]], + proxy: str = None, + progress_callback: Optional[callable] = None +) -> Dict[str, Any]: + """批量使用 API 模式授权账号到 S2A + + 无需浏览器,直接通过 OpenAI 认证 API 完成授权。 + + Args: + accounts: 账号列表 [{"email": "xxx", "password": "xxx"}, ...] + proxy: 代理地址 (可选) + progress_callback: 进度回调函数 (current, total, email, status, message) + + Returns: + dict: { + "success": int, + "failed": int, + "total": int, + "details": [{"email": "xxx", "status": "success/failed", "message": "xxx"}, ...] + } + """ + results = { + "success": 0, + "failed": 0, + "total": len(accounts), + "details": [] + } + + if not S2A_API_BASE or (not S2A_ADMIN_KEY and not S2A_ADMIN_TOKEN): + log.error("S2A 未配置") + return results + + # 使用配置的代理 + if not proxy: + proxy_config = get_next_proxy() + if proxy_config: + proxy = format_proxy_url(proxy_config) + + log.info(f"开始批量 API 授权: {len(accounts)} 个账号") + + for i, acc in enumerate(accounts): + email = acc.get("email", "") + password = acc.get("password", "") + + if not email or not password: + results["failed"] += 1 + results["details"].append({ + "email": email or "unknown", + "status": "failed", + "message": "缺少邮箱或密码" + }) + if progress_callback: + progress_callback(i + 1, len(accounts), email, "failed", "缺少邮箱或密码") + continue + + try: + success, result = s2a_api_authorize(email, password, proxy) + + if success: + results["success"] += 1 + account_id = result.get("id", "") if result else "" + results["details"].append({ + "email": email, + "status": "success", + "message": f"ID: {account_id}" + }) + if progress_callback: + progress_callback(i + 1, len(accounts), email, "success", f"ID: {account_id}") + else: + results["failed"] += 1 + results["details"].append({ + "email": email, + "status": "failed", + "message": "授权失败" + }) + if progress_callback: + progress_callback(i + 1, len(accounts), email, "failed", "授权失败") + + except Exception as e: + results["failed"] += 1 + results["details"].append({ + "email": email, + "status": "failed", + "message": str(e) + }) + if progress_callback: + progress_callback(i + 1, len(accounts), email, "failed", str(e)) + + log.success(f"批量授权完成: 成功 {results['success']}, 失败 {results['failed']}") + return results + + +def s2a_api_authorize_single( + email: str, + password: str, + proxy: str = None +) -> Tuple[bool, str]: + """单个账号 API 授权 (简化返回值) + + Args: + email: 账号邮箱 + password: 账号密码 + proxy: 代理地址 (可选) + + Returns: + tuple: (是否成功, 消息) + """ + success, result = s2a_api_authorize(email, password, proxy) + + if success: + account_id = result.get("id", "") if result else "" + return True, f"授权成功 (ID: {account_id})" + else: + return False, "授权失败" diff --git a/telegram_bot.py b/telegram_bot.py index 7bac25a..77a5b5a 100644 --- a/telegram_bot.py +++ b/telegram_bot.py @@ -48,6 +48,7 @@ from config import ( S2A_GROUP_NAMES, S2A_GROUP_IDS, S2A_ADMIN_KEY, + S2A_API_MODE, BROWSER_RANDOM_FINGERPRINT, batch_remove_teams_by_names, ) @@ -55,7 +56,8 @@ from utils import load_team_tracker, get_all_incomplete_accounts, save_team_trac from bot_notifier import BotNotifier, set_notifier, progress_finish from s2a_service import ( s2a_get_dashboard_stats, format_dashboard_stats, s2a_get_keys_with_usage, format_keys_usage, - s2a_get_error_accounts, s2a_delete_account, s2a_batch_delete_error_accounts + s2a_get_error_accounts, s2a_delete_account, s2a_batch_delete_error_accounts, + s2a_api_authorize_single, s2a_batch_api_authorize ) from email_service import gptmail_service, unified_create_email from logger import log @@ -159,6 +161,7 @@ class ProvisionerBot: ("include_owners", self.cmd_include_owners), ("reload", self.cmd_reload), ("s2a_config", self.cmd_s2a_config), + ("api_auth", self.cmd_api_auth), ("clean", self.cmd_clean), ("clean_errors", self.cmd_clean_errors), ("clean_teams", self.cmd_clean_teams), @@ -287,6 +290,7 @@ class ProvisionerBot: BotCommand("keys_usage", "查看 API 密钥用量"), BotCommand("stock", "查看账号库存"), BotCommand("s2a_config", "配置 S2A 参数"), + BotCommand("api_auth", "API 模式授权账号"), BotCommand("import", "导入账号到 team.json"), BotCommand("verify", "验证未验证的账号"), BotCommand("verify_all", "强制重新验证所有账号"), @@ -355,6 +359,7 @@ class ProvisionerBot: /keys_usage - 查看 API 密钥用量 /stock - 查看账号库存 /s2a_config - 配置 S2A 参数 +/api_auth - API 模式授权 (无需浏览器) /clean_errors - 清理错误状态账号 🧹 清理管理: @@ -1072,6 +1077,92 @@ class ProvisionerBot: except Exception as e: await update.message.reply_text(f"❌ 修改配置失败: {e}") + @admin_only + async def cmd_api_auth(self, update: Update, context: ContextTypes.DEFAULT_TYPE): + """使用纯 API 模式授权账号到 S2A (无需浏览器) + + 用法: + /api_auth email password - 授权单个账号 + /api_auth - 显示帮助信息 + """ + if AUTH_PROVIDER != "s2a": + await update.message.reply_text( + "❌ 此命令仅在 S2A 模式下可用\n" + "当前授权服务: " + AUTH_PROVIDER + ) + return + + # 无参数时显示帮助 + if not context.args: + api_mode_status = "✅ 已启用" if S2A_API_MODE else "❌ 未启用" + lines = [ + "🔐 S2A API 授权", + "", + f"API 模式: {api_mode_status}", + "", + "用法:", + "/api_auth email password", + "", + "示例:", + "/api_auth test@example.com MyPassword123", + "", + "说明:", + "• 使用纯 API 方式完成 OAuth 授权", + "• 无需浏览器,更快更稳定", + "• 需要安装 curl_cffi: pip install curl_cffi", + "", + "💡 提示:", + "• 在 config.toml 中设置 s2a.api_mode = true", + "• 可让所有 S2A 授权自动使用 API 模式", + ] + await update.message.reply_text("\n".join(lines), parse_mode="HTML") + return + + # 解析参数 + if len(context.args) < 2: + await update.message.reply_text( + "❌ 参数不足\n" + "用法: /api_auth " + ) + return + + email = context.args[0] + password = " ".join(context.args[1:]) # 密码可能包含空格 + + # 发送处理中消息 + msg = await update.message.reply_text( + f"🔄 正在授权: {email}\n" + "请稍候...", + parse_mode="HTML" + ) + + try: + # 执行 API 授权 + success, message = s2a_api_authorize_single(email, password) + + if success: + await msg.edit_text( + f"✅ 授权成功\n\n" + f"📧 邮箱: {email}\n" + f"📝 {message}", + parse_mode="HTML" + ) + else: + await msg.edit_text( + f"❌ 授权失败\n\n" + f"📧 邮箱: {email}\n" + f"📝 {message}", + parse_mode="HTML" + ) + + except Exception as e: + await msg.edit_text( + f"❌ 授权异常\n\n" + f"📧 邮箱: {email}\n" + f"📝 错误: {str(e)}", + parse_mode="HTML" + ) + @admin_only async def cmd_run(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """启动处理指定数量的 Team - 交互式选择"""