From 06eaff03b9aa8374ee4e0221b7be42da17d66290 Mon Sep 17 00:00:00 2001 From: kyx236 Date: Tue, 27 Jan 2026 09:59:16 +0800 Subject: [PATCH] 5 --- browser_automation.py | 259 +++++++++++++++++++++--------------------- 1 file changed, 129 insertions(+), 130 deletions(-) diff --git a/browser_automation.py b/browser_automation.py index e34cddd..908dc40 100644 --- a/browser_automation.py +++ b/browser_automation.py @@ -2243,67 +2243,7 @@ def perform_cpa_authorization(page, email: str, password: str) -> bool: """ log.info(f"开始 CPA 授权: {email}", icon="code") - # 生成授权 URL - auth_url, state = cpa_generate_auth_url() - if not auth_url or not state: - log.error("无法获取 CPA 授权 URL") - return False - - # 打开授权页面 - log.step("打开 CPA 授权页面...") - log.info(f"[URL] CPA授权URL: {auth_url}", icon="browser") - page.get(auth_url) - wait_for_page_stable(page, timeout=5) - log_current_url(page, "CPA授权页面加载完成", force=True) - - # 检测错误页面 - check_and_handle_error_page(page) - - try: - # 输入邮箱 - log.step("输入邮箱...") - email_input = wait_for_element(page, 'css:input[type="email"]', timeout=10) - if not email_input: - email_input = wait_for_element(page, 'css:input[name="email"]', timeout=5) - if email_input: - type_slowly(page, 'css:input[type="email"], input[name="email"]', email, base_delay=0.06) - - # 点击继续 - log.step("点击继续...") - continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5) - if continue_btn: - old_url = page.url - continue_btn.click() - wait_for_url_change(page, old_url, timeout=8) - log_url_change(page, old_url, "CPA-输入邮箱后点击继续") - except Exception as e: - log.warning(f"CPA 邮箱输入步骤异常: {e}") - - log_current_url(page, "CPA-邮箱步骤完成后") - - # 输入密码 - current_url = page.url - if "/password" in current_url: - try: - log.step("输入密码...") - password_input = wait_for_element(page, 'css:input[type="password"]', timeout=10) - - if password_input: - type_slowly(page, 'css:input[type="password"]', password, base_delay=0.06) - - log.step("点击继续...") - continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5) - if continue_btn: - old_url = page.url - continue_btn.click() - wait_for_url_change(page, old_url, timeout=8) - log_url_change(page, old_url, "CPA-输入密码后点击继续") - except Exception as e: - log.warning(f"CPA 密码输入步骤异常: {e}") - - log_current_url(page, "CPA-密码步骤完成后") - - # ========== 授权回调阶段 - 需要串行执行 ========== + # ========== 授权流程 - 需要串行执行 (避免回调端口冲突) ========== try: import run auth_lock = run._auth_callback_lock @@ -2311,11 +2251,71 @@ def perform_cpa_authorization(page, email: str, password: str) -> bool: auth_lock = None if auth_lock: - log.step("等待授权回调锁...") + log.step("等待授权锁...") auth_lock.acquire() - log.step("获取授权回调锁,开始授权...") + log.step("获取授权锁,开始授权流程...") try: + # 生成授权 URL + auth_url, state = cpa_generate_auth_url() + if not auth_url or not state: + log.error("无法获取 CPA 授权 URL") + return False + + # 打开授权页面 + log.step("打开 CPA 授权页面...") + log.info(f"[URL] CPA授权URL: {auth_url}", icon="browser") + page.get(auth_url) + wait_for_page_stable(page, timeout=5) + log_current_url(page, "CPA授权页面加载完成", force=True) + + # 检测错误页面 + check_and_handle_error_page(page) + + try: + # 输入邮箱 + log.step("输入邮箱...") + email_input = wait_for_element(page, 'css:input[type="email"]', timeout=10) + if not email_input: + email_input = wait_for_element(page, 'css:input[name="email"]', timeout=5) + if email_input: + type_slowly(page, 'css:input[type="email"], input[name="email"]', email, base_delay=0.06) + + # 点击继续 + log.step("点击继续...") + continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5) + if continue_btn: + old_url = page.url + continue_btn.click() + wait_for_url_change(page, old_url, timeout=8) + log_url_change(page, old_url, "CPA-输入邮箱后点击继续") + except Exception as e: + log.warning(f"CPA 邮箱输入步骤异常: {e}") + + log_current_url(page, "CPA-邮箱步骤完成后") + + # 输入密码 + current_url = page.url + if "/password" in current_url: + try: + log.step("输入密码...") + password_input = wait_for_element(page, 'css:input[type="password"]', timeout=10) + + if password_input: + type_slowly(page, 'css:input[type="password"]', password, base_delay=0.06) + + log.step("点击继续...") + continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5) + if continue_btn: + old_url = page.url + continue_btn.click() + wait_for_url_change(page, old_url, timeout=8) + log_url_change(page, old_url, "CPA-输入密码后点击继续") + except Exception as e: + log.warning(f"CPA 密码输入步骤异常: {e}") + + log_current_url(page, "CPA-密码步骤完成后") + # 等待授权回调 max_wait = 45 start_time = time.time() @@ -2654,71 +2654,7 @@ def perform_s2a_authorization(page, email: str, password: str) -> bool: log.info(f"开始 S2A 授权: {email}", icon="code") progress_update(phase="授权", step="开始 S2A 授权...") - # 生成授权 URL - auth_url, session_id = s2a_generate_auth_url() - if not auth_url or not session_id: - log.error("无法获取 S2A 授权 URL") - return False - - # 打开授权页面 - progress_update(phase="授权", step="打开授权页面...") - log.step("打开 S2A 授权页面...") - log.info(f"[URL] S2A授权URL: {auth_url}", icon="browser") - page.get(auth_url) - wait_for_page_stable(page, timeout=5) - log_current_url(page, "S2A授权页面加载完成", force=True) - - # 检测错误页面 - check_and_handle_error_page(page) - - try: - # 输入邮箱 - progress_update(phase="授权", step="输入邮箱...") - log.step("输入邮箱...") - email_input = wait_for_element(page, 'css:input[type="email"]', timeout=10) - if not email_input: - email_input = wait_for_element(page, 'css:input[name="email"]', timeout=5) - if email_input: - type_slowly(page, 'css:input[type="email"], input[name="email"]', email, base_delay=0.06) - - # 点击继续 - log.step("点击继续...") - continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5) - if continue_btn: - old_url = page.url - continue_btn.click() - wait_for_url_change(page, old_url, timeout=8) - log_url_change(page, old_url, "S2A-输入邮箱后点击继续") - except Exception as e: - log.warning(f"S2A 邮箱输入步骤异常: {e}") - - log_current_url(page, "S2A-邮箱步骤完成后") - - # 输入密码 - current_url = page.url - if "/password" in current_url: - try: - progress_update(phase="授权", step="输入密码...") - log.step("输入密码...") - password_input = wait_for_element(page, 'css:input[type="password"]', timeout=10) - - if password_input: - type_slowly(page, 'css:input[type="password"]', password, base_delay=0.06) - - log.step("点击继续...") - continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5) - if continue_btn: - old_url = page.url - continue_btn.click() - wait_for_url_change(page, old_url, timeout=8) - log_url_change(page, old_url, "S2A-输入密码后点击继续") - except Exception as e: - log.warning(f"S2A 密码输入步骤异常: {e}") - - log_current_url(page, "S2A-密码步骤完成后") - - # ========== 授权回调阶段 - 需要串行执行 ========== - # 获取授权回调锁,确保同一时间只有一个线程进行授权回调 + # ========== 授权流程 - 需要串行执行 (避免回调端口冲突) ========== try: import run auth_lock = run._auth_callback_lock @@ -2726,11 +2662,74 @@ def perform_s2a_authorization(page, email: str, password: str) -> bool: auth_lock = None if auth_lock: - log.step("等待授权回调锁...") + log.step("等待授权锁...") auth_lock.acquire() - log.step("获取授权回调锁,开始授权...") + log.step("获取授权锁,开始授权流程...") try: + # 生成授权 URL + auth_url, session_id = s2a_generate_auth_url() + if not auth_url or not session_id: + log.error("无法获取 S2A 授权 URL") + return False + + # 打开授权页面 + progress_update(phase="授权", step="打开授权页面...") + log.step("打开 S2A 授权页面...") + log.info(f"[URL] S2A授权URL: {auth_url}", icon="browser") + page.get(auth_url) + wait_for_page_stable(page, timeout=5) + log_current_url(page, "S2A授权页面加载完成", force=True) + + # 检测错误页面 + check_and_handle_error_page(page) + + try: + # 输入邮箱 + progress_update(phase="授权", step="输入邮箱...") + log.step("输入邮箱...") + email_input = wait_for_element(page, 'css:input[type="email"]', timeout=10) + if not email_input: + email_input = wait_for_element(page, 'css:input[name="email"]', timeout=5) + if email_input: + type_slowly(page, 'css:input[type="email"], input[name="email"]', email, base_delay=0.06) + + # 点击继续 + log.step("点击继续...") + continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5) + if continue_btn: + old_url = page.url + continue_btn.click() + wait_for_url_change(page, old_url, timeout=8) + log_url_change(page, old_url, "S2A-输入邮箱后点击继续") + except Exception as e: + log.warning(f"S2A 邮箱输入步骤异常: {e}") + + log_current_url(page, "S2A-邮箱步骤完成后") + + # 输入密码 + current_url = page.url + if "/password" in current_url: + try: + progress_update(phase="授权", step="输入密码...") + log.step("输入密码...") + password_input = wait_for_element(page, 'css:input[type="password"]', timeout=10) + + if password_input: + type_slowly(page, 'css:input[type="password"]', password, base_delay=0.06) + + log.step("点击继续...") + continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5) + if continue_btn: + old_url = page.url + continue_btn.click() + wait_for_url_change(page, old_url, timeout=8) + log_url_change(page, old_url, "S2A-输入密码后点击继续") + except Exception as e: + log.warning(f"S2A 密码输入步骤异常: {e}") + + log_current_url(page, "S2A-密码步骤完成后") + # 检查是否在授权确认页面,如果是则点击授权按钮 time.sleep(1) current_url = page.url