feat(s2a_service): Add pure API authorization mode without browser

- Add S2A_API_MODE configuration option to enable browser-less authorization
- Implement S2AApiAuthorizer class using curl_cffi for browser fingerprint simulation
- Add Sentinel PoW (Proof of Work) solver with FNV-1a hashing algorithm
- Implement OAuth flow via direct API calls instead of browser automation
- Add s2a_api_authorize() function to handle email/password authentication
- Support proxy configuration for API requests
- Add requirements token generation for API authentication
- Update browser_automation.py to check S2A_API_MODE and route to API or browser flow
- Update config.py to load S2A_API_MODE from configuration
- Add api_mode option to config.toml.example with documentation
- Improves performance and stability by eliminating browser overhead while maintaining compatibility with existing browser-based flow
This commit is contained in:
2026-02-02 09:26:57 +08:00
parent ae86ca42df
commit a7867ae406
5 changed files with 588 additions and 5 deletions

View File

@@ -17,6 +17,7 @@ from config import (
BROWSER_HEADLESS, BROWSER_HEADLESS,
BROWSER_RANDOM_FINGERPRINT, BROWSER_RANDOM_FINGERPRINT,
AUTH_PROVIDER, AUTH_PROVIDER,
S2A_API_MODE,
get_random_name, get_random_name,
get_random_birthday, get_random_birthday,
get_random_fingerprint get_random_fingerprint
@@ -31,7 +32,8 @@ from cpa_service import (
) )
from s2a_service import ( from s2a_service import (
s2a_generate_auth_url, s2a_generate_auth_url,
s2a_create_account_from_oauth s2a_create_account_from_oauth,
s2a_api_authorize,
) )
from logger import log from logger import log
@@ -2939,14 +2941,21 @@ def perform_s2a_authorization(page, email: str, password: str) -> bool:
"""执行 S2A 授权流程 (密码登录) """执行 S2A 授权流程 (密码登录)
Args: Args:
page: 浏览器实例 page: 浏览器实例 (API 模式下可为 None)
email: 邮箱地址 email: 邮箱地址
password: 密码 password: 密码
Returns: Returns:
bool: 授权是否成功 bool: 授权是否成功
""" """
log.info(f"开始 S2A 授权: {email}", icon="code") # 检查是否使用 API 模式
if S2A_API_MODE:
log.info(f"使用 S2A API 模式授权: {email}", icon="code")
success, result = s2a_api_authorize(email, password)
return success
# 以下是浏览器模式
log.info(f"开始 S2A 浏览器授权: {email}", icon="code")
progress_update(phase="授权", step="开始 S2A 授权...") progress_update(phase="授权", step="开始 S2A 授权...")
# 注意: 授权回调锁已在上层函数 (register_and_authorize/authorize_only) 中获取 # 注意: 授权回调锁已在上层函数 (register_and_authorize/authorize_only) 中获取

View File

@@ -312,7 +312,7 @@ def reload_config() -> dict:
global GPTMAIL_API_KEYS, GPTMAIL_DOMAINS, GPTMAIL_PREFIX global GPTMAIL_API_KEYS, GPTMAIL_DOMAINS, GPTMAIL_PREFIX
global PROXY_ENABLED, PROXIES global PROXY_ENABLED, PROXIES
global S2A_API_BASE, S2A_ADMIN_KEY, S2A_ADMIN_TOKEN global S2A_API_BASE, S2A_ADMIN_KEY, S2A_ADMIN_TOKEN
global S2A_CONCURRENCY, S2A_PRIORITY, S2A_GROUP_NAMES, S2A_GROUP_IDS global S2A_CONCURRENCY, S2A_PRIORITY, S2A_GROUP_NAMES, S2A_GROUP_IDS, S2A_API_MODE
global CONCURRENT_ENABLED, CONCURRENT_WORKERS global CONCURRENT_ENABLED, CONCURRENT_WORKERS
result = { result = {
@@ -387,6 +387,7 @@ def reload_config() -> dict:
S2A_PRIORITY = _s2a.get("priority", 50) S2A_PRIORITY = _s2a.get("priority", 50)
S2A_GROUP_NAMES = _s2a.get("group_names", []) S2A_GROUP_NAMES = _s2a.get("group_names", [])
S2A_GROUP_IDS = _s2a.get("group_ids", []) S2A_GROUP_IDS = _s2a.get("group_ids", [])
S2A_API_MODE = _s2a.get("api_mode", False)
except Exception as e: except Exception as e:
errors.append(f"config.toml: {e}") errors.append(f"config.toml: {e}")
@@ -619,6 +620,7 @@ S2A_CONCURRENCY = _s2a.get("concurrency", 10)
S2A_PRIORITY = _s2a.get("priority", 50) S2A_PRIORITY = _s2a.get("priority", 50)
S2A_GROUP_NAMES = _s2a.get("group_names", []) S2A_GROUP_NAMES = _s2a.get("group_names", [])
S2A_GROUP_IDS = _s2a.get("group_ids", []) S2A_GROUP_IDS = _s2a.get("group_ids", [])
S2A_API_MODE = _s2a.get("api_mode", False) # 是否使用纯 API 授权模式 (无需浏览器)
# 账号 # 账号
_account = _cfg.get("account", {}) _account = _cfg.get("account", {})

View File

@@ -154,6 +154,11 @@ priority = 50
group_ids = [] group_ids = []
# 分组名称列表 (优先使用 group_ids如果未配置则通过名称查询 ID) # 分组名称列表 (优先使用 group_ids如果未配置则通过名称查询 ID)
group_names = [] group_names = []
# 是否使用纯 API 授权模式 (无需浏览器)
# 开启后使用 curl_cffi 直接调用 OpenAI 认证 API 完成授权
# 优点: 更快、更稳定、无需浏览器
# 缺点: 需要安装 curl_cffi (pip install curl_cffi)
api_mode = false
# ==================== 账号配置 ==================== # ==================== 账号配置 ====================
[account] [account]

View File

@@ -6,12 +6,24 @@
# - 会话标识: S2A 使用 session_id # - 会话标识: S2A 使用 session_id
# - 授权流程: S2A 生成授权 URL -> 用户授权 -> 提交 code 换取 token -> 创建账号 # - 授权流程: S2A 生成授权 URL -> 用户授权 -> 提交 code 换取 token -> 创建账号
# - 账号入库: S2A 可一步完成 (create-from-oauth) 或分步完成 (exchange + add_account) # - 账号入库: S2A 可一步完成 (create-from-oauth) 或分步完成 (exchange + add_account)
#
# 新增: 纯 API 授权模式 (无需浏览器)
# - 使用 curl_cffi 模拟浏览器指纹
# - 支持 Sentinel PoW 验证
# - 直接通过 API 完成 OAuth 流程
import requests import requests
from requests.adapters import HTTPAdapter from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry from urllib3.util.retry import Retry
from urllib.parse import urlparse, parse_qs from urllib.parse import urlparse, parse_qs
from typing import Optional, Tuple, Dict, List, Any from typing import Optional, Tuple, Dict, List, Any
import json
import uuid
import time
import random
import base64
import hashlib
from datetime import datetime, timedelta, timezone
from config import ( from config import (
S2A_API_BASE, S2A_API_BASE,
@@ -23,6 +35,8 @@ from config import (
S2A_GROUP_NAMES, S2A_GROUP_NAMES,
REQUEST_TIMEOUT, REQUEST_TIMEOUT,
USER_AGENT, USER_AGENT,
get_next_proxy,
format_proxy_url,
) )
from logger import log from logger import log
@@ -49,6 +63,349 @@ def create_session_with_retry() -> requests.Session:
http_session = create_session_with_retry() http_session = create_session_with_retry()
# ==================== PoW Solver (Sentinel 验证) ====================
def _fnv1a_32(data: bytes) -> int:
"""FNV-1a 32-bit hash"""
h = 2166136261
for byte in data:
h ^= byte
h = (h * 16777619) & 0xFFFFFFFF
h ^= (h >> 16)
h = (h * 2246822507) & 0xFFFFFFFF
h ^= (h >> 13)
h = (h * 3266489909) & 0xFFFFFFFF
h ^= (h >> 16)
return h
def _get_parse_time() -> str:
"""生成 JS Date().toString() 格式的时间戳"""
now = datetime.now(timezone(timedelta(hours=8)))
return now.strftime("%a %b %d %Y %H:%M:%S") + " GMT+0800 (中国标准时间)"
def _get_pow_config(user_agent: str, sid: str = None) -> list:
"""生成 PoW 配置数组"""
if not sid:
sid = str(uuid.uuid4())
return [
random.randint(2500, 3500),
_get_parse_time(),
4294967296,
0,
user_agent,
"chrome-extension://pgojnojmmhpofjgdmaebadhbocahppod/assets/aW5qZWN0X2hhc2g/aW5qZ",
None,
"zh-CN",
"zh-CN",
0,
f"canSharefunction canShare() {{ [native code] }}",
f"_reactListening{random.randint(1000000, 9999999)}",
"onhashchange",
time.perf_counter() * 1000,
sid,
"",
24,
int(time.time() * 1000 - random.randint(10000, 50000))
]
def _solve_pow(seed: str, difficulty: str, config: list, max_iterations: int = 5000000) -> Optional[str]:
"""CPU 求解 PoW"""
start_time = time.perf_counter()
seed_bytes = seed.encode()
for iteration in range(max_iterations):
config[3] = iteration
config[9] = 0
json_str = json.dumps(config, separators=(',', ':'))
encoded = base64.b64encode(json_str.encode())
h = _fnv1a_32(seed_bytes + encoded)
hex_hash = f"{h:08x}"
if hex_hash[:len(difficulty)] <= difficulty:
elapsed = time.perf_counter() - start_time
log.debug(f"[PoW] 求解完成: {elapsed:.2f}s (迭代 {iteration:,}, 难度={difficulty})")
return f"{encoded.decode()}~S"
return None
def _get_requirements_token(user_agent: str, sid: str = None) -> str:
"""生成 requirements token"""
if not sid:
sid = str(uuid.uuid4())
config = _get_pow_config(user_agent, sid)
config[3] = 0
config[9] = 0
json_str = json.dumps(config, separators=(',', ':'))
encoded = base64.b64encode(json_str.encode()).decode()
return f"gAAAAAC{encoded}~S"
# ==================== S2A API 授权器 ====================
class S2AApiAuthorizer:
"""S2A 纯 API 授权器 - 无需浏览器"""
def __init__(self, email: str, password: str, proxy: str = None):
self.email = email
self.password = password
# 尝试导入 curl_cffi如果失败则使用 requests
try:
from curl_cffi import requests as cffi_requests
self.session = cffi_requests.Session(impersonate="chrome110")
self._use_cffi = True
except ImportError:
log.warning("curl_cffi 未安装,使用 requests (可能被检测)")
self.session = requests.Session()
self._use_cffi = False
if proxy:
self.session.proxies = {"http": proxy, "https": proxy}
self.ua = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36"
self.sid = str(uuid.uuid4())
self.device_id = str(uuid.uuid4())
self.sentinel_token = None
self.solved_pow = None
self.session.headers.update({
"User-Agent": self.ua,
"Accept": "*/*",
"Accept-Language": "en-US,en;q=0.9",
"sec-ch-ua": '"Not(A:Brand";v="8", "Chromium";v="144", "Microsoft Edge";v="144"',
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": '"Windows"',
"sec-fetch-dest": "empty",
"sec-fetch-mode": "cors",
"sec-fetch-site": "same-origin",
})
def _call_sentinel_req(self, flow: str) -> Optional[dict]:
"""调用 sentinel 获取 token 和处理 PoW"""
init_token = _get_requirements_token(self.ua, self.sid)
payload = {"p": init_token, "id": self.device_id, "flow": flow}
try:
resp = self.session.post(
"https://sentinel.openai.com/backend-api/sentinel/req",
json=payload,
timeout=15
)
if resp.status_code != 200:
log.warning(f"Sentinel 请求失败: {resp.status_code}")
return None
data = resp.json()
self.sentinel_token = data.get('token')
pow_req = data.get('proofofwork', {})
if pow_req.get('required'):
seed = pow_req.get('seed', '')
difficulty = pow_req.get('difficulty', '')
config = _get_pow_config(self.ua, self.sid)
solved = _solve_pow(seed, difficulty, config)
if solved:
self.solved_pow = f"gAAAAAB{solved}"
else:
log.error("PoW 求解失败")
return None
else:
self.solved_pow = init_token
return data
except Exception as e:
log.error(f"Sentinel 异常: {e}")
return None
def _get_sentinel_header(self, header_flow: str) -> str:
"""构建 sentinel header"""
sentinel_obj = {"p": self.solved_pow, "id": self.device_id, "flow": header_flow}
if self.sentinel_token:
sentinel_obj["c"] = self.sentinel_token
return json.dumps(sentinel_obj)
def get_authorization_code(self, auth_url: str) -> Optional[str]:
"""执行 OAuth 流程,返回 authorization code
Args:
auth_url: S2A 生成的授权 URL
Returns:
str: 授权码 或 None
"""
log.step("开始 API 授权流程...")
headers = {
"Origin": "https://auth.openai.com",
"Referer": "https://auth.openai.com/log-in",
"Content-Type": "application/json"
}
try:
# 1. 访问授权端点
log.step("访问授权端点...")
resp = self.session.get(auth_url, allow_redirects=True)
headers["Referer"] = resp.url
# 2. 提交邮箱
log.step("提交邮箱...")
if not self._call_sentinel_req("login_web_init"):
return None
auth_headers = headers.copy()
auth_headers["OpenAI-Sentinel-Token"] = self._get_sentinel_header("authorize_continue")
resp = self.session.post(
"https://auth.openai.com/api/accounts/authorize/continue",
json={"username": {"kind": "email", "value": self.email}},
headers=auth_headers
)
if resp.status_code != 200:
log.error(f"邮箱提交失败: {resp.status_code} - {resp.text[:200]}")
return None
data = resp.json()
page_type = data.get("page", {}).get("type", "")
# 3. 验证密码
if page_type == "password" or "password" in str(data):
log.step("验证密码...")
if not self._call_sentinel_req("authorize_continue__auto"):
return None
verify_headers = headers.copy()
verify_headers["OpenAI-Sentinel-Token"] = self._get_sentinel_header("password_verify")
resp = self.session.post(
"https://auth.openai.com/api/accounts/password/verify",
json={"username": self.email, "password": self.password},
headers=verify_headers
)
if resp.status_code != 200:
log.error(f"密码验证失败: {resp.status_code} - {resp.text[:200]}")
return None
# 4. 获取 continue_url (无需选择 workspaceS2A 授权链接已包含)
data = resp.json()
continue_url = data.get("continue_url")
# 如果没有 continue_url可能需要额外的 sentinel 调用
if not continue_url:
log.step("获取重定向 URL...")
if not self._call_sentinel_req("password_verify__auto"):
return None
# 尝试再次获取
resp = self.session.post(
"https://auth.openai.com/api/accounts/authorize/continue",
json={},
headers=auth_headers
)
if resp.status_code == 200:
data = resp.json()
continue_url = data.get("continue_url")
if not continue_url:
log.error(f"无法获取 continue_url: {data}")
return None
# 5. 跟踪重定向直到获取 code
log.step("跟踪重定向...")
for _ in range(10):
resp = self.session.get(continue_url, allow_redirects=False)
if resp.status_code in (301, 302, 303, 307, 308):
location = resp.headers.get('Location', '')
if "localhost:1455" in location:
parsed = urlparse(location)
query = parse_qs(parsed.query)
code = query.get('code', [None])[0]
if code:
log.success("成功获取授权码")
return code
continue_url = location
else:
break
log.error("无法获取授权码")
return None
except Exception as e:
log.error(f"API 授权异常: {e}")
import traceback
traceback.print_exc()
return None
def s2a_api_authorize(
email: str,
password: str,
proxy: str = None
) -> Tuple[bool, Optional[Dict[str, Any]]]:
"""S2A 纯 API 授权 (无需浏览器)
使用 OpenAI 认证 API 直接完成授权流程,无需浏览器自动化。
Args:
email: 账号邮箱
password: 账号密码
proxy: 代理地址 (可选,格式: http://host:port 或 socks5://user:pass@host:port)
Returns:
tuple: (是否成功, 账号数据或None)
"""
if not S2A_API_BASE or (not S2A_ADMIN_KEY and not S2A_ADMIN_TOKEN):
log.error("S2A 未配置")
return False, None
# 使用配置的代理
if not proxy:
proxy_config = get_next_proxy()
if proxy_config:
proxy = format_proxy_url(proxy_config)
log.info(f"开始 S2A API 授权: {email}", icon="code")
if proxy:
log.debug(f"使用代理: {proxy[:30]}...")
try:
# 1. 生成授权 URL
auth_url, session_id = s2a_generate_auth_url()
if not auth_url or not session_id:
log.error("无法获取 S2A 授权 URL")
return False, None
log.debug(f"授权 URL: {auth_url[:80]}...")
log.debug(f"Session ID: {session_id[:16]}...")
# 2. 使用 API 授权器获取 code
authorizer = S2AApiAuthorizer(email, password, proxy)
code = authorizer.get_authorization_code(auth_url)
if not code:
log.error("API 授权失败,无法获取授权码")
return False, None
log.debug(f"授权码: {code[:20]}...")
# 3. 提交授权码创建账号
log.step("提交授权码到 S2A...")
result = s2a_create_account_from_oauth(code, session_id, name=email)
if result:
log.success(f"S2A API 授权成功: {email}")
return True, result
else:
log.error("S2A 账号入库失败")
return False, None
except Exception as e:
log.error(f"S2A API 授权异常: {e}")
return False, None
def build_s2a_headers() -> Dict[str, str]: def build_s2a_headers() -> Dict[str, str]:
"""构建 S2A API 请求的 Headers """构建 S2A API 请求的 Headers
@@ -1211,3 +1568,122 @@ def format_keys_usage(keys: List[Dict[str, Any]], period_text: str = "今日") -
lines.append(f" 费用: {fmt_cost(total_cost)}") lines.append(f" 费用: {fmt_cost(total_cost)}")
return "\n".join(lines) return "\n".join(lines)
# ==================== 批量 API 授权 ====================
def s2a_batch_api_authorize(
accounts: List[Dict[str, str]],
proxy: str = None,
progress_callback: Optional[callable] = None
) -> Dict[str, Any]:
"""批量使用 API 模式授权账号到 S2A
无需浏览器,直接通过 OpenAI 认证 API 完成授权。
Args:
accounts: 账号列表 [{"email": "xxx", "password": "xxx"}, ...]
proxy: 代理地址 (可选)
progress_callback: 进度回调函数 (current, total, email, status, message)
Returns:
dict: {
"success": int,
"failed": int,
"total": int,
"details": [{"email": "xxx", "status": "success/failed", "message": "xxx"}, ...]
}
"""
results = {
"success": 0,
"failed": 0,
"total": len(accounts),
"details": []
}
if not S2A_API_BASE or (not S2A_ADMIN_KEY and not S2A_ADMIN_TOKEN):
log.error("S2A 未配置")
return results
# 使用配置的代理
if not proxy:
proxy_config = get_next_proxy()
if proxy_config:
proxy = format_proxy_url(proxy_config)
log.info(f"开始批量 API 授权: {len(accounts)} 个账号")
for i, acc in enumerate(accounts):
email = acc.get("email", "")
password = acc.get("password", "")
if not email or not password:
results["failed"] += 1
results["details"].append({
"email": email or "unknown",
"status": "failed",
"message": "缺少邮箱或密码"
})
if progress_callback:
progress_callback(i + 1, len(accounts), email, "failed", "缺少邮箱或密码")
continue
try:
success, result = s2a_api_authorize(email, password, proxy)
if success:
results["success"] += 1
account_id = result.get("id", "") if result else ""
results["details"].append({
"email": email,
"status": "success",
"message": f"ID: {account_id}"
})
if progress_callback:
progress_callback(i + 1, len(accounts), email, "success", f"ID: {account_id}")
else:
results["failed"] += 1
results["details"].append({
"email": email,
"status": "failed",
"message": "授权失败"
})
if progress_callback:
progress_callback(i + 1, len(accounts), email, "failed", "授权失败")
except Exception as e:
results["failed"] += 1
results["details"].append({
"email": email,
"status": "failed",
"message": str(e)
})
if progress_callback:
progress_callback(i + 1, len(accounts), email, "failed", str(e))
log.success(f"批量授权完成: 成功 {results['success']}, 失败 {results['failed']}")
return results
def s2a_api_authorize_single(
email: str,
password: str,
proxy: str = None
) -> Tuple[bool, str]:
"""单个账号 API 授权 (简化返回值)
Args:
email: 账号邮箱
password: 账号密码
proxy: 代理地址 (可选)
Returns:
tuple: (是否成功, 消息)
"""
success, result = s2a_api_authorize(email, password, proxy)
if success:
account_id = result.get("id", "") if result else ""
return True, f"授权成功 (ID: {account_id})"
else:
return False, "授权失败"

View File

@@ -48,6 +48,7 @@ from config import (
S2A_GROUP_NAMES, S2A_GROUP_NAMES,
S2A_GROUP_IDS, S2A_GROUP_IDS,
S2A_ADMIN_KEY, S2A_ADMIN_KEY,
S2A_API_MODE,
BROWSER_RANDOM_FINGERPRINT, BROWSER_RANDOM_FINGERPRINT,
batch_remove_teams_by_names, batch_remove_teams_by_names,
) )
@@ -55,7 +56,8 @@ from utils import load_team_tracker, get_all_incomplete_accounts, save_team_trac
from bot_notifier import BotNotifier, set_notifier, progress_finish from bot_notifier import BotNotifier, set_notifier, progress_finish
from s2a_service import ( from s2a_service import (
s2a_get_dashboard_stats, format_dashboard_stats, s2a_get_keys_with_usage, format_keys_usage, s2a_get_dashboard_stats, format_dashboard_stats, s2a_get_keys_with_usage, format_keys_usage,
s2a_get_error_accounts, s2a_delete_account, s2a_batch_delete_error_accounts s2a_get_error_accounts, s2a_delete_account, s2a_batch_delete_error_accounts,
s2a_api_authorize_single, s2a_batch_api_authorize
) )
from email_service import gptmail_service, unified_create_email from email_service import gptmail_service, unified_create_email
from logger import log from logger import log
@@ -159,6 +161,7 @@ class ProvisionerBot:
("include_owners", self.cmd_include_owners), ("include_owners", self.cmd_include_owners),
("reload", self.cmd_reload), ("reload", self.cmd_reload),
("s2a_config", self.cmd_s2a_config), ("s2a_config", self.cmd_s2a_config),
("api_auth", self.cmd_api_auth),
("clean", self.cmd_clean), ("clean", self.cmd_clean),
("clean_errors", self.cmd_clean_errors), ("clean_errors", self.cmd_clean_errors),
("clean_teams", self.cmd_clean_teams), ("clean_teams", self.cmd_clean_teams),
@@ -287,6 +290,7 @@ class ProvisionerBot:
BotCommand("keys_usage", "查看 API 密钥用量"), BotCommand("keys_usage", "查看 API 密钥用量"),
BotCommand("stock", "查看账号库存"), BotCommand("stock", "查看账号库存"),
BotCommand("s2a_config", "配置 S2A 参数"), BotCommand("s2a_config", "配置 S2A 参数"),
BotCommand("api_auth", "API 模式授权账号"),
BotCommand("import", "导入账号到 team.json"), BotCommand("import", "导入账号到 team.json"),
BotCommand("verify", "验证未验证的账号"), BotCommand("verify", "验证未验证的账号"),
BotCommand("verify_all", "强制重新验证所有账号"), BotCommand("verify_all", "强制重新验证所有账号"),
@@ -355,6 +359,7 @@ class ProvisionerBot:
/keys_usage - 查看 API 密钥用量 /keys_usage - 查看 API 密钥用量
/stock - 查看账号库存 /stock - 查看账号库存
/s2a_config - 配置 S2A 参数 /s2a_config - 配置 S2A 参数
/api_auth - API 模式授权 (无需浏览器)
/clean_errors - 清理错误状态账号 /clean_errors - 清理错误状态账号
<b>🧹 清理管理:</b> <b>🧹 清理管理:</b>
@@ -1072,6 +1077,92 @@ class ProvisionerBot:
except Exception as e: except Exception as e:
await update.message.reply_text(f"❌ 修改配置失败: {e}") await update.message.reply_text(f"❌ 修改配置失败: {e}")
@admin_only
async def cmd_api_auth(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""使用纯 API 模式授权账号到 S2A (无需浏览器)
用法:
/api_auth email password - 授权单个账号
/api_auth - 显示帮助信息
"""
if AUTH_PROVIDER != "s2a":
await update.message.reply_text(
"❌ 此命令仅在 S2A 模式下可用\n"
"当前授权服务: " + AUTH_PROVIDER
)
return
# 无参数时显示帮助
if not context.args:
api_mode_status = "✅ 已启用" if S2A_API_MODE else "❌ 未启用"
lines = [
"<b>🔐 S2A API 授权</b>",
"",
f"<b>API 模式:</b> {api_mode_status}",
"",
"<b>用法:</b>",
"<code>/api_auth email password</code>",
"",
"<b>示例:</b>",
"<code>/api_auth test@example.com MyPassword123</code>",
"",
"<b>说明:</b>",
"• 使用纯 API 方式完成 OAuth 授权",
"• 无需浏览器,更快更稳定",
"• 需要安装 curl_cffi: <code>pip install curl_cffi</code>",
"",
"<b>💡 提示:</b>",
"• 在 config.toml 中设置 <code>s2a.api_mode = true</code>",
"• 可让所有 S2A 授权自动使用 API 模式",
]
await update.message.reply_text("\n".join(lines), parse_mode="HTML")
return
# 解析参数
if len(context.args) < 2:
await update.message.reply_text(
"❌ 参数不足\n"
"用法: /api_auth <email> <password>"
)
return
email = context.args[0]
password = " ".join(context.args[1:]) # 密码可能包含空格
# 发送处理中消息
msg = await update.message.reply_text(
f"🔄 正在授权: <code>{email}</code>\n"
"请稍候...",
parse_mode="HTML"
)
try:
# 执行 API 授权
success, message = s2a_api_authorize_single(email, password)
if success:
await msg.edit_text(
f"✅ <b>授权成功</b>\n\n"
f"📧 邮箱: <code>{email}</code>\n"
f"📝 {message}",
parse_mode="HTML"
)
else:
await msg.edit_text(
f"❌ <b>授权失败</b>\n\n"
f"📧 邮箱: <code>{email}</code>\n"
f"📝 {message}",
parse_mode="HTML"
)
except Exception as e:
await msg.edit_text(
f"❌ <b>授权异常</b>\n\n"
f"📧 邮箱: <code>{email}</code>\n"
f"📝 错误: {str(e)}",
parse_mode="HTML"
)
@admin_only @admin_only
async def cmd_run(self, update: Update, context: ContextTypes.DEFAULT_TYPE): async def cmd_run(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""启动处理指定数量的 Team - 交互式选择""" """启动处理指定数量的 Team - 交互式选择"""