This commit is contained in:
2026-01-27 09:59:16 +08:00
parent a973343b48
commit 06eaff03b9

View File

@@ -2243,67 +2243,7 @@ def perform_cpa_authorization(page, email: str, password: str) -> bool:
""" """
log.info(f"开始 CPA 授权: {email}", icon="code") log.info(f"开始 CPA 授权: {email}", icon="code")
# 生成授权 URL # ========== 授权流程 - 需要串行执行 (避免回调端口冲突) ==========
auth_url, state = cpa_generate_auth_url()
if not auth_url or not state:
log.error("无法获取 CPA 授权 URL")
return False
# 打开授权页面
log.step("打开 CPA 授权页面...")
log.info(f"[URL] CPA授权URL: {auth_url}", icon="browser")
page.get(auth_url)
wait_for_page_stable(page, timeout=5)
log_current_url(page, "CPA授权页面加载完成", force=True)
# 检测错误页面
check_and_handle_error_page(page)
try:
# 输入邮箱
log.step("输入邮箱...")
email_input = wait_for_element(page, 'css:input[type="email"]', timeout=10)
if not email_input:
email_input = wait_for_element(page, 'css:input[name="email"]', timeout=5)
if email_input:
type_slowly(page, 'css:input[type="email"], input[name="email"]', email, base_delay=0.06)
# 点击继续
log.step("点击继续...")
continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5)
if continue_btn:
old_url = page.url
continue_btn.click()
wait_for_url_change(page, old_url, timeout=8)
log_url_change(page, old_url, "CPA-输入邮箱后点击继续")
except Exception as e:
log.warning(f"CPA 邮箱输入步骤异常: {e}")
log_current_url(page, "CPA-邮箱步骤完成后")
# 输入密码
current_url = page.url
if "/password" in current_url:
try:
log.step("输入密码...")
password_input = wait_for_element(page, 'css:input[type="password"]', timeout=10)
if password_input:
type_slowly(page, 'css:input[type="password"]', password, base_delay=0.06)
log.step("点击继续...")
continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5)
if continue_btn:
old_url = page.url
continue_btn.click()
wait_for_url_change(page, old_url, timeout=8)
log_url_change(page, old_url, "CPA-输入密码后点击继续")
except Exception as e:
log.warning(f"CPA 密码输入步骤异常: {e}")
log_current_url(page, "CPA-密码步骤完成后")
# ========== 授权回调阶段 - 需要串行执行 ==========
try: try:
import run import run
auth_lock = run._auth_callback_lock auth_lock = run._auth_callback_lock
@@ -2311,11 +2251,71 @@ def perform_cpa_authorization(page, email: str, password: str) -> bool:
auth_lock = None auth_lock = None
if auth_lock: if auth_lock:
log.step("等待授权回调锁...") log.step("等待授权锁...")
auth_lock.acquire() auth_lock.acquire()
log.step("获取授权回调锁,开始授权...") log.step("获取授权锁,开始授权流程...")
try: try:
# 生成授权 URL
auth_url, state = cpa_generate_auth_url()
if not auth_url or not state:
log.error("无法获取 CPA 授权 URL")
return False
# 打开授权页面
log.step("打开 CPA 授权页面...")
log.info(f"[URL] CPA授权URL: {auth_url}", icon="browser")
page.get(auth_url)
wait_for_page_stable(page, timeout=5)
log_current_url(page, "CPA授权页面加载完成", force=True)
# 检测错误页面
check_and_handle_error_page(page)
try:
# 输入邮箱
log.step("输入邮箱...")
email_input = wait_for_element(page, 'css:input[type="email"]', timeout=10)
if not email_input:
email_input = wait_for_element(page, 'css:input[name="email"]', timeout=5)
if email_input:
type_slowly(page, 'css:input[type="email"], input[name="email"]', email, base_delay=0.06)
# 点击继续
log.step("点击继续...")
continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5)
if continue_btn:
old_url = page.url
continue_btn.click()
wait_for_url_change(page, old_url, timeout=8)
log_url_change(page, old_url, "CPA-输入邮箱后点击继续")
except Exception as e:
log.warning(f"CPA 邮箱输入步骤异常: {e}")
log_current_url(page, "CPA-邮箱步骤完成后")
# 输入密码
current_url = page.url
if "/password" in current_url:
try:
log.step("输入密码...")
password_input = wait_for_element(page, 'css:input[type="password"]', timeout=10)
if password_input:
type_slowly(page, 'css:input[type="password"]', password, base_delay=0.06)
log.step("点击继续...")
continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5)
if continue_btn:
old_url = page.url
continue_btn.click()
wait_for_url_change(page, old_url, timeout=8)
log_url_change(page, old_url, "CPA-输入密码后点击继续")
except Exception as e:
log.warning(f"CPA 密码输入步骤异常: {e}")
log_current_url(page, "CPA-密码步骤完成后")
# 等待授权回调 # 等待授权回调
max_wait = 45 max_wait = 45
start_time = time.time() start_time = time.time()
@@ -2654,71 +2654,7 @@ def perform_s2a_authorization(page, email: str, password: str) -> bool:
log.info(f"开始 S2A 授权: {email}", icon="code") log.info(f"开始 S2A 授权: {email}", icon="code")
progress_update(phase="授权", step="开始 S2A 授权...") progress_update(phase="授权", step="开始 S2A 授权...")
# 生成授权 URL # ========== 授权流程 - 需要串行执行 (避免回调端口冲突) ==========
auth_url, session_id = s2a_generate_auth_url()
if not auth_url or not session_id:
log.error("无法获取 S2A 授权 URL")
return False
# 打开授权页面
progress_update(phase="授权", step="打开授权页面...")
log.step("打开 S2A 授权页面...")
log.info(f"[URL] S2A授权URL: {auth_url}", icon="browser")
page.get(auth_url)
wait_for_page_stable(page, timeout=5)
log_current_url(page, "S2A授权页面加载完成", force=True)
# 检测错误页面
check_and_handle_error_page(page)
try:
# 输入邮箱
progress_update(phase="授权", step="输入邮箱...")
log.step("输入邮箱...")
email_input = wait_for_element(page, 'css:input[type="email"]', timeout=10)
if not email_input:
email_input = wait_for_element(page, 'css:input[name="email"]', timeout=5)
if email_input:
type_slowly(page, 'css:input[type="email"], input[name="email"]', email, base_delay=0.06)
# 点击继续
log.step("点击继续...")
continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5)
if continue_btn:
old_url = page.url
continue_btn.click()
wait_for_url_change(page, old_url, timeout=8)
log_url_change(page, old_url, "S2A-输入邮箱后点击继续")
except Exception as e:
log.warning(f"S2A 邮箱输入步骤异常: {e}")
log_current_url(page, "S2A-邮箱步骤完成后")
# 输入密码
current_url = page.url
if "/password" in current_url:
try:
progress_update(phase="授权", step="输入密码...")
log.step("输入密码...")
password_input = wait_for_element(page, 'css:input[type="password"]', timeout=10)
if password_input:
type_slowly(page, 'css:input[type="password"]', password, base_delay=0.06)
log.step("点击继续...")
continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5)
if continue_btn:
old_url = page.url
continue_btn.click()
wait_for_url_change(page, old_url, timeout=8)
log_url_change(page, old_url, "S2A-输入密码后点击继续")
except Exception as e:
log.warning(f"S2A 密码输入步骤异常: {e}")
log_current_url(page, "S2A-密码步骤完成后")
# ========== 授权回调阶段 - 需要串行执行 ==========
# 获取授权回调锁,确保同一时间只有一个线程进行授权回调
try: try:
import run import run
auth_lock = run._auth_callback_lock auth_lock = run._auth_callback_lock
@@ -2726,11 +2662,74 @@ def perform_s2a_authorization(page, email: str, password: str) -> bool:
auth_lock = None auth_lock = None
if auth_lock: if auth_lock:
log.step("等待授权回调锁...") log.step("等待授权锁...")
auth_lock.acquire() auth_lock.acquire()
log.step("获取授权回调锁,开始授权...") log.step("获取授权锁,开始授权流程...")
try: try:
# 生成授权 URL
auth_url, session_id = s2a_generate_auth_url()
if not auth_url or not session_id:
log.error("无法获取 S2A 授权 URL")
return False
# 打开授权页面
progress_update(phase="授权", step="打开授权页面...")
log.step("打开 S2A 授权页面...")
log.info(f"[URL] S2A授权URL: {auth_url}", icon="browser")
page.get(auth_url)
wait_for_page_stable(page, timeout=5)
log_current_url(page, "S2A授权页面加载完成", force=True)
# 检测错误页面
check_and_handle_error_page(page)
try:
# 输入邮箱
progress_update(phase="授权", step="输入邮箱...")
log.step("输入邮箱...")
email_input = wait_for_element(page, 'css:input[type="email"]', timeout=10)
if not email_input:
email_input = wait_for_element(page, 'css:input[name="email"]', timeout=5)
if email_input:
type_slowly(page, 'css:input[type="email"], input[name="email"]', email, base_delay=0.06)
# 点击继续
log.step("点击继续...")
continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5)
if continue_btn:
old_url = page.url
continue_btn.click()
wait_for_url_change(page, old_url, timeout=8)
log_url_change(page, old_url, "S2A-输入邮箱后点击继续")
except Exception as e:
log.warning(f"S2A 邮箱输入步骤异常: {e}")
log_current_url(page, "S2A-邮箱步骤完成后")
# 输入密码
current_url = page.url
if "/password" in current_url:
try:
progress_update(phase="授权", step="输入密码...")
log.step("输入密码...")
password_input = wait_for_element(page, 'css:input[type="password"]', timeout=10)
if password_input:
type_slowly(page, 'css:input[type="password"]', password, base_delay=0.06)
log.step("点击继续...")
continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5)
if continue_btn:
old_url = page.url
continue_btn.click()
wait_for_url_change(page, old_url, timeout=8)
log_url_change(page, old_url, "S2A-输入密码后点击继续")
except Exception as e:
log.warning(f"S2A 密码输入步骤异常: {e}")
log_current_url(page, "S2A-密码步骤完成后")
# 检查是否在授权确认页面,如果是则点击授权按钮 # 检查是否在授权确认页面,如果是则点击授权按钮
time.sleep(1) time.sleep(1)
current_url = page.url current_url = page.url