diff --git a/browser_automation.py b/browser_automation.py index 908dc40..7af8580 100644 --- a/browser_automation.py +++ b/browser_automation.py @@ -2121,6 +2121,15 @@ def register_and_authorize(email: str, password: str, use_api_register: bool = T - CRS 模式: codex_data 包含 tokens - CPA/S2A 模式: codex_data 为 None (后台自动处理) """ + # 获取授权回调锁 (CPA/S2A 模式需要串行授权) + auth_lock = None + if AUTH_PROVIDER in ("cpa", "s2a"): + try: + import run + auth_lock = run._auth_callback_lock + except (ImportError, AttributeError): + pass + with browser_context_with_retry(max_browser_retries=2) as ctx: for attempt in ctx.attempts(): try: @@ -2144,19 +2153,31 @@ def register_and_authorize(email: str, password: str, use_api_register: bool = T # 短暂等待确保注册完成 time.sleep(0.5) - # 根据配置选择授权方式 - if AUTH_PROVIDER == "cpa": - # CPA 模式: 授权成功即完成,后台自动处理账号 - success = perform_cpa_authorization(ctx.page, email, password) - return True, None if success else (True, None) # 注册成功,授权可能失败 - elif AUTH_PROVIDER == "s2a": - # S2A 模式: 授权成功即完成,后台自动处理账号 - success = perform_s2a_authorization(ctx.page, email, password) - return True, None if success else (True, None) # 注册成功,授权可能失败 - else: - # CRS 模式: 需要 codex_data - codex_data = perform_codex_authorization(ctx.page, email, password) - return True, codex_data + # ========== 授权流程 - CPA/S2A 需要串行执行 (避免回调端口冲突) ========== + # 在调用授权函数之前获取锁,确保浏览器不会在等待锁时空闲断开 + if auth_lock: + log.step("等待授权回调锁...") + auth_lock.acquire() + log.step("获取授权回调锁,开始授权...") + + try: + # 根据配置选择授权方式 + if AUTH_PROVIDER == "cpa": + # CPA 模式: 授权成功即完成,后台自动处理账号 + success = perform_cpa_authorization(ctx.page, email, password) + return True, None if success else (True, None) # 注册成功,授权可能失败 + elif AUTH_PROVIDER == "s2a": + # S2A 模式: 授权成功即完成,后台自动处理账号 + success = perform_s2a_authorization(ctx.page, email, password) + return True, None if success else (True, None) # 注册成功,授权可能失败 + else: + # CRS 模式: 需要 codex_data + codex_data = perform_codex_authorization(ctx.page, email, password) + return True, codex_data + finally: + if auth_lock: + log.step("释放授权回调锁") + auth_lock.release() except Exception as e: ctx.handle_error(e) @@ -2178,42 +2199,62 @@ def authorize_only(email: str, password: str) -> tuple[bool, dict]: - CRS 模式: codex_data 包含 tokens - CPA/S2A 模式: codex_data 为 None (后台自动处理) """ + # 获取授权回调锁 (CPA/S2A 模式需要串行授权) + auth_lock = None + if AUTH_PROVIDER in ("cpa", "s2a"): + try: + import run + auth_lock = run._auth_callback_lock + except (ImportError, AttributeError): + pass + with browser_context_with_retry(max_browser_retries=2) as ctx: for attempt in ctx.attempts(): try: - # 根据配置选择授权方式 - if AUTH_PROVIDER == "cpa": - log.info("已注册账号,使用 CPA 进行 Codex 授权...", icon="auth") - success = perform_cpa_authorization(ctx.page, email, password) - if success: - return True, None # CPA 模式不返回 codex_data - else: - if attempt < ctx.max_retries - 1: - log.warning("CPA 授权失败,准备重试...") - continue - return False, None - elif AUTH_PROVIDER == "s2a": - log.info("已注册账号,使用 S2A 进行 Codex 授权...", icon="auth") - success = perform_s2a_authorization(ctx.page, email, password) - if success: - return True, None # S2A 模式不返回 codex_data - else: - if attempt < ctx.max_retries - 1: - log.warning("S2A 授权失败,准备重试...") - continue - return False, None - else: - # CRS 模式 - log.info("已注册账号,直接进行 Codex 授权...", icon="auth") - codex_data = perform_codex_authorization(ctx.page, email, password) + # ========== 授权流程 - CPA/S2A 需要串行执行 (避免回调端口冲突) ========== + if auth_lock: + log.step("等待授权回调锁...") + auth_lock.acquire() + log.step("获取授权回调锁,开始授权...") - if codex_data: - return True, codex_data + try: + # 根据配置选择授权方式 + if AUTH_PROVIDER == "cpa": + log.info("已注册账号,使用 CPA 进行 Codex 授权...", icon="auth") + success = perform_cpa_authorization(ctx.page, email, password) + if success: + return True, None # CPA 模式不返回 codex_data + else: + if attempt < ctx.max_retries - 1: + log.warning("CPA 授权失败,准备重试...") + continue + return False, None + elif AUTH_PROVIDER == "s2a": + log.info("已注册账号,使用 S2A 进行 Codex 授权...", icon="auth") + success = perform_s2a_authorization(ctx.page, email, password) + if success: + return True, None # S2A 模式不返回 codex_data + else: + if attempt < ctx.max_retries - 1: + log.warning("S2A 授权失败,准备重试...") + continue + return False, None else: - if attempt < ctx.max_retries - 1: - log.warning("授权失败,准备重试...") - continue - return False, None + # CRS 模式 + log.info("已注册账号,直接进行 Codex 授权...", icon="auth") + codex_data = perform_codex_authorization(ctx.page, email, password) + + if codex_data: + return True, codex_data + else: + if attempt < ctx.max_retries - 1: + log.warning("授权失败,准备重试...") + continue + return False, None + finally: + if auth_lock: + log.step("释放授权回调锁") + auth_lock.release() except Exception as e: ctx.handle_error(e) @@ -2243,17 +2284,8 @@ def perform_cpa_authorization(page, email: str, password: str) -> bool: """ log.info(f"开始 CPA 授权: {email}", icon="code") - # ========== 授权流程 - 需要串行执行 (避免回调端口冲突) ========== - try: - import run - auth_lock = run._auth_callback_lock - except (ImportError, AttributeError): - auth_lock = None - - if auth_lock: - log.step("等待授权锁...") - auth_lock.acquire() - log.step("获取授权锁,开始授权流程...") + # 注意: 授权回调锁已在上层函数 (register_and_authorize/authorize_only) 中获取 + # 这里不再重复获取锁 try: # 生成授权 URL @@ -2397,14 +2429,9 @@ def perform_cpa_authorization(page, email: str, password: str) -> bool: log.error("CPA 授权状态检查失败") return False - finally: - # 确保释放锁 - if auth_lock: - try: - auth_lock.release() - log.step("释放授权回调锁") - except RuntimeError: - pass # 锁可能已经被释放 + except Exception as e: + log.error(f"CPA 授权异常: {e}") + return False def perform_cpa_authorization_with_otp(page, email: str) -> bool: @@ -2654,17 +2681,8 @@ def perform_s2a_authorization(page, email: str, password: str) -> bool: log.info(f"开始 S2A 授权: {email}", icon="code") progress_update(phase="授权", step="开始 S2A 授权...") - # ========== 授权流程 - 需要串行执行 (避免回调端口冲突) ========== - try: - import run - auth_lock = run._auth_callback_lock - except (ImportError, AttributeError): - auth_lock = None - - if auth_lock: - log.step("等待授权锁...") - auth_lock.acquire() - log.step("获取授权锁,开始授权流程...") + # 注意: 授权回调锁已在上层函数 (register_and_authorize/authorize_only) 中获取 + # 这里不再重复获取锁 try: # 生成授权 URL @@ -2864,14 +2882,9 @@ def perform_s2a_authorization(page, email: str, password: str) -> bool: log.error("S2A 账号入库失败") return False - finally: - # 确保释放锁 - if auth_lock: - try: - auth_lock.release() - log.step("释放授权回调锁") - except RuntimeError: - pass # 锁可能已经被释放 + except Exception as e: + log.error(f"S2A 授权异常: {e}") + return False # ==================== 格式3专用: 登录获取 Session ==================== diff --git a/run.py b/run.py index 5dfd9c5..83b31dc 100644 --- a/run.py +++ b/run.py @@ -775,7 +775,7 @@ def process_accounts_concurrent( if max_workers is None: max_workers = CONCURRENT_WORKERS - stagger_delay = 2.0 # 线程错开启动间隔 (秒) + stagger_delay = 4.0 # 线程错开启动间隔 (秒) # 过滤已完成的账号 pending_accounts = [acc for acc in accounts if acc.get("status") != "completed"]