diff --git a/config.py b/config.py index b60b332..be62b55 100644 --- a/config.py +++ b/config.py @@ -312,6 +312,7 @@ def reload_config() -> dict: global PROXY_ENABLED, PROXIES global S2A_API_BASE, S2A_ADMIN_KEY, S2A_ADMIN_TOKEN global S2A_CONCURRENCY, S2A_PRIORITY, S2A_GROUP_NAMES, S2A_GROUP_IDS + global CONCURRENT_ENABLED, CONCURRENT_WORKERS result = { "success": True, @@ -349,6 +350,11 @@ def reload_config() -> dict: _account = _cfg.get("account", {}) ACCOUNTS_PER_TEAM = _account.get("accounts_per_team", 4) + # 并发配置 + _concurrent = _cfg.get("concurrent", {}) + CONCURRENT_ENABLED = _concurrent.get("enabled", False) + CONCURRENT_WORKERS = _concurrent.get("workers", 4) + # GPTMail 配置 _gptmail = _cfg.get("gptmail", {}) GPTMAIL_PREFIX = _gptmail.get("prefix", "") @@ -600,6 +606,11 @@ _account = _cfg.get("account", {}) DEFAULT_PASSWORD = _account.get("default_password", "kfcvivo50") ACCOUNTS_PER_TEAM = _account.get("accounts_per_team", 4) +# 并发处理配置 +_concurrent = _cfg.get("concurrent", {}) +CONCURRENT_ENABLED = _concurrent.get("enabled", False) # 是否启用并发处理 +CONCURRENT_WORKERS = _concurrent.get("workers", 4) # 并发数量 (浏览器实例数) + # 注册 _reg = _cfg.get("register", {}) REGISTER_NAME = _reg.get("name", "test") diff --git a/config.toml.example b/config.toml.example index 1b555e1..b0b5974 100644 --- a/config.toml.example +++ b/config.toml.example @@ -162,6 +162,15 @@ default_password = "YourSecurePassword@2025" # 每个 Team 下创建的账号数量 accounts_per_team = 4 +# ==================== 并发处理配置 ==================== +# 启用后可同时处理多个账号,大幅提升效率 +[concurrent] +# 是否启用并发处理 (默认关闭) +enabled = false +# 并发数量 (同时运行的浏览器实例数) +# 建议根据机器配置设置: 4核8G内存建议设置为 2-4 +workers = 4 + # ==================== 注册配置 ==================== [register] # 注册时使用的用户名 (实际会使用随机生成的英文名) diff --git a/run.py b/run.py index 5b123b2..e1e220e 100644 --- a/run.py +++ b/run.py @@ -5,9 +5,9 @@ # 1. 检查未完成账号 (自动恢复) # 2. 批量创建邮箱 (4个) # 3. 一次性邀请到 Team -# 4. 逐个注册 OpenAI 账号 +# 4. 逐个注册 OpenAI 账号 (或并发处理) # 5. 逐个 Codex 授权 -# 6. 逐个添加到 CRS +# 6. 逐个添加到 CRS/S2A # 7. 切换下一个 Team import time @@ -15,11 +15,14 @@ import random import signal import sys import atexit +import threading +from concurrent.futures import ThreadPoolExecutor, as_completed from config import ( TEAMS, ACCOUNTS_PER_TEAM, DEFAULT_PASSWORD, AUTH_PROVIDER, add_domain_to_blacklist, get_domain_from_email, is_email_blacklisted, - save_team_json, get_next_proxy + save_team_json, get_next_proxy, + CONCURRENT_ENABLED, CONCURRENT_WORKERS ) from email_service import batch_create_emails, unified_create_email from team_service import batch_invite_to_team, print_team_summary, check_available_seats, invite_single_to_team, preload_all_account_ids @@ -58,6 +61,7 @@ except ImportError: _tracker = None _current_results = [] _shutdown_requested = False +_tracker_lock = threading.Lock() # 用于并发时保护 tracker 操作 def _save_state(): @@ -224,12 +228,21 @@ def process_single_team(team: dict, team_index: int = 0, teams_total: int = 0) - # ========== 阶段 3: 处理所有账号 (注册 + Codex 授权 + 入库) ========== if all_to_process: - log.section(f"阶段 3: 逐个注册 OpenAI + Codex 授权 + 入库") - all_results = process_accounts( - all_to_process, team_name, - team_index=team_index, teams_total=teams_total, - include_owner=include_owner - ) + # 根据配置选择串行或并发处理 + if CONCURRENT_ENABLED and len(all_to_process) > 1: + log.section(f"阶段 3: 并发处理 {len(all_to_process)} 个账号 (并发数: {min(CONCURRENT_WORKERS, len(all_to_process))})") + all_results = process_accounts_concurrent( + all_to_process, team_name, + team_index=team_index, teams_total=teams_total, + include_owner=include_owner + ) + else: + log.section(f"阶段 3: 逐个注册 OpenAI + Codex 授权 + 入库") + all_results = process_accounts( + all_to_process, team_name, + team_index=team_index, teams_total=teams_total, + include_owner=include_owner + ) results.extend(all_results) # ========== Team 处理完成 ========== @@ -535,13 +548,307 @@ def process_accounts(accounts: list, team_name: str, team_index: int = 0, return results +# ==================== 并发处理函数 ==================== + +def _process_single_account_worker( + account: dict, + team_name: str, + worker_id: int +) -> dict: + """单个账号处理工作函数 (用于并发执行) + + Args: + account: 账号信息 {"email", "password", "status", "role"} + team_name: Team 名称 + worker_id: 工作线程 ID + + Returns: + dict: 处理结果 + """ + global _tracker, _shutdown_requested + + email = account["email"] + password = account["password"] + role = account.get("role", "member") + account_status = account.get("status", "") + account_role = account.get("role", "member") + + result = { + "team": team_name, + "email": email, + "password": password, + "status": "failed", + "crs_id": "", + "worker_id": worker_id + } + + # 检查中断请求 + if _shutdown_requested: + log.warning(f"[Worker-{worker_id}] 检测到中断请求,跳过: {email}") + return result + + # 检查邮箱域名黑名单 + if is_email_blacklisted(email): + domain = get_domain_from_email(email) + log.warning(f"[Worker-{worker_id}] 邮箱域名 {domain} 在黑名单中,跳过: {email}") + with _tracker_lock: + remove_account_from_tracker(_tracker, team_name, email) + save_team_tracker(_tracker) + return result + + # 已完成的账号跳过 + if account_status == "completed": + log.info(f"[Worker-{worker_id}] 账号已完成,跳过: {email}") + result["status"] = "completed" + return result + + log.info(f"[Worker-{worker_id}] 开始处理: {email}", icon="account") + + # 判断处理流程 + is_team_owner_otp = account_status == "team_owner" + + if AUTH_PROVIDER == "s2a": + need_crs_only = account_status == "authorized" + else: + need_crs_only = account_status in ["authorized", "partial"] + + need_auth_only = ( + account_status in ["registered", "auth_failed"] + or (AUTH_PROVIDER == "s2a" and account_status == "partial") + or (account_role == "owner" and account_status not in ["team_owner", "completed", "authorized", "partial"]) + ) + + # 标记为处理中 + with _tracker_lock: + update_account_status(_tracker, team_name, email, "processing") + save_team_tracker(_tracker) + + try: + with Timer(f"[Worker-{worker_id}] 账号 {email}"): + if is_team_owner_otp: + log.info(f"[Worker-{worker_id}] Team Owner (OTP 登录)...", icon="auth") + auth_success, codex_data = login_and_authorize_with_otp(email) + register_success = auth_success + elif need_crs_only: + log.info(f"[Worker-{worker_id}] 已授权账号,跳过授权...", icon="auth") + register_success = True + codex_data = None + if AUTH_PROVIDER not in ("cpa", "s2a"): + auth_success, codex_data = authorize_only(email, password) + register_success = auth_success + elif need_auth_only: + log.info(f"[Worker-{worker_id}] 已注册账号,密码登录授权...", icon="auth") + auth_success, codex_data = authorize_only(email, password) + register_success = True + else: + log.info(f"[Worker-{worker_id}] 新账号,注册 + 授权...", icon="auth") + register_success, codex_data = register_and_authorize(email, password) + + if register_success == "domain_blacklisted": + domain = get_domain_from_email(email) + log.error(f"[Worker-{worker_id}] 域名 {domain} 不被支持") + add_domain_to_blacklist(domain) + with _tracker_lock: + remove_account_from_tracker(_tracker, team_name, email) + save_team_tracker(_tracker) + return result + + if register_success and register_success != "domain_blacklisted": + with _tracker_lock: + update_account_status(_tracker, team_name, email, "registered") + save_team_tracker(_tracker) + + if AUTH_PROVIDER == "s2a": + from s2a_service import s2a_verify_account_in_pool + + with _tracker_lock: + update_account_status(_tracker, team_name, email, "authorized") + save_team_tracker(_tracker) + + log.step(f"[Worker-{worker_id}] 验证 S2A 入库状态...") + verified, account_data = s2a_verify_account_in_pool(email) + + if verified: + account_id = account_data.get("id", "") + result["status"] = "success" + result["crs_id"] = f"S2A-{account_id}" + with _tracker_lock: + update_account_status(_tracker, team_name, email, "completed") + save_team_tracker(_tracker) + log.success(f"[Worker-{worker_id}] ✅ S2A 入库成功: {email} (ID: {account_id})") + else: + log.warning(f"[Worker-{worker_id}] ⚠️ S2A 入库验证失败: {email}") + result["status"] = "partial" + with _tracker_lock: + update_account_status(_tracker, team_name, email, "partial") + save_team_tracker(_tracker) + + elif AUTH_PROVIDER == "cpa": + with _tracker_lock: + update_account_status(_tracker, team_name, email, "authorized") + save_team_tracker(_tracker) + result["status"] = "success" + result["crs_id"] = "CPA-AUTO" + with _tracker_lock: + update_account_status(_tracker, team_name, email, "completed") + save_team_tracker(_tracker) + log.success(f"[Worker-{worker_id}] ✅ CPA 处理完成: {email}") + + else: + # CRS 模式 + if codex_data: + with _tracker_lock: + update_account_status(_tracker, team_name, email, "authorized") + save_team_tracker(_tracker) + crs_result = crs_add_account(email, codex_data) + if crs_result: + crs_id = crs_result.get("id", "") + result["status"] = "success" + result["crs_id"] = crs_id + with _tracker_lock: + update_account_status(_tracker, team_name, email, "completed") + save_team_tracker(_tracker) + log.success(f"[Worker-{worker_id}] ✅ CRS 入库成功: {email}") + else: + result["status"] = "partial" + with _tracker_lock: + update_account_status(_tracker, team_name, email, "partial") + save_team_tracker(_tracker) + else: + result["status"] = "auth_failed" + with _tracker_lock: + update_account_status(_tracker, team_name, email, "auth_failed") + save_team_tracker(_tracker) + else: + log.error(f"[Worker-{worker_id}] 注册/授权失败: {email}") + with _tracker_lock: + update_account_status(_tracker, team_name, email, "register_failed") + save_team_tracker(_tracker) + + except ShutdownRequested: + log.warning(f"[Worker-{worker_id}] 用户请求停止: {email}") + with _tracker_lock: + save_team_tracker(_tracker) + + except Exception as e: + log.error(f"[Worker-{worker_id}] 处理异常: {email} - {e}") + with _tracker_lock: + update_account_status(_tracker, team_name, email, "error") + save_team_tracker(_tracker) + + # 保存到 CSV + save_to_csv( + email=email, + password=password, + team_name=team_name, + status=result["status"], + crs_id=result.get("crs_id", "") + ) + + return result + + +def process_accounts_concurrent( + accounts: list, + team_name: str, + team_index: int = 0, + teams_total: int = 0, + include_owner: bool = False, + max_workers: int = None +) -> list: + """并发处理账号列表 + + Args: + accounts: 账号列表 [{"email", "password", "status", "role"}] + team_name: Team 名称 + team_index: 当前 Team 序号 + teams_total: Team 总数 + include_owner: 是否包含 Owner + max_workers: 最大并发数 (默认使用配置值) + + Returns: + list: 处理结果 + """ + global _shutdown_requested + + if max_workers is None: + max_workers = CONCURRENT_WORKERS + + # 过滤已完成的账号 + pending_accounts = [acc for acc in accounts if acc.get("status") != "completed"] + + if not pending_accounts: + log.info("所有账号已完成,无需处理") + return [] + + total = len(pending_accounts) + actual_workers = min(max_workers, total) + + log.section(f"并发处理 {total} 个账号 (并发数: {actual_workers})") + + # 启动进度跟踪 + progress_start(team_name, total, team_index, teams_total, include_owner) + + results = [] + completed_count = 0 + + with ThreadPoolExecutor(max_workers=actual_workers) as executor: + # 提交所有任务 + future_to_account = { + executor.submit( + _process_single_account_worker, + account, + team_name, + i % actual_workers + 1 + ): account + for i, account in enumerate(pending_accounts) + } + + # 收集结果 + for future in as_completed(future_to_account): + if _shutdown_requested: + log.warning("检测到中断请求,取消剩余任务...") + executor.shutdown(wait=False, cancel_futures=True) + break + + account = future_to_account[future] + try: + result = future.result() + results.append(result) + completed_count += 1 + + # 更新进度 + is_success = result["status"] in ("success", "completed") + progress_account_done(result["email"], is_success) + + status_icon = "✅" if is_success else "❌" + log.info(f"[{completed_count}/{total}] {status_icon} {result['email']} ({result['status']})") + + except Exception as e: + log.error(f"任务执行异常: {account.get('email', 'unknown')} - {e}") + results.append({ + "team": team_name, + "email": account.get("email", ""), + "password": account.get("password", ""), + "status": "error", + "crs_id": "" + }) + + # 统计结果 + success_count = sum(1 for r in results if r["status"] in ("success", "completed")) + log.success(f"并发处理完成: {success_count}/{len(results)} 成功") + + return results + + def _print_system_config(): """打印当前系统配置""" from config import ( EMAIL_PROVIDER, AUTH_PROVIDER, ACCOUNTS_PER_TEAM, INCLUDE_TEAM_OWNERS, BROWSER_RANDOM_FINGERPRINT, S2A_API_BASE, CPA_API_BASE, CRS_API_BASE, - PROXY_ENABLED, PROXIES + PROXY_ENABLED, PROXIES, + CONCURRENT_ENABLED, CONCURRENT_WORKERS ) log.section("系统配置") @@ -560,6 +867,12 @@ def _print_system_config(): log.info(f"Owner 入库: {'✓ 开启' if INCLUDE_TEAM_OWNERS else '✗ 关闭'}", icon="config") log.info(f"随机指纹: {'✓ 开启' if BROWSER_RANDOM_FINGERPRINT else '✗ 关闭'}", icon="config") + # 并发配置 + if CONCURRENT_ENABLED: + log.info(f"并发处理: ✓ 开启 ({CONCURRENT_WORKERS} 并发)", icon="config") + else: + log.info("并发处理: ✗ 关闭 (串行模式)", icon="config") + if PROXY_ENABLED and PROXIES: log.info(f"代理: 已启用 ({len(PROXIES)} 个)", icon="proxy") else: