diff --git a/browser_automation.py b/browser_automation.py
index 8da40fb..9dae842 100644
--- a/browser_automation.py
+++ b/browser_automation.py
@@ -17,6 +17,7 @@ from config import (
BROWSER_HEADLESS,
BROWSER_RANDOM_FINGERPRINT,
AUTH_PROVIDER,
+ S2A_API_MODE,
get_random_name,
get_random_birthday,
get_random_fingerprint
@@ -31,7 +32,8 @@ from cpa_service import (
)
from s2a_service import (
s2a_generate_auth_url,
- s2a_create_account_from_oauth
+ s2a_create_account_from_oauth,
+ s2a_api_authorize,
)
from logger import log
@@ -2939,14 +2941,21 @@ def perform_s2a_authorization(page, email: str, password: str) -> bool:
"""执行 S2A 授权流程 (密码登录)
Args:
- page: 浏览器实例
+ page: 浏览器实例 (API 模式下可为 None)
email: 邮箱地址
password: 密码
Returns:
bool: 授权是否成功
"""
- log.info(f"开始 S2A 授权: {email}", icon="code")
+ # 检查是否使用 API 模式
+ if S2A_API_MODE:
+ log.info(f"使用 S2A API 模式授权: {email}", icon="code")
+ success, result = s2a_api_authorize(email, password)
+ return success
+
+ # 以下是浏览器模式
+ log.info(f"开始 S2A 浏览器授权: {email}", icon="code")
progress_update(phase="授权", step="开始 S2A 授权...")
# 注意: 授权回调锁已在上层函数 (register_and_authorize/authorize_only) 中获取
diff --git a/config.py b/config.py
index 60c8ae2..2e2240e 100644
--- a/config.py
+++ b/config.py
@@ -312,7 +312,7 @@ def reload_config() -> dict:
global GPTMAIL_API_KEYS, GPTMAIL_DOMAINS, GPTMAIL_PREFIX
global PROXY_ENABLED, PROXIES
global S2A_API_BASE, S2A_ADMIN_KEY, S2A_ADMIN_TOKEN
- global S2A_CONCURRENCY, S2A_PRIORITY, S2A_GROUP_NAMES, S2A_GROUP_IDS
+ global S2A_CONCURRENCY, S2A_PRIORITY, S2A_GROUP_NAMES, S2A_GROUP_IDS, S2A_API_MODE
global CONCURRENT_ENABLED, CONCURRENT_WORKERS
result = {
@@ -387,6 +387,7 @@ def reload_config() -> dict:
S2A_PRIORITY = _s2a.get("priority", 50)
S2A_GROUP_NAMES = _s2a.get("group_names", [])
S2A_GROUP_IDS = _s2a.get("group_ids", [])
+ S2A_API_MODE = _s2a.get("api_mode", False)
except Exception as e:
errors.append(f"config.toml: {e}")
@@ -619,6 +620,7 @@ S2A_CONCURRENCY = _s2a.get("concurrency", 10)
S2A_PRIORITY = _s2a.get("priority", 50)
S2A_GROUP_NAMES = _s2a.get("group_names", [])
S2A_GROUP_IDS = _s2a.get("group_ids", [])
+S2A_API_MODE = _s2a.get("api_mode", False) # 是否使用纯 API 授权模式 (无需浏览器)
# 账号
_account = _cfg.get("account", {})
diff --git a/config.toml.example b/config.toml.example
index b0b5974..e4c7ce0 100644
--- a/config.toml.example
+++ b/config.toml.example
@@ -154,6 +154,11 @@ priority = 50
group_ids = []
# 分组名称列表 (优先使用 group_ids,如果未配置则通过名称查询 ID)
group_names = []
+# 是否使用纯 API 授权模式 (无需浏览器)
+# 开启后使用 curl_cffi 直接调用 OpenAI 认证 API 完成授权
+# 优点: 更快、更稳定、无需浏览器
+# 缺点: 需要安装 curl_cffi (pip install curl_cffi)
+api_mode = false
# ==================== 账号配置 ====================
[account]
diff --git a/s2a_service.py b/s2a_service.py
index 8a5df71..fcda13c 100644
--- a/s2a_service.py
+++ b/s2a_service.py
@@ -6,12 +6,24 @@
# - 会话标识: S2A 使用 session_id
# - 授权流程: S2A 生成授权 URL -> 用户授权 -> 提交 code 换取 token -> 创建账号
# - 账号入库: S2A 可一步完成 (create-from-oauth) 或分步完成 (exchange + add_account)
+#
+# 新增: 纯 API 授权模式 (无需浏览器)
+# - 使用 curl_cffi 模拟浏览器指纹
+# - 支持 Sentinel PoW 验证
+# - 直接通过 API 完成 OAuth 流程
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from urllib.parse import urlparse, parse_qs
from typing import Optional, Tuple, Dict, List, Any
+import json
+import uuid
+import time
+import random
+import base64
+import hashlib
+from datetime import datetime, timedelta, timezone
from config import (
S2A_API_BASE,
@@ -23,6 +35,8 @@ from config import (
S2A_GROUP_NAMES,
REQUEST_TIMEOUT,
USER_AGENT,
+ get_next_proxy,
+ format_proxy_url,
)
from logger import log
@@ -49,6 +63,349 @@ def create_session_with_retry() -> requests.Session:
http_session = create_session_with_retry()
+# ==================== PoW Solver (Sentinel 验证) ====================
+
+def _fnv1a_32(data: bytes) -> int:
+ """FNV-1a 32-bit hash"""
+ h = 2166136261
+ for byte in data:
+ h ^= byte
+ h = (h * 16777619) & 0xFFFFFFFF
+ h ^= (h >> 16)
+ h = (h * 2246822507) & 0xFFFFFFFF
+ h ^= (h >> 13)
+ h = (h * 3266489909) & 0xFFFFFFFF
+ h ^= (h >> 16)
+ return h
+
+
+def _get_parse_time() -> str:
+ """生成 JS Date().toString() 格式的时间戳"""
+ now = datetime.now(timezone(timedelta(hours=8)))
+ return now.strftime("%a %b %d %Y %H:%M:%S") + " GMT+0800 (中国标准时间)"
+
+
+def _get_pow_config(user_agent: str, sid: str = None) -> list:
+ """生成 PoW 配置数组"""
+ if not sid:
+ sid = str(uuid.uuid4())
+ return [
+ random.randint(2500, 3500),
+ _get_parse_time(),
+ 4294967296,
+ 0,
+ user_agent,
+ "chrome-extension://pgojnojmmhpofjgdmaebadhbocahppod/assets/aW5qZWN0X2hhc2g/aW5qZ",
+ None,
+ "zh-CN",
+ "zh-CN",
+ 0,
+ f"canShare−function canShare() {{ [native code] }}",
+ f"_reactListening{random.randint(1000000, 9999999)}",
+ "onhashchange",
+ time.perf_counter() * 1000,
+ sid,
+ "",
+ 24,
+ int(time.time() * 1000 - random.randint(10000, 50000))
+ ]
+
+
+def _solve_pow(seed: str, difficulty: str, config: list, max_iterations: int = 5000000) -> Optional[str]:
+ """CPU 求解 PoW"""
+ start_time = time.perf_counter()
+ seed_bytes = seed.encode()
+
+ for iteration in range(max_iterations):
+ config[3] = iteration
+ config[9] = 0
+
+ json_str = json.dumps(config, separators=(',', ':'))
+ encoded = base64.b64encode(json_str.encode())
+
+ h = _fnv1a_32(seed_bytes + encoded)
+ hex_hash = f"{h:08x}"
+
+ if hex_hash[:len(difficulty)] <= difficulty:
+ elapsed = time.perf_counter() - start_time
+ log.debug(f"[PoW] 求解完成: {elapsed:.2f}s (迭代 {iteration:,}, 难度={difficulty})")
+ return f"{encoded.decode()}~S"
+
+ return None
+
+
+def _get_requirements_token(user_agent: str, sid: str = None) -> str:
+ """生成 requirements token"""
+ if not sid:
+ sid = str(uuid.uuid4())
+ config = _get_pow_config(user_agent, sid)
+ config[3] = 0
+ config[9] = 0
+ json_str = json.dumps(config, separators=(',', ':'))
+ encoded = base64.b64encode(json_str.encode()).decode()
+ return f"gAAAAAC{encoded}~S"
+
+
+# ==================== S2A API 授权器 ====================
+
+class S2AApiAuthorizer:
+ """S2A 纯 API 授权器 - 无需浏览器"""
+
+ def __init__(self, email: str, password: str, proxy: str = None):
+ self.email = email
+ self.password = password
+
+ # 尝试导入 curl_cffi,如果失败则使用 requests
+ try:
+ from curl_cffi import requests as cffi_requests
+ self.session = cffi_requests.Session(impersonate="chrome110")
+ self._use_cffi = True
+ except ImportError:
+ log.warning("curl_cffi 未安装,使用 requests (可能被检测)")
+ self.session = requests.Session()
+ self._use_cffi = False
+
+ if proxy:
+ self.session.proxies = {"http": proxy, "https": proxy}
+
+ self.ua = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36"
+ self.sid = str(uuid.uuid4())
+ self.device_id = str(uuid.uuid4())
+ self.sentinel_token = None
+ self.solved_pow = None
+
+ self.session.headers.update({
+ "User-Agent": self.ua,
+ "Accept": "*/*",
+ "Accept-Language": "en-US,en;q=0.9",
+ "sec-ch-ua": '"Not(A:Brand";v="8", "Chromium";v="144", "Microsoft Edge";v="144"',
+ "sec-ch-ua-mobile": "?0",
+ "sec-ch-ua-platform": '"Windows"',
+ "sec-fetch-dest": "empty",
+ "sec-fetch-mode": "cors",
+ "sec-fetch-site": "same-origin",
+ })
+
+ def _call_sentinel_req(self, flow: str) -> Optional[dict]:
+ """调用 sentinel 获取 token 和处理 PoW"""
+ init_token = _get_requirements_token(self.ua, self.sid)
+ payload = {"p": init_token, "id": self.device_id, "flow": flow}
+
+ try:
+ resp = self.session.post(
+ "https://sentinel.openai.com/backend-api/sentinel/req",
+ json=payload,
+ timeout=15
+ )
+ if resp.status_code != 200:
+ log.warning(f"Sentinel 请求失败: {resp.status_code}")
+ return None
+
+ data = resp.json()
+ self.sentinel_token = data.get('token')
+ pow_req = data.get('proofofwork', {})
+
+ if pow_req.get('required'):
+ seed = pow_req.get('seed', '')
+ difficulty = pow_req.get('difficulty', '')
+ config = _get_pow_config(self.ua, self.sid)
+ solved = _solve_pow(seed, difficulty, config)
+ if solved:
+ self.solved_pow = f"gAAAAAB{solved}"
+ else:
+ log.error("PoW 求解失败")
+ return None
+ else:
+ self.solved_pow = init_token
+ return data
+ except Exception as e:
+ log.error(f"Sentinel 异常: {e}")
+ return None
+
+ def _get_sentinel_header(self, header_flow: str) -> str:
+ """构建 sentinel header"""
+ sentinel_obj = {"p": self.solved_pow, "id": self.device_id, "flow": header_flow}
+ if self.sentinel_token:
+ sentinel_obj["c"] = self.sentinel_token
+ return json.dumps(sentinel_obj)
+
+ def get_authorization_code(self, auth_url: str) -> Optional[str]:
+ """执行 OAuth 流程,返回 authorization code
+
+ Args:
+ auth_url: S2A 生成的授权 URL
+
+ Returns:
+ str: 授权码 或 None
+ """
+ log.step("开始 API 授权流程...")
+
+ headers = {
+ "Origin": "https://auth.openai.com",
+ "Referer": "https://auth.openai.com/log-in",
+ "Content-Type": "application/json"
+ }
+
+ try:
+ # 1. 访问授权端点
+ log.step("访问授权端点...")
+ resp = self.session.get(auth_url, allow_redirects=True)
+ headers["Referer"] = resp.url
+
+ # 2. 提交邮箱
+ log.step("提交邮箱...")
+ if not self._call_sentinel_req("login_web_init"):
+ return None
+
+ auth_headers = headers.copy()
+ auth_headers["OpenAI-Sentinel-Token"] = self._get_sentinel_header("authorize_continue")
+ resp = self.session.post(
+ "https://auth.openai.com/api/accounts/authorize/continue",
+ json={"username": {"kind": "email", "value": self.email}},
+ headers=auth_headers
+ )
+ if resp.status_code != 200:
+ log.error(f"邮箱提交失败: {resp.status_code} - {resp.text[:200]}")
+ return None
+
+ data = resp.json()
+ page_type = data.get("page", {}).get("type", "")
+
+ # 3. 验证密码
+ if page_type == "password" or "password" in str(data):
+ log.step("验证密码...")
+ if not self._call_sentinel_req("authorize_continue__auto"):
+ return None
+
+ verify_headers = headers.copy()
+ verify_headers["OpenAI-Sentinel-Token"] = self._get_sentinel_header("password_verify")
+ resp = self.session.post(
+ "https://auth.openai.com/api/accounts/password/verify",
+ json={"username": self.email, "password": self.password},
+ headers=verify_headers
+ )
+ if resp.status_code != 200:
+ log.error(f"密码验证失败: {resp.status_code} - {resp.text[:200]}")
+ return None
+
+ # 4. 获取 continue_url (无需选择 workspace,S2A 授权链接已包含)
+ data = resp.json()
+ continue_url = data.get("continue_url")
+
+ # 如果没有 continue_url,可能需要额外的 sentinel 调用
+ if not continue_url:
+ log.step("获取重定向 URL...")
+ if not self._call_sentinel_req("password_verify__auto"):
+ return None
+
+ # 尝试再次获取
+ resp = self.session.post(
+ "https://auth.openai.com/api/accounts/authorize/continue",
+ json={},
+ headers=auth_headers
+ )
+ if resp.status_code == 200:
+ data = resp.json()
+ continue_url = data.get("continue_url")
+
+ if not continue_url:
+ log.error(f"无法获取 continue_url: {data}")
+ return None
+
+ # 5. 跟踪重定向直到获取 code
+ log.step("跟踪重定向...")
+ for _ in range(10):
+ resp = self.session.get(continue_url, allow_redirects=False)
+ if resp.status_code in (301, 302, 303, 307, 308):
+ location = resp.headers.get('Location', '')
+ if "localhost:1455" in location:
+ parsed = urlparse(location)
+ query = parse_qs(parsed.query)
+ code = query.get('code', [None])[0]
+ if code:
+ log.success("成功获取授权码")
+ return code
+ continue_url = location
+ else:
+ break
+
+ log.error("无法获取授权码")
+ return None
+
+ except Exception as e:
+ log.error(f"API 授权异常: {e}")
+ import traceback
+ traceback.print_exc()
+ return None
+
+
+def s2a_api_authorize(
+ email: str,
+ password: str,
+ proxy: str = None
+) -> Tuple[bool, Optional[Dict[str, Any]]]:
+ """S2A 纯 API 授权 (无需浏览器)
+
+ 使用 OpenAI 认证 API 直接完成授权流程,无需浏览器自动化。
+
+ Args:
+ email: 账号邮箱
+ password: 账号密码
+ proxy: 代理地址 (可选,格式: http://host:port 或 socks5://user:pass@host:port)
+
+ Returns:
+ tuple: (是否成功, 账号数据或None)
+ """
+ if not S2A_API_BASE or (not S2A_ADMIN_KEY and not S2A_ADMIN_TOKEN):
+ log.error("S2A 未配置")
+ return False, None
+
+ # 使用配置的代理
+ if not proxy:
+ proxy_config = get_next_proxy()
+ if proxy_config:
+ proxy = format_proxy_url(proxy_config)
+
+ log.info(f"开始 S2A API 授权: {email}", icon="code")
+ if proxy:
+ log.debug(f"使用代理: {proxy[:30]}...")
+
+ try:
+ # 1. 生成授权 URL
+ auth_url, session_id = s2a_generate_auth_url()
+ if not auth_url or not session_id:
+ log.error("无法获取 S2A 授权 URL")
+ return False, None
+
+ log.debug(f"授权 URL: {auth_url[:80]}...")
+ log.debug(f"Session ID: {session_id[:16]}...")
+
+ # 2. 使用 API 授权器获取 code
+ authorizer = S2AApiAuthorizer(email, password, proxy)
+ code = authorizer.get_authorization_code(auth_url)
+
+ if not code:
+ log.error("API 授权失败,无法获取授权码")
+ return False, None
+
+ log.debug(f"授权码: {code[:20]}...")
+
+ # 3. 提交授权码创建账号
+ log.step("提交授权码到 S2A...")
+ result = s2a_create_account_from_oauth(code, session_id, name=email)
+
+ if result:
+ log.success(f"S2A API 授权成功: {email}")
+ return True, result
+ else:
+ log.error("S2A 账号入库失败")
+ return False, None
+
+ except Exception as e:
+ log.error(f"S2A API 授权异常: {e}")
+ return False, None
+
+
def build_s2a_headers() -> Dict[str, str]:
"""构建 S2A API 请求的 Headers
@@ -1211,3 +1568,122 @@ def format_keys_usage(keys: List[Dict[str, Any]], period_text: str = "今日") -
lines.append(f" 费用: {fmt_cost(total_cost)}")
return "\n".join(lines)
+
+
+# ==================== 批量 API 授权 ====================
+
+def s2a_batch_api_authorize(
+ accounts: List[Dict[str, str]],
+ proxy: str = None,
+ progress_callback: Optional[callable] = None
+) -> Dict[str, Any]:
+ """批量使用 API 模式授权账号到 S2A
+
+ 无需浏览器,直接通过 OpenAI 认证 API 完成授权。
+
+ Args:
+ accounts: 账号列表 [{"email": "xxx", "password": "xxx"}, ...]
+ proxy: 代理地址 (可选)
+ progress_callback: 进度回调函数 (current, total, email, status, message)
+
+ Returns:
+ dict: {
+ "success": int,
+ "failed": int,
+ "total": int,
+ "details": [{"email": "xxx", "status": "success/failed", "message": "xxx"}, ...]
+ }
+ """
+ results = {
+ "success": 0,
+ "failed": 0,
+ "total": len(accounts),
+ "details": []
+ }
+
+ if not S2A_API_BASE or (not S2A_ADMIN_KEY and not S2A_ADMIN_TOKEN):
+ log.error("S2A 未配置")
+ return results
+
+ # 使用配置的代理
+ if not proxy:
+ proxy_config = get_next_proxy()
+ if proxy_config:
+ proxy = format_proxy_url(proxy_config)
+
+ log.info(f"开始批量 API 授权: {len(accounts)} 个账号")
+
+ for i, acc in enumerate(accounts):
+ email = acc.get("email", "")
+ password = acc.get("password", "")
+
+ if not email or not password:
+ results["failed"] += 1
+ results["details"].append({
+ "email": email or "unknown",
+ "status": "failed",
+ "message": "缺少邮箱或密码"
+ })
+ if progress_callback:
+ progress_callback(i + 1, len(accounts), email, "failed", "缺少邮箱或密码")
+ continue
+
+ try:
+ success, result = s2a_api_authorize(email, password, proxy)
+
+ if success:
+ results["success"] += 1
+ account_id = result.get("id", "") if result else ""
+ results["details"].append({
+ "email": email,
+ "status": "success",
+ "message": f"ID: {account_id}"
+ })
+ if progress_callback:
+ progress_callback(i + 1, len(accounts), email, "success", f"ID: {account_id}")
+ else:
+ results["failed"] += 1
+ results["details"].append({
+ "email": email,
+ "status": "failed",
+ "message": "授权失败"
+ })
+ if progress_callback:
+ progress_callback(i + 1, len(accounts), email, "failed", "授权失败")
+
+ except Exception as e:
+ results["failed"] += 1
+ results["details"].append({
+ "email": email,
+ "status": "failed",
+ "message": str(e)
+ })
+ if progress_callback:
+ progress_callback(i + 1, len(accounts), email, "failed", str(e))
+
+ log.success(f"批量授权完成: 成功 {results['success']}, 失败 {results['failed']}")
+ return results
+
+
+def s2a_api_authorize_single(
+ email: str,
+ password: str,
+ proxy: str = None
+) -> Tuple[bool, str]:
+ """单个账号 API 授权 (简化返回值)
+
+ Args:
+ email: 账号邮箱
+ password: 账号密码
+ proxy: 代理地址 (可选)
+
+ Returns:
+ tuple: (是否成功, 消息)
+ """
+ success, result = s2a_api_authorize(email, password, proxy)
+
+ if success:
+ account_id = result.get("id", "") if result else ""
+ return True, f"授权成功 (ID: {account_id})"
+ else:
+ return False, "授权失败"
diff --git a/telegram_bot.py b/telegram_bot.py
index 7bac25a..77a5b5a 100644
--- a/telegram_bot.py
+++ b/telegram_bot.py
@@ -48,6 +48,7 @@ from config import (
S2A_GROUP_NAMES,
S2A_GROUP_IDS,
S2A_ADMIN_KEY,
+ S2A_API_MODE,
BROWSER_RANDOM_FINGERPRINT,
batch_remove_teams_by_names,
)
@@ -55,7 +56,8 @@ from utils import load_team_tracker, get_all_incomplete_accounts, save_team_trac
from bot_notifier import BotNotifier, set_notifier, progress_finish
from s2a_service import (
s2a_get_dashboard_stats, format_dashboard_stats, s2a_get_keys_with_usage, format_keys_usage,
- s2a_get_error_accounts, s2a_delete_account, s2a_batch_delete_error_accounts
+ s2a_get_error_accounts, s2a_delete_account, s2a_batch_delete_error_accounts,
+ s2a_api_authorize_single, s2a_batch_api_authorize
)
from email_service import gptmail_service, unified_create_email
from logger import log
@@ -159,6 +161,7 @@ class ProvisionerBot:
("include_owners", self.cmd_include_owners),
("reload", self.cmd_reload),
("s2a_config", self.cmd_s2a_config),
+ ("api_auth", self.cmd_api_auth),
("clean", self.cmd_clean),
("clean_errors", self.cmd_clean_errors),
("clean_teams", self.cmd_clean_teams),
@@ -287,6 +290,7 @@ class ProvisionerBot:
BotCommand("keys_usage", "查看 API 密钥用量"),
BotCommand("stock", "查看账号库存"),
BotCommand("s2a_config", "配置 S2A 参数"),
+ BotCommand("api_auth", "API 模式授权账号"),
BotCommand("import", "导入账号到 team.json"),
BotCommand("verify", "验证未验证的账号"),
BotCommand("verify_all", "强制重新验证所有账号"),
@@ -355,6 +359,7 @@ class ProvisionerBot:
/keys_usage - 查看 API 密钥用量
/stock - 查看账号库存
/s2a_config - 配置 S2A 参数
+/api_auth - API 模式授权 (无需浏览器)
/clean_errors - 清理错误状态账号
🧹 清理管理:
@@ -1072,6 +1077,92 @@ class ProvisionerBot:
except Exception as e:
await update.message.reply_text(f"❌ 修改配置失败: {e}")
+ @admin_only
+ async def cmd_api_auth(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
+ """使用纯 API 模式授权账号到 S2A (无需浏览器)
+
+ 用法:
+ /api_auth email password - 授权单个账号
+ /api_auth - 显示帮助信息
+ """
+ if AUTH_PROVIDER != "s2a":
+ await update.message.reply_text(
+ "❌ 此命令仅在 S2A 模式下可用\n"
+ "当前授权服务: " + AUTH_PROVIDER
+ )
+ return
+
+ # 无参数时显示帮助
+ if not context.args:
+ api_mode_status = "✅ 已启用" if S2A_API_MODE else "❌ 未启用"
+ lines = [
+ "🔐 S2A API 授权",
+ "",
+ f"API 模式: {api_mode_status}",
+ "",
+ "用法:",
+ "/api_auth email password",
+ "",
+ "示例:",
+ "/api_auth test@example.com MyPassword123",
+ "",
+ "说明:",
+ "• 使用纯 API 方式完成 OAuth 授权",
+ "• 无需浏览器,更快更稳定",
+ "• 需要安装 curl_cffi: pip install curl_cffi",
+ "",
+ "💡 提示:",
+ "• 在 config.toml 中设置 s2a.api_mode = true",
+ "• 可让所有 S2A 授权自动使用 API 模式",
+ ]
+ await update.message.reply_text("\n".join(lines), parse_mode="HTML")
+ return
+
+ # 解析参数
+ if len(context.args) < 2:
+ await update.message.reply_text(
+ "❌ 参数不足\n"
+ "用法: /api_auth "
+ )
+ return
+
+ email = context.args[0]
+ password = " ".join(context.args[1:]) # 密码可能包含空格
+
+ # 发送处理中消息
+ msg = await update.message.reply_text(
+ f"🔄 正在授权: {email}\n"
+ "请稍候...",
+ parse_mode="HTML"
+ )
+
+ try:
+ # 执行 API 授权
+ success, message = s2a_api_authorize_single(email, password)
+
+ if success:
+ await msg.edit_text(
+ f"✅ 授权成功\n\n"
+ f"📧 邮箱: {email}\n"
+ f"📝 {message}",
+ parse_mode="HTML"
+ )
+ else:
+ await msg.edit_text(
+ f"❌ 授权失败\n\n"
+ f"📧 邮箱: {email}\n"
+ f"📝 {message}",
+ parse_mode="HTML"
+ )
+
+ except Exception as e:
+ await msg.edit_text(
+ f"❌ 授权异常\n\n"
+ f"📧 邮箱: {email}\n"
+ f"📝 错误: {str(e)}",
+ parse_mode="HTML"
+ )
+
@admin_only
async def cmd_run(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""启动处理指定数量的 Team - 交互式选择"""