diff --git a/browser_automation.py b/browser_automation.py
index 0f3c3ba..e34cddd 100644
--- a/browser_automation.py
+++ b/browser_automation.py
@@ -2303,81 +2303,108 @@ def perform_cpa_authorization(page, email: str, password: str) -> bool:
log_current_url(page, "CPA-密码步骤完成后")
- # 等待授权回调
- max_wait = 45
- start_time = time.time()
- callback_url = None
- progress_shown = False
- last_url_in_loop = None
- log.step(f"等待 CPA 授权回调 (最多 {max_wait}s)...")
+ # ========== 授权回调阶段 - 需要串行执行 ==========
+ try:
+ import run
+ auth_lock = run._auth_callback_lock
+ except (ImportError, AttributeError):
+ auth_lock = None
- while time.time() - start_time < max_wait:
- try:
- current_url = page.url
+ if auth_lock:
+ log.step("等待授权回调锁...")
+ auth_lock.acquire()
+ log.step("获取授权回调锁,开始授权...")
- # 记录 URL 变化
- if current_url != last_url_in_loop:
- log_current_url(page, "CPA等待回调中")
- last_url_in_loop = current_url
+ try:
+ # 等待授权回调
+ max_wait = 45
+ start_time = time.time()
+ callback_url = None
+ progress_shown = False
+ last_url_in_loop = None
+ auth_btn_clicked = False # 标记是否已点击授权按钮
+ log.step(f"等待 CPA 授权回调 (最多 {max_wait}s)...")
- # 检查是否到达回调页面 (CPA 使用 localhost:1455)
- if is_cpa_callback_url(current_url):
+ while time.time() - start_time < max_wait:
+ try:
+ current_url = page.url
+
+ # 记录 URL 变化
+ if current_url != last_url_in_loop:
+ log_current_url(page, "CPA等待回调中")
+ last_url_in_loop = current_url
+ # URL 变化后重置授权按钮点击标记
+ auth_btn_clicked = False
+
+ # 检查是否到达回调页面 (CPA 使用 localhost:1455)
+ if is_cpa_callback_url(current_url):
+ if progress_shown:
+ log.progress_clear()
+ log.success("CPA 获取到回调 URL")
+ log.info(f"[URL] CPA回调地址: {current_url}", icon="browser")
+ callback_url = current_url
+ break
+
+ # 尝试点击授权按钮 (只点击一次)
+ if not auth_btn_clicked:
+ try:
+ buttons = page.eles('css:button[type="submit"]')
+ for btn in buttons:
+ if btn.states.is_displayed and btn.states.is_enabled:
+ btn_text = btn.text.lower()
+ if any(x in btn_text for x in ['allow', 'authorize', 'continue', '授权', '允许', '继续', 'accept']):
+ if progress_shown:
+ log.progress_clear()
+ progress_shown = False
+ log.step(f"点击按钮: {btn.text}")
+ btn.click()
+ auth_btn_clicked = True # 标记已点击
+ time.sleep(2) # 等待页面响应
+ break
+ except Exception:
+ pass
+
+ elapsed = int(time.time() - start_time)
+ log.progress_inline(f"[CPA等待中... {elapsed}s]")
+ progress_shown = True
+ time.sleep(1.5)
+
+ except Exception as e:
if progress_shown:
log.progress_clear()
- log.success("CPA 获取到回调 URL")
- log.info(f"[URL] CPA回调地址: {current_url}", icon="browser")
- callback_url = current_url
- break
+ progress_shown = False
+ log.warning(f"CPA检查异常: {e}")
+ time.sleep(1.5)
- # 尝试点击授权按钮
+ if progress_shown:
+ log.progress_clear()
+
+ if not callback_url:
+ log.error("CPA 无法获取回调 URL")
+ return False
+
+ # CPA 特有流程: 提交回调 URL
+ log.step("提交 CPA 回调 URL...")
+ if not cpa_submit_callback(callback_url):
+ log.error("CPA 回调 URL 提交失败")
+ return False
+
+ # CPA 特有流程: 轮询授权状态
+ if cpa_poll_auth_status(state):
+ log.success("CPA Codex 授权成功")
+ return True
+ else:
+ log.error("CPA 授权状态检查失败")
+ return False
+
+ finally:
+ # 确保释放锁
+ if auth_lock:
try:
- buttons = page.eles('css:button[type="submit"]')
- for btn in buttons:
- if btn.states.is_displayed and btn.states.is_enabled:
- btn_text = btn.text.lower()
- if any(x in btn_text for x in ['allow', 'authorize', 'continue', '授权', '允许', '继续', 'accept']):
- if progress_shown:
- log.progress_clear()
- progress_shown = False
- log.step(f"点击按钮: {btn.text}")
- btn.click()
- time.sleep(1.5)
- break
- except Exception:
- pass
-
- elapsed = int(time.time() - start_time)
- log.progress_inline(f"[CPA等待中... {elapsed}s]")
- progress_shown = True
- time.sleep(1.5)
-
- except Exception as e:
- if progress_shown:
- log.progress_clear()
- progress_shown = False
- log.warning(f"CPA检查异常: {e}")
- time.sleep(1.5)
-
- if progress_shown:
- log.progress_clear()
-
- if not callback_url:
- log.error("CPA 无法获取回调 URL")
- return False
-
- # CPA 特有流程: 提交回调 URL
- log.step("提交 CPA 回调 URL...")
- if not cpa_submit_callback(callback_url):
- log.error("CPA 回调 URL 提交失败")
- return False
-
- # CPA 特有流程: 轮询授权状态
- if cpa_poll_auth_status(state):
- log.success("CPA Codex 授权成功")
- return True
- else:
- log.error("CPA 授权状态检查失败")
- return False
+ auth_lock.release()
+ log.step("释放授权回调锁")
+ except RuntimeError:
+ pass # 锁可能已经被释放
def perform_cpa_authorization_with_otp(page, email: str) -> bool:
@@ -2537,6 +2564,7 @@ def perform_cpa_authorization_with_otp(page, email: str) -> bool:
callback_url = None
progress_shown = False
last_url_in_loop = None
+ auth_btn_clicked = False # 标记是否已点击授权按钮
log.step(f"等待 CPA 授权回调 (最多 {max_wait}s)...")
while time.time() - start_time < max_wait:
@@ -2546,6 +2574,8 @@ def perform_cpa_authorization_with_otp(page, email: str) -> bool:
if current_url != last_url_in_loop:
log_current_url(page, "CPA-OTP流程-等待回调中")
last_url_in_loop = current_url
+ # URL 变化后重置授权按钮点击标记
+ auth_btn_clicked = False
if is_cpa_callback_url(current_url):
if progress_shown:
@@ -2555,21 +2585,24 @@ def perform_cpa_authorization_with_otp(page, email: str) -> bool:
callback_url = current_url
break
- try:
- buttons = page.eles('css:button[type="submit"]')
- for btn in buttons:
- if btn.states.is_displayed and btn.states.is_enabled:
- btn_text = btn.text.lower()
- if any(x in btn_text for x in ['allow', 'authorize', 'continue', '授权', '允许', '继续', 'accept']):
- if progress_shown:
- log.progress_clear()
- progress_shown = False
- log.step(f"点击按钮: {btn.text}")
- btn.click()
- time.sleep(1.5)
- break
- except Exception:
- pass
+ # 尝试点击授权按钮 (只点击一次)
+ if not auth_btn_clicked:
+ try:
+ buttons = page.eles('css:button[type="submit"]')
+ for btn in buttons:
+ if btn.states.is_displayed and btn.states.is_enabled:
+ btn_text = btn.text.lower()
+ if any(x in btn_text for x in ['allow', 'authorize', 'continue', '授权', '允许', '继续', 'accept']):
+ if progress_shown:
+ log.progress_clear()
+ progress_shown = False
+ log.step(f"点击按钮: {btn.text}")
+ btn.click()
+ auth_btn_clicked = True # 标记已点击
+ time.sleep(2) # 等待页面响应
+ break
+ except Exception:
+ pass
elapsed = int(time.time() - start_time)
log.progress_inline(f"[CPA-OTP等待中... {elapsed}s]")
@@ -2684,86 +2717,162 @@ def perform_s2a_authorization(page, email: str, password: str) -> bool:
log_current_url(page, "S2A-密码步骤完成后")
- # 等待授权回调 (S2A 使用 localhost 回调)
- max_wait = 45
- start_time = time.time()
- callback_url = None
- progress_shown = False
- last_url_in_loop = None
- progress_update(phase="授权", step="等待回调...")
- log.step(f"等待 S2A 授权回调 (最多 {max_wait}s)...")
+ # ========== 授权回调阶段 - 需要串行执行 ==========
+ # 获取授权回调锁,确保同一时间只有一个线程进行授权回调
+ try:
+ import run
+ auth_lock = run._auth_callback_lock
+ except (ImportError, AttributeError):
+ auth_lock = None
- while time.time() - start_time < max_wait:
- try:
- current_url = page.url
+ if auth_lock:
+ log.step("等待授权回调锁...")
+ auth_lock.acquire()
+ log.step("获取授权回调锁,开始授权...")
- # 记录 URL 变化
- if current_url != last_url_in_loop:
- log_current_url(page, "S2A等待回调中")
- last_url_in_loop = current_url
+ try:
+ # 检查是否在授权确认页面,如果是则点击授权按钮
+ time.sleep(1)
+ current_url = page.url
+ if "/authorize" in current_url or "consent" in current_url:
+ try:
+ log.step("检测到授权确认页面,点击授权按钮...")
+ auth_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5)
+ if auth_btn:
+ btn_text = auth_btn.text.lower() if auth_btn.text else ""
+ if 'authorize' in btn_text or '授权' in btn_text or 'allow' in btn_text or 'continue' in btn_text:
+ old_url = page.url
+ auth_btn.click()
+ time.sleep(2)
+ if page.url != old_url:
+ log_url_change(page, old_url, "S2A-点击授权按钮后")
+ except Exception as e:
+ log.warning(f"S2A 授权按钮点击异常: {e}")
- # 检查是否到达回调页面 (S2A 使用 localhost:1455 或类似端口)
- if "localhost" in current_url and "code=" in current_url:
+ # 等待授权回调 (S2A 使用 localhost 回调)
+ max_wait = 45
+ start_time = time.time()
+ callback_url = None
+ progress_shown = False
+ last_url_in_loop = None
+ auth_btn_clicked = False # 标记是否已点击授权按钮
+ error_detected = False # 标记是否检测到错误
+ progress_update(phase="授权", step="等待回调...")
+ log.step(f"等待 S2A 授权回调 (最多 {max_wait}s)...")
+
+ while time.time() - start_time < max_wait:
+ try:
+ current_url = page.url
+
+ # 记录 URL 变化
+ if current_url != last_url_in_loop:
+ log_current_url(page, "S2A等待回调中")
+ last_url_in_loop = current_url
+ # URL 变化后重置授权按钮点击标记
+ auth_btn_clicked = False
+ error_detected = False
+
+ # 检查是否到达回调页面 (S2A 使用 localhost:1455 或类似端口)
+ if "localhost" in current_url and "code=" in current_url:
+ if progress_shown:
+ log.progress_clear()
+ progress_shown = False
+
+ callback_url = current_url
+ log.success(f"捕获 S2A 回调 URL")
+ break
+
+ # 检测 OpenAI 错误页面
+ if not error_detected:
+ try:
+ error_elem = page.ele('text:Something went wrong', timeout=0.3) or \
+ page.ele('text:出错了', timeout=0.3) or \
+ page.ele('text:Access denied', timeout=0.3) or \
+ page.ele('text:rate limit', timeout=0.3)
+ if error_elem:
+ log.warning(f"检测到错误页面: {error_elem.text[:50] if error_elem.text else 'unknown'}")
+ error_detected = True
+ # 尝试刷新页面
+ page.refresh()
+ time.sleep(3)
+ continue
+ except Exception:
+ pass
+
+ # 检测错误
+ check_and_handle_error(page)
+
+ # 检查是否需要点击 Authorize (只点击一次)
+ if not auth_btn_clicked:
+ try:
+ auth_btn = page.ele('css:button[type="submit"]', timeout=0.5)
+ if auth_btn:
+ btn_text = auth_btn.text.lower() if auth_btn.text else ""
+ btn_enabled = auth_btn.states.is_enabled if hasattr(auth_btn, 'states') else True
+ btn_displayed = auth_btn.states.is_displayed if hasattr(auth_btn, 'states') else True
+
+ if 'authorize' in btn_text or '授权' in btn_text or 'continue' in btn_text or 'allow' in btn_text:
+ if btn_enabled and btn_displayed:
+ log.step(f"点击授权按钮: '{auth_btn.text}'")
+ old_url = page.url
+ auth_btn.click()
+ auth_btn_clicked = True # 标记已点击
+ time.sleep(3) # 增加等待时间
+ new_url = page.url
+ if new_url != old_url:
+ log_url_change(page, old_url, "S2A点击授权按钮后")
+ else:
+ log.warning(f"点击授权按钮后 URL 未变化,当前: {new_url[:80]}...")
+ else:
+ log.warning(f"授权按钮不可用: enabled={btn_enabled}, displayed={btn_displayed}")
+ except Exception as e:
+ log.warning(f"检查授权按钮异常: {e}")
+
+ elapsed = int(time.time() - start_time)
+ log.progress_inline(f"[S2A等待中... {elapsed}s]")
+ progress_shown = True
+ time.sleep(1.5)
+
+ except Exception as e:
if progress_shown:
log.progress_clear()
progress_shown = False
+ log.warning(f"S2A检查异常: {e}")
+ time.sleep(1.5)
- callback_url = current_url
- log.success(f"捕获 S2A 回调 URL")
- break
+ if progress_shown:
+ log.progress_clear()
- # 检测错误
- check_and_handle_error(page)
+ if not callback_url:
+ log.error("S2A 无法获取回调链接")
+ return False
- # 检查是否需要点击 Authorize
+ # 从回调 URL 中提取 code
+ code = extract_code_from_url(callback_url)
+ if not code:
+ log.error("S2A 无法从回调链接提取授权码")
+ return False
+
+ # S2A 特有流程: 用授权码创建账号 (传入完整邮箱用于验证)
+ progress_update(phase="授权", step="提交授权码...")
+ log.step("正在提交 S2A 授权码...")
+ result = s2a_create_account_from_oauth(code, session_id, name=email)
+
+ if result:
+ log.success("S2A 授权流程完成")
+ return True
+ else:
+ log.error("S2A 账号入库失败")
+ return False
+
+ finally:
+ # 确保释放锁
+ if auth_lock:
try:
- auth_btn = page.ele('css:button[type="submit"]', timeout=0.5)
- if auth_btn:
- btn_text = auth_btn.text.lower() if auth_btn.text else ""
- if 'authorize' in btn_text or '授权' in btn_text or 'continue' in btn_text:
- log.step("点击授权按钮...")
- old_url = page.url
- auth_btn.click()
- wait_for_url_change(page, old_url, timeout=5)
- log_url_change(page, old_url, "S2A点击授权按钮后")
- except Exception:
- pass
-
- elapsed = int(time.time() - start_time)
- log.progress_inline(f"[S2A等待中... {elapsed}s]")
- progress_shown = True
- time.sleep(1.5)
-
- except Exception as e:
- if progress_shown:
- log.progress_clear()
- progress_shown = False
- log.warning(f"S2A检查异常: {e}")
- time.sleep(1.5)
-
- if progress_shown:
- log.progress_clear()
-
- if not callback_url:
- log.error("S2A 无法获取回调链接")
- return False
-
- # 从回调 URL 中提取 code
- code = extract_code_from_url(callback_url)
- if not code:
- log.error("S2A 无法从回调链接提取授权码")
- return False
-
- # S2A 特有流程: 用授权码创建账号 (传入完整邮箱用于验证)
- progress_update(phase="授权", step="提交授权码...")
- log.step("正在提交 S2A 授权码...")
- result = s2a_create_account_from_oauth(code, session_id, name=email)
- if result:
- log.success("S2A 授权流程完成")
- return True
- else:
- log.error("S2A 账号入库失败")
- return False
+ auth_lock.release()
+ log.step("释放授权回调锁")
+ except RuntimeError:
+ pass # 锁可能已经被释放
# ==================== 格式3专用: 登录获取 Session ====================
diff --git a/run.py b/run.py
index 267b5d6..5dfd9c5 100644
--- a/run.py
+++ b/run.py
@@ -62,6 +62,7 @@ _tracker = None
_current_results = []
_shutdown_requested = False
_tracker_lock = threading.Lock() # 用于并发时保护 tracker 操作
+_auth_callback_lock = threading.Lock() # 授权回调锁 - 确保同一时间只有一个线程进行授权回调
def _save_state():
@@ -228,7 +229,7 @@ def process_single_team(team: dict, team_index: int = 0, teams_total: int = 0) -
# ========== 阶段 3: 处理所有账号 (注册 + Codex 授权 + 入库) ==========
if all_to_process:
- # 根据配置选择串行或并发处理
+ # 根据配置选择处理模式
if CONCURRENT_ENABLED and len(all_to_process) > 1:
log.section(f"阶段 3: 并发处理 {len(all_to_process)} 个账号 (并发数: {min(CONCURRENT_WORKERS, len(all_to_process))})")
all_results = process_accounts_concurrent(
@@ -786,7 +787,8 @@ def process_accounts_concurrent(
total = len(pending_accounts)
actual_workers = min(max_workers, total)
- log.section(f"并发处理 {total} 个账号 (并发数: {actual_workers}, 间隔: {stagger_delay}s)")
+ # 并发注册,串行授权回调
+ log.section(f"并发处理 {total} 个账号 (并发数: {actual_workers}, 授权回调串行)")
# 启动进度跟踪
progress_start(team_name, total, team_index, teams_total, include_owner)
@@ -881,7 +883,7 @@ def _print_system_config():
# 并发配置
if CONCURRENT_ENABLED:
- log.info(f"并发处理: ✓ 开启 ({CONCURRENT_WORKERS} 并发)", icon="config")
+ log.info(f"并发处理: ✓ 开启 ({CONCURRENT_WORKERS} 并发, 授权串行)", icon="config")
else:
log.info("并发处理: ✗ 关闭 (串行模式)", icon="config")
diff --git a/telegram_bot.py b/telegram_bot.py
index 7639b8c..f00e62b 100644
--- a/telegram_bot.py
+++ b/telegram_bot.py
@@ -671,7 +671,9 @@ class ProvisionerBot:
f"⚡ 并发处理\n\n"
f"状态: {status}\n"
f"并发数: {workers}\n\n"
- f"开启后将同时启动 {workers} 个浏览器实例并行处理账号\n\n"
+ f"说明:\n"
+ f"• 注册流程并发执行\n"
+ f"• 授权回调串行执行 (避免端口冲突)\n\n"
f"💡 设置并发数:\n"
f"/concurrent 4 - 设置为 4 并发\n\n"
f"使用 /reload 立即生效",
@@ -697,7 +699,9 @@ class ProvisionerBot:
f"⚡ 并发处理\n\n"
f"状态: ✅ 已开启\n"
f"并发数: {workers}\n\n"
- f"将同时启动 {workers} 个浏览器实例并行处理账号\n\n"
+ f"说明:\n"
+ f"• 注册流程并发执行\n"
+ f"• 授权回调串行执行 (避免端口冲突)\n\n"
f"💡 使用 /reload 立即生效",
parse_mode="HTML"
)