This commit is contained in:
2026-01-16 00:32:23 +08:00
parent 7e8c3784c1
commit 64707768f8
4 changed files with 273 additions and 20 deletions

View File

@@ -26,6 +26,10 @@ from cpa_service import (
cpa_poll_auth_status,
is_cpa_callback_url
)
from s2a_service import (
s2a_generate_auth_url,
s2a_create_account_from_oauth
)
from logger import log
@@ -1775,7 +1779,7 @@ def register_and_authorize(email: str, password: str) -> tuple:
tuple: (register_success, codex_data)
- register_success: True/False/"domain_blacklisted"
- CRS 模式: codex_data 包含 tokens
- CPA 模式: codex_data 为 None (后台自动处理)
- CPA/S2A 模式: codex_data 为 None (后台自动处理)
"""
with browser_context_with_retry(max_browser_retries=2) as ctx:
for attempt in ctx.attempts():
@@ -1802,6 +1806,10 @@ def register_and_authorize(email: str, password: str) -> tuple:
# CPA 模式: 授权成功即完成,后台自动处理账号
success = perform_cpa_authorization(ctx.page, email, password)
return True, None if success else (True, None) # 注册成功,授权可能失败
elif AUTH_PROVIDER == "s2a":
# S2A 模式: 授权成功即完成,后台自动处理账号
success = perform_s2a_authorization(ctx.page, email, password)
return True, None if success else (True, None) # 注册成功,授权可能失败
else:
# CRS 模式: 需要 codex_data
codex_data = perform_codex_authorization(ctx.page, email, password)
@@ -1825,7 +1833,7 @@ def authorize_only(email: str, password: str) -> tuple[bool, dict]:
Returns:
tuple: (success, codex_data)
- CRS 模式: codex_data 包含 tokens
- CPA 模式: codex_data 为 None (后台自动处理)
- CPA/S2A 模式: codex_data 为 None (后台自动处理)
"""
with browser_context_with_retry(max_browser_retries=2) as ctx:
for attempt in ctx.attempts():
@@ -1841,6 +1849,16 @@ def authorize_only(email: str, password: str) -> tuple[bool, dict]:
log.warning("CPA 授权失败,准备重试...")
continue
return False, None
elif AUTH_PROVIDER == "s2a":
log.info("已注册账号,使用 S2A 进行 Codex 授权...", icon="auth")
success = perform_s2a_authorization(ctx.page, email, password)
if success:
return True, None # S2A 模式不返回 codex_data
else:
if attempt < ctx.max_retries - 1:
log.warning("S2A 授权失败,准备重试...")
continue
return False, None
else:
# CRS 模式
log.info("已注册账号,直接进行 Codex 授权...", icon="auth")
@@ -2244,6 +2262,161 @@ def perform_cpa_authorization_with_otp(page, email: str) -> bool:
return False
# ==================== S2A 授权函数 ====================
def perform_s2a_authorization(page, email: str, password: str) -> bool:
"""执行 S2A 授权流程 (密码登录)
Args:
page: 浏览器实例
email: 邮箱地址
password: 密码
Returns:
bool: 授权是否成功
"""
log.info(f"开始 S2A 授权: {email}", icon="code")
# 生成授权 URL
auth_url, session_id = s2a_generate_auth_url()
if not auth_url or not session_id:
log.error("无法获取 S2A 授权 URL")
return False
# 打开授权页面
log.step("打开 S2A 授权页面...")
log.info(f"[URL] S2A授权URL: {auth_url}", icon="browser")
page.get(auth_url)
wait_for_page_stable(page, timeout=5)
log_current_url(page, "S2A授权页面加载完成", force=True)
# 检测错误页面
check_and_handle_error_page(page)
try:
# 输入邮箱
log.step("输入邮箱...")
email_input = wait_for_element(page, 'css:input[type="email"]', timeout=10)
if not email_input:
email_input = wait_for_element(page, 'css:input[name="email"]', timeout=5)
if email_input:
type_slowly(page, 'css:input[type="email"], input[name="email"]', email, base_delay=0.06)
# 点击继续
log.step("点击继续...")
continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5)
if continue_btn:
old_url = page.url
continue_btn.click()
wait_for_url_change(page, old_url, timeout=8)
log_url_change(page, old_url, "S2A-输入邮箱后点击继续")
except Exception as e:
log.warning(f"S2A 邮箱输入步骤异常: {e}")
log_current_url(page, "S2A-邮箱步骤完成后")
# 输入密码
current_url = page.url
if "/password" in current_url:
try:
log.step("输入密码...")
password_input = wait_for_element(page, 'css:input[type="password"]', timeout=10)
if password_input:
type_slowly(page, 'css:input[type="password"]', password, base_delay=0.06)
log.step("点击继续...")
continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5)
if continue_btn:
old_url = page.url
continue_btn.click()
wait_for_url_change(page, old_url, timeout=8)
log_url_change(page, old_url, "S2A-输入密码后点击继续")
except Exception as e:
log.warning(f"S2A 密码输入步骤异常: {e}")
log_current_url(page, "S2A-密码步骤完成后")
# 等待授权回调 (S2A 使用 localhost 回调)
max_wait = 45
start_time = time.time()
callback_url = None
progress_shown = False
last_url_in_loop = None
log.step(f"等待 S2A 授权回调 (最多 {max_wait}s)...")
while time.time() - start_time < max_wait:
try:
current_url = page.url
# 记录 URL 变化
if current_url != last_url_in_loop:
log_current_url(page, "S2A等待回调中")
last_url_in_loop = current_url
# 检查是否到达回调页面 (S2A 使用 localhost:1455 或类似端口)
if "localhost" in current_url and "code=" in current_url:
if progress_shown:
log.progress_clear()
progress_shown = False
callback_url = current_url
log.success(f"捕获 S2A 回调 URL")
break
# 检测错误
check_and_handle_error(page)
# 检查是否需要点击 Authorize
try:
auth_btn = page.ele('css:button[type="submit"]', timeout=0.5)
if auth_btn:
btn_text = auth_btn.text.lower() if auth_btn.text else ""
if 'authorize' in btn_text or '授权' in btn_text or 'continue' in btn_text:
log.step("点击授权按钮...")
old_url = page.url
auth_btn.click()
wait_for_url_change(page, old_url, timeout=5)
log_url_change(page, old_url, "S2A点击授权按钮后")
except Exception:
pass
elapsed = int(time.time() - start_time)
log.progress_inline(f"[S2A等待中... {elapsed}s]")
progress_shown = True
time.sleep(1.5)
except Exception as e:
if progress_shown:
log.progress_clear()
progress_shown = False
log.warning(f"S2A检查异常: {e}")
time.sleep(1.5)
if progress_shown:
log.progress_clear()
if not callback_url:
log.error("S2A 无法获取回调链接")
return False
# 从回调 URL 中提取 code
code = extract_code_from_url(callback_url)
if not code:
log.error("S2A 无法从回调链接提取授权码")
return False
# S2A 特有流程: 用授权码创建账号 (传入完整邮箱用于验证)
log.step("正在提交 S2A 授权码...")
result = s2a_create_account_from_oauth(code, session_id, name=email)
if result:
log.success("S2A 授权流程完成")
return True
else:
log.error("S2A 账号入库失败")
return False
# ==================== 格式3专用: 登录获取 Session ====================
def login_and_get_session(page, email: str, password: str) -> dict: