From 8d5f8fe3bbec9983cf4582dcd9d188c836ecd6db Mon Sep 17 00:00:00 2001 From: kyx236 Date: Tue, 27 Jan 2026 10:34:25 +0800 Subject: [PATCH] 9 --- browser_automation.py | 46 ++++++ run.py | 321 +++++++++++++++++++++++++++++++++++------- 2 files changed, 314 insertions(+), 53 deletions(-) diff --git a/browser_automation.py b/browser_automation.py index 5408a2e..b5794a8 100644 --- a/browser_automation.py +++ b/browser_automation.py @@ -2195,6 +2195,52 @@ def login_and_authorize_with_otp(email: str) -> tuple[bool, dict]: return False, None +def register_only(email: str, password: str, use_api_register: bool = True) -> str: + """仅注册 OpenAI 账号,不进行授权 (用于并行注册 + 串行授权模式) + + Args: + email: 邮箱地址 + password: 密码 + use_api_register: 是否优先使用 API 模式注册 (默认 True) + + Returns: + str: 注册结果 + - "success": 注册成功 + - "domain_blacklisted": 域名被列入黑名单 + - "failed": 注册失败 + """ + with browser_context_with_retry(max_browser_retries=2) as ctx: + for attempt in ctx.attempts(): + try: + # 注册 OpenAI (优先使用 API 模式) + register_result = register_openai_account_auto( + ctx.page, email, password, + use_api=use_api_register + ) + + # 检查是否是域名黑名单错误 + if register_result == "domain_blacklisted": + ctx.stop() + return "domain_blacklisted" + + if not register_result: + if attempt < ctx.max_retries - 1: + log.warning("注册失败,准备重试...") + continue + return "failed" + + # 注册成功 + log.success(f"注册成功: {email}") + return "success" + + except Exception as e: + ctx.handle_error(e) + if ctx.current_attempt >= ctx.max_retries - 1: + return "failed" + + return "failed" + + def register_and_authorize(email: str, password: str, use_api_register: bool = True) -> tuple: """完整流程: 注册 OpenAI + Codex 授权 (带重试机制) diff --git a/run.py b/run.py index 83b31dc..5d071c9 100644 --- a/run.py +++ b/run.py @@ -29,7 +29,7 @@ from team_service import batch_invite_to_team, print_team_summary, check_availab from crs_service import crs_add_account, crs_sync_team_owners, crs_verify_token from cpa_service import cpa_verify_connection from s2a_service import s2a_verify_connection -from browser_automation import register_and_authorize, login_and_authorize_with_otp, authorize_only, login_and_authorize_team_owner, ShutdownRequested +from browser_automation import register_and_authorize, login_and_authorize_with_otp, authorize_only, login_and_authorize_team_owner, ShutdownRequested, register_only from utils import ( save_to_csv, load_team_tracker, @@ -757,7 +757,10 @@ def process_accounts_concurrent( include_owner: bool = False, max_workers: int = None ) -> list: - """并发处理账号列表 + """并发处理账号列表 (两阶段模式: 并行注册 + 串行授权) + + 阶段 1: 并行注册所有新账号 + 阶段 2: 串行授权所有已注册的账号 Args: accounts: 账号列表 [{"email", "password", "status", "role"}] @@ -770,12 +773,12 @@ def process_accounts_concurrent( Returns: list: 处理结果 """ - global _shutdown_requested + global _tracker, _shutdown_requested if max_workers is None: max_workers = CONCURRENT_WORKERS - stagger_delay = 4.0 # 线程错开启动间隔 (秒) + stagger_delay = 3.0 # 线程错开启动间隔 (秒) # 过滤已完成的账号 pending_accounts = [acc for acc in accounts if acc.get("status") != "completed"] @@ -787,74 +790,286 @@ def process_accounts_concurrent( total = len(pending_accounts) actual_workers = min(max_workers, total) - # 并发注册,串行授权回调 - log.section(f"并发处理 {total} 个账号 (并发数: {actual_workers}, 授权回调串行)") + # 分类账号: 需要注册的 vs 已注册待授权的 + need_register = [] + need_auth_only = [] + + for acc in pending_accounts: + status = acc.get("status", "") + role = acc.get("role", "member") + + # 已注册但未授权的状态 + if status in ["registered", "auth_failed"] or \ + (AUTH_PROVIDER == "s2a" and status == "partial") or \ + (role == "owner" and status not in ["team_owner", "completed", "authorized", ""]): + need_auth_only.append(acc) + elif status == "team_owner": + # Team Owner 使用 OTP,需要特殊处理 + need_auth_only.append(acc) + elif status in ["invited", "processing", ""]: + # 新账号,需要注册 + need_register.append(acc) + else: + # 其他状态,尝试注册 + need_register.append(acc) + + log.section(f"两阶段并发处理 {total} 个账号") + log.info(f"需要注册: {len(need_register)} 个, 需要授权: {len(need_auth_only)} 个") # 启动进度跟踪 progress_start(team_name, total, team_index, teams_total, include_owner) results = [] - completed_count = 0 - - with ThreadPoolExecutor(max_workers=actual_workers) as executor: - # 错开提交任务,每个任务间隔 stagger_delay 秒 - future_to_account = {} - for i, account in enumerate(pending_accounts): + + # ==================== 阶段 1: 并行注册 ==================== + if need_register: + log.section(f"阶段 1: 并行注册 {len(need_register)} 个账号 (并发数: {actual_workers})") + + registered_accounts = [] + + with ThreadPoolExecutor(max_workers=actual_workers) as executor: + future_to_account = {} + + for i, account in enumerate(need_register): + if _shutdown_requested: + break + + worker_id = i % actual_workers + 1 + log.info(f"[Worker-{worker_id}] 启动注册: {account['email']}", icon="start") + + future = executor.submit( + _register_single_account_worker, + account, + team_name, + worker_id + ) + future_to_account[future] = account + + # 错开启动 + if i < len(need_register) - 1: + time.sleep(stagger_delay) + + # 收集注册结果 + for future in as_completed(future_to_account): + if _shutdown_requested: + log.warning("检测到中断请求,取消剩余任务...") + executor.shutdown(wait=False, cancel_futures=True) + break + + account = future_to_account[future] + try: + reg_result = future.result() + if reg_result == "success": + log.success(f"✅ 注册成功: {account['email']}") + registered_accounts.append(account) + elif reg_result == "domain_blacklisted": + log.error(f"❌ 域名黑名单: {account['email']}") + domain = get_domain_from_email(account['email']) + add_domain_to_blacklist(domain) + with _tracker_lock: + remove_account_from_tracker(_tracker, team_name, account['email']) + save_team_tracker(_tracker) + else: + log.error(f"❌ 注册失败: {account['email']}") + with _tracker_lock: + update_account_status(_tracker, team_name, account['email'], "register_failed") + save_team_tracker(_tracker) + except Exception as e: + log.error(f"注册异常: {account.get('email', 'unknown')} - {e}") + + log.success(f"阶段 1 完成: {len(registered_accounts)}/{len(need_register)} 注册成功") + + # 将成功注册的账号加入授权列表 + need_auth_only.extend(registered_accounts) + + # ==================== 阶段 2: 串行授权 ==================== + if need_auth_only and not _shutdown_requested: + log.section(f"阶段 2: 串行授权 {len(need_auth_only)} 个账号") + + for i, account in enumerate(need_auth_only): if _shutdown_requested: + log.warning("检测到中断请求,停止授权...") break - worker_id = i % actual_workers + 1 - log.info(f"[Worker-{worker_id}] 启动任务: {account['email']}", icon="start") + email = account["email"] + password = account["password"] + role = account.get("role", "member") + status = account.get("status", "") - future = executor.submit( - _process_single_account_worker, - account, - team_name, - worker_id - ) - future_to_account[future] = account + log.info(f"[{i+1}/{len(need_auth_only)}] 授权: {email}", icon="auth") + + result = { + "team": team_name, + "email": email, + "password": password, + "status": "failed", + "crs_id": "" + } - # 错开启动,最后一个不需要等待 - if i < len(pending_accounts) - 1: - time.sleep(stagger_delay) - - # 收集结果 - for future in as_completed(future_to_account): - if _shutdown_requested: - log.warning("检测到中断请求,取消剩余任务...") - executor.shutdown(wait=False, cancel_futures=True) - break - - account = future_to_account[future] try: - result = future.result() - results.append(result) - completed_count += 1 - - # 更新进度 - is_success = result["status"] in ("success", "completed") - progress_account_done(result["email"], is_success) - - status_icon = "✅" if is_success else "❌" - log.info(f"[{completed_count}/{total}] {status_icon} {result['email']} ({result['status']})") - + with Timer(f"授权 {email}"): + # 判断授权方式 + if status == "team_owner": + # Team Owner 使用 OTP + log.info("Team Owner,使用 OTP 登录...", icon="auth") + auth_success, codex_data = login_and_authorize_with_otp(email) + else: + # 普通账号使用密码登录授权 + auth_success, codex_data = authorize_only(email, password) + + if auth_success: + with _tracker_lock: + update_account_status(_tracker, team_name, email, "authorized") + save_team_tracker(_tracker) + + # 验证入库 + if AUTH_PROVIDER == "s2a": + from s2a_service import s2a_verify_account_in_pool + verified, account_data = s2a_verify_account_in_pool(email) + + if verified: + account_id = account_data.get("id", "") + result["status"] = "success" + result["crs_id"] = f"S2A-{account_id}" + with _tracker_lock: + update_account_status(_tracker, team_name, email, "completed") + save_team_tracker(_tracker) + log.success(f"✅ S2A 入库成功: {email} (ID: {account_id})") + else: + log.warning(f"⚠️ S2A 入库验证失败: {email}") + result["status"] = "partial" + with _tracker_lock: + update_account_status(_tracker, team_name, email, "partial") + save_team_tracker(_tracker) + + elif AUTH_PROVIDER == "cpa": + result["status"] = "success" + result["crs_id"] = "CPA-AUTO" + with _tracker_lock: + update_account_status(_tracker, team_name, email, "completed") + save_team_tracker(_tracker) + log.success(f"✅ CPA 处理完成: {email}") + + else: + # CRS 模式 + if codex_data: + crs_result = crs_add_account(email, codex_data) + if crs_result: + crs_id = crs_result.get("id", "") + result["status"] = "success" + result["crs_id"] = crs_id + with _tracker_lock: + update_account_status(_tracker, team_name, email, "completed") + save_team_tracker(_tracker) + log.success(f"✅ CRS 入库成功: {email}") + else: + result["status"] = "partial" + with _tracker_lock: + update_account_status(_tracker, team_name, email, "partial") + save_team_tracker(_tracker) + else: + log.error(f"❌ 授权失败: {email}") + result["status"] = "auth_failed" + with _tracker_lock: + update_account_status(_tracker, team_name, email, "auth_failed") + save_team_tracker(_tracker) + + except ShutdownRequested: + log.warning(f"用户请求停止: {email}") + with _tracker_lock: + save_team_tracker(_tracker) + break except Exception as e: - log.error(f"任务执行异常: {account.get('email', 'unknown')} - {e}") - results.append({ - "team": team_name, - "email": account.get("email", ""), - "password": account.get("password", ""), - "status": "error", - "crs_id": "" - }) + log.error(f"授权异常: {email} - {e}") + result["status"] = "error" + with _tracker_lock: + update_account_status(_tracker, team_name, email, "error") + save_team_tracker(_tracker) + + # 保存到 CSV + save_to_csv( + email=email, + password=password, + team_name=team_name, + status=result["status"], + crs_id=result.get("crs_id", "") + ) + + results.append(result) + + # 更新进度 + is_success = result["status"] in ("success", "completed") + progress_account_done(email, is_success) + + # 授权间隔 + if i < len(need_auth_only) - 1 and not _shutdown_requested: + time.sleep(1) # 统计结果 success_count = sum(1 for r in results if r["status"] in ("success", "completed")) - log.success(f"并发处理完成: {success_count}/{len(results)} 成功") + log.success(f"两阶段处理完成: {success_count}/{len(results)} 成功") return results +def _register_single_account_worker(account: dict, team_name: str, worker_id: int) -> str: + """单个账号注册工作函数 (用于阶段 1 并行注册) + + Args: + account: 账号信息 + team_name: Team 名称 + worker_id: 工作线程 ID + + Returns: + str: "success", "domain_blacklisted", or "failed" + """ + global _tracker, _shutdown_requested + + email = account["email"] + password = account["password"] + + # 检查中断请求 + if _shutdown_requested: + return "failed" + + # 检查邮箱域名黑名单 + if is_email_blacklisted(email): + return "domain_blacklisted" + + log.info(f"[Worker-{worker_id}] 开始注册: {email}", icon="account") + + # 标记为处理中 + with _tracker_lock: + update_account_status(_tracker, team_name, email, "processing") + save_team_tracker(_tracker) + + try: + with Timer(f"[Worker-{worker_id}] 注册 {email}"): + result = register_only(email, password) + + if result == "success": + with _tracker_lock: + update_account_status(_tracker, team_name, email, "registered") + save_team_tracker(_tracker) + return "success" + elif result == "domain_blacklisted": + return "domain_blacklisted" + else: + with _tracker_lock: + update_account_status(_tracker, team_name, email, "register_failed") + save_team_tracker(_tracker) + return "failed" + + except ShutdownRequested: + return "failed" + except Exception as e: + log.error(f"[Worker-{worker_id}] 注册异常: {email} - {e}") + with _tracker_lock: + update_account_status(_tracker, team_name, email, "error") + save_team_tracker(_tracker) + return "failed" + + def _print_system_config(): """打印当前系统配置""" from config import (