This commit is contained in:
2026-01-27 10:08:10 +08:00
parent 06eaff03b9
commit ad10d1f2b7
2 changed files with 96 additions and 83 deletions

View File

@@ -2121,6 +2121,15 @@ def register_and_authorize(email: str, password: str, use_api_register: bool = T
- CRS 模式: codex_data 包含 tokens
- CPA/S2A 模式: codex_data 为 None (后台自动处理)
"""
# 获取授权回调锁 (CPA/S2A 模式需要串行授权)
auth_lock = None
if AUTH_PROVIDER in ("cpa", "s2a"):
try:
import run
auth_lock = run._auth_callback_lock
except (ImportError, AttributeError):
pass
with browser_context_with_retry(max_browser_retries=2) as ctx:
for attempt in ctx.attempts():
try:
@@ -2144,19 +2153,31 @@ def register_and_authorize(email: str, password: str, use_api_register: bool = T
# 短暂等待确保注册完成
time.sleep(0.5)
# 根据配置选择授权方式
if AUTH_PROVIDER == "cpa":
# CPA 模式: 授权成功即完成,后台自动处理账号
success = perform_cpa_authorization(ctx.page, email, password)
return True, None if success else (True, None) # 注册成功,授权可能失败
elif AUTH_PROVIDER == "s2a":
# S2A 模式: 授权成功即完成,后台自动处理账号
success = perform_s2a_authorization(ctx.page, email, password)
return True, None if success else (True, None) # 注册成功,授权可能失败
else:
# CRS 模式: 需要 codex_data
codex_data = perform_codex_authorization(ctx.page, email, password)
return True, codex_data
# ========== 授权流程 - CPA/S2A 需要串行执行 (避免回调端口冲突) ==========
# 在调用授权函数之前获取锁,确保浏览器不会在等待锁时空闲断开
if auth_lock:
log.step("等待授权回调锁...")
auth_lock.acquire()
log.step("获取授权回调锁,开始授权...")
try:
# 根据配置选择授权方式
if AUTH_PROVIDER == "cpa":
# CPA 模式: 授权成功即完成,后台自动处理账号
success = perform_cpa_authorization(ctx.page, email, password)
return True, None if success else (True, None) # 注册成功,授权可能失败
elif AUTH_PROVIDER == "s2a":
# S2A 模式: 授权成功即完成,后台自动处理账号
success = perform_s2a_authorization(ctx.page, email, password)
return True, None if success else (True, None) # 注册成功,授权可能失败
else:
# CRS 模式: 需要 codex_data
codex_data = perform_codex_authorization(ctx.page, email, password)
return True, codex_data
finally:
if auth_lock:
log.step("释放授权回调锁")
auth_lock.release()
except Exception as e:
ctx.handle_error(e)
@@ -2178,42 +2199,62 @@ def authorize_only(email: str, password: str) -> tuple[bool, dict]:
- CRS 模式: codex_data 包含 tokens
- CPA/S2A 模式: codex_data 为 None (后台自动处理)
"""
# 获取授权回调锁 (CPA/S2A 模式需要串行授权)
auth_lock = None
if AUTH_PROVIDER in ("cpa", "s2a"):
try:
import run
auth_lock = run._auth_callback_lock
except (ImportError, AttributeError):
pass
with browser_context_with_retry(max_browser_retries=2) as ctx:
for attempt in ctx.attempts():
try:
# 根据配置选择授权方式
if AUTH_PROVIDER == "cpa":
log.info("已注册账号,使用 CPA 进行 Codex 授权...", icon="auth")
success = perform_cpa_authorization(ctx.page, email, password)
if success:
return True, None # CPA 模式不返回 codex_data
else:
if attempt < ctx.max_retries - 1:
log.warning("CPA 授权失败,准备重试...")
continue
return False, None
elif AUTH_PROVIDER == "s2a":
log.info("已注册账号,使用 S2A 进行 Codex 授权...", icon="auth")
success = perform_s2a_authorization(ctx.page, email, password)
if success:
return True, None # S2A 模式不返回 codex_data
else:
if attempt < ctx.max_retries - 1:
log.warning("S2A 授权失败,准备重试...")
continue
return False, None
else:
# CRS 模式
log.info("已注册账号,直接进行 Codex 授权...", icon="auth")
codex_data = perform_codex_authorization(ctx.page, email, password)
# ========== 授权流程 - CPA/S2A 需要串行执行 (避免回调端口冲突) ==========
if auth_lock:
log.step("等待授权回调锁...")
auth_lock.acquire()
log.step("获取授权回调锁,开始授权...")
if codex_data:
return True, codex_data
try:
# 根据配置选择授权方式
if AUTH_PROVIDER == "cpa":
log.info("已注册账号,使用 CPA 进行 Codex 授权...", icon="auth")
success = perform_cpa_authorization(ctx.page, email, password)
if success:
return True, None # CPA 模式不返回 codex_data
else:
if attempt < ctx.max_retries - 1:
log.warning("CPA 授权失败,准备重试...")
continue
return False, None
elif AUTH_PROVIDER == "s2a":
log.info("已注册账号,使用 S2A 进行 Codex 授权...", icon="auth")
success = perform_s2a_authorization(ctx.page, email, password)
if success:
return True, None # S2A 模式不返回 codex_data
else:
if attempt < ctx.max_retries - 1:
log.warning("S2A 授权失败,准备重试...")
continue
return False, None
else:
if attempt < ctx.max_retries - 1:
log.warning("授权失败,准备重试...")
continue
return False, None
# CRS 模式
log.info("已注册账号,直接进行 Codex 授权...", icon="auth")
codex_data = perform_codex_authorization(ctx.page, email, password)
if codex_data:
return True, codex_data
else:
if attempt < ctx.max_retries - 1:
log.warning("授权失败,准备重试...")
continue
return False, None
finally:
if auth_lock:
log.step("释放授权回调锁")
auth_lock.release()
except Exception as e:
ctx.handle_error(e)
@@ -2243,17 +2284,8 @@ def perform_cpa_authorization(page, email: str, password: str) -> bool:
"""
log.info(f"开始 CPA 授权: {email}", icon="code")
# ========== 授权流程 - 需要串行执行 (避免回调端口冲突) ==========
try:
import run
auth_lock = run._auth_callback_lock
except (ImportError, AttributeError):
auth_lock = None
if auth_lock:
log.step("等待授权锁...")
auth_lock.acquire()
log.step("获取授权锁,开始授权流程...")
# 注意: 授权回调锁已在上层函数 (register_and_authorize/authorize_only) 中获取
# 这里不再重复获取锁
try:
# 生成授权 URL
@@ -2397,14 +2429,9 @@ def perform_cpa_authorization(page, email: str, password: str) -> bool:
log.error("CPA 授权状态检查失败")
return False
finally:
# 确保释放锁
if auth_lock:
try:
auth_lock.release()
log.step("释放授权回调锁")
except RuntimeError:
pass # 锁可能已经被释放
except Exception as e:
log.error(f"CPA 授权异常: {e}")
return False
def perform_cpa_authorization_with_otp(page, email: str) -> bool:
@@ -2654,17 +2681,8 @@ def perform_s2a_authorization(page, email: str, password: str) -> bool:
log.info(f"开始 S2A 授权: {email}", icon="code")
progress_update(phase="授权", step="开始 S2A 授权...")
# ========== 授权流程 - 需要串行执行 (避免回调端口冲突) ==========
try:
import run
auth_lock = run._auth_callback_lock
except (ImportError, AttributeError):
auth_lock = None
if auth_lock:
log.step("等待授权锁...")
auth_lock.acquire()
log.step("获取授权锁,开始授权流程...")
# 注意: 授权回调锁已在上层函数 (register_and_authorize/authorize_only) 中获取
# 这里不再重复获取锁
try:
# 生成授权 URL
@@ -2864,14 +2882,9 @@ def perform_s2a_authorization(page, email: str, password: str) -> bool:
log.error("S2A 账号入库失败")
return False
finally:
# 确保释放锁
if auth_lock:
try:
auth_lock.release()
log.step("释放授权回调锁")
except RuntimeError:
pass # 锁可能已经被释放
except Exception as e:
log.error(f"S2A 授权异常: {e}")
return False
# ==================== 格式3专用: 登录获取 Session ====================

2
run.py
View File

@@ -775,7 +775,7 @@ def process_accounts_concurrent(
if max_workers is None:
max_workers = CONCURRENT_WORKERS
stagger_delay = 2.0 # 线程错开启动间隔 (秒)
stagger_delay = 4.0 # 线程错开启动间隔 (秒)
# 过滤已完成的账号
pending_accounts = [acc for acc in accounts if acc.get("status") != "completed"]