This commit is contained in:
2026-01-27 10:34:25 +08:00
parent 52b875a7f9
commit 8d5f8fe3bb
2 changed files with 314 additions and 53 deletions

View File

@@ -2195,6 +2195,52 @@ def login_and_authorize_with_otp(email: str) -> tuple[bool, dict]:
return False, None return False, None
def register_only(email: str, password: str, use_api_register: bool = True) -> str:
"""仅注册 OpenAI 账号,不进行授权 (用于并行注册 + 串行授权模式)
Args:
email: 邮箱地址
password: 密码
use_api_register: 是否优先使用 API 模式注册 (默认 True)
Returns:
str: 注册结果
- "success": 注册成功
- "domain_blacklisted": 域名被列入黑名单
- "failed": 注册失败
"""
with browser_context_with_retry(max_browser_retries=2) as ctx:
for attempt in ctx.attempts():
try:
# 注册 OpenAI (优先使用 API 模式)
register_result = register_openai_account_auto(
ctx.page, email, password,
use_api=use_api_register
)
# 检查是否是域名黑名单错误
if register_result == "domain_blacklisted":
ctx.stop()
return "domain_blacklisted"
if not register_result:
if attempt < ctx.max_retries - 1:
log.warning("注册失败,准备重试...")
continue
return "failed"
# 注册成功
log.success(f"注册成功: {email}")
return "success"
except Exception as e:
ctx.handle_error(e)
if ctx.current_attempt >= ctx.max_retries - 1:
return "failed"
return "failed"
def register_and_authorize(email: str, password: str, use_api_register: bool = True) -> tuple: def register_and_authorize(email: str, password: str, use_api_register: bool = True) -> tuple:
"""完整流程: 注册 OpenAI + Codex 授权 (带重试机制) """完整流程: 注册 OpenAI + Codex 授权 (带重试机制)

321
run.py
View File

@@ -29,7 +29,7 @@ from team_service import batch_invite_to_team, print_team_summary, check_availab
from crs_service import crs_add_account, crs_sync_team_owners, crs_verify_token from crs_service import crs_add_account, crs_sync_team_owners, crs_verify_token
from cpa_service import cpa_verify_connection from cpa_service import cpa_verify_connection
from s2a_service import s2a_verify_connection from s2a_service import s2a_verify_connection
from browser_automation import register_and_authorize, login_and_authorize_with_otp, authorize_only, login_and_authorize_team_owner, ShutdownRequested from browser_automation import register_and_authorize, login_and_authorize_with_otp, authorize_only, login_and_authorize_team_owner, ShutdownRequested, register_only
from utils import ( from utils import (
save_to_csv, save_to_csv,
load_team_tracker, load_team_tracker,
@@ -757,7 +757,10 @@ def process_accounts_concurrent(
include_owner: bool = False, include_owner: bool = False,
max_workers: int = None max_workers: int = None
) -> list: ) -> list:
"""并发处理账号列表 """并发处理账号列表 (两阶段模式: 并行注册 + 串行授权)
阶段 1: 并行注册所有新账号
阶段 2: 串行授权所有已注册的账号
Args: Args:
accounts: 账号列表 [{"email", "password", "status", "role"}] accounts: 账号列表 [{"email", "password", "status", "role"}]
@@ -770,12 +773,12 @@ def process_accounts_concurrent(
Returns: Returns:
list: 处理结果 list: 处理结果
""" """
global _shutdown_requested global _tracker, _shutdown_requested
if max_workers is None: if max_workers is None:
max_workers = CONCURRENT_WORKERS max_workers = CONCURRENT_WORKERS
stagger_delay = 4.0 # 线程错开启动间隔 (秒) stagger_delay = 3.0 # 线程错开启动间隔 (秒)
# 过滤已完成的账号 # 过滤已完成的账号
pending_accounts = [acc for acc in accounts if acc.get("status") != "completed"] pending_accounts = [acc for acc in accounts if acc.get("status") != "completed"]
@@ -787,74 +790,286 @@ def process_accounts_concurrent(
total = len(pending_accounts) total = len(pending_accounts)
actual_workers = min(max_workers, total) actual_workers = min(max_workers, total)
# 并发注册,串行授权回调 # 分类账号: 需要注册的 vs 已注册待授权的
log.section(f"并发处理 {total} 个账号 (并发数: {actual_workers}, 授权回调串行)") need_register = []
need_auth_only = []
for acc in pending_accounts:
status = acc.get("status", "")
role = acc.get("role", "member")
# 已注册但未授权的状态
if status in ["registered", "auth_failed"] or \
(AUTH_PROVIDER == "s2a" and status == "partial") or \
(role == "owner" and status not in ["team_owner", "completed", "authorized", ""]):
need_auth_only.append(acc)
elif status == "team_owner":
# Team Owner 使用 OTP需要特殊处理
need_auth_only.append(acc)
elif status in ["invited", "processing", ""]:
# 新账号,需要注册
need_register.append(acc)
else:
# 其他状态,尝试注册
need_register.append(acc)
log.section(f"两阶段并发处理 {total} 个账号")
log.info(f"需要注册: {len(need_register)} 个, 需要授权: {len(need_auth_only)}")
# 启动进度跟踪 # 启动进度跟踪
progress_start(team_name, total, team_index, teams_total, include_owner) progress_start(team_name, total, team_index, teams_total, include_owner)
results = [] results = []
completed_count = 0
# ==================== 阶段 1: 并行注册 ====================
with ThreadPoolExecutor(max_workers=actual_workers) as executor: if need_register:
# 错开提交任务,每个任务间隔 stagger_delay 秒 log.section(f"阶段 1: 并行注册 {len(need_register)} 个账号 (并发数: {actual_workers})")
future_to_account = {}
for i, account in enumerate(pending_accounts): registered_accounts = []
with ThreadPoolExecutor(max_workers=actual_workers) as executor:
future_to_account = {}
for i, account in enumerate(need_register):
if _shutdown_requested:
break
worker_id = i % actual_workers + 1
log.info(f"[Worker-{worker_id}] 启动注册: {account['email']}", icon="start")
future = executor.submit(
_register_single_account_worker,
account,
team_name,
worker_id
)
future_to_account[future] = account
# 错开启动
if i < len(need_register) - 1:
time.sleep(stagger_delay)
# 收集注册结果
for future in as_completed(future_to_account):
if _shutdown_requested:
log.warning("检测到中断请求,取消剩余任务...")
executor.shutdown(wait=False, cancel_futures=True)
break
account = future_to_account[future]
try:
reg_result = future.result()
if reg_result == "success":
log.success(f"✅ 注册成功: {account['email']}")
registered_accounts.append(account)
elif reg_result == "domain_blacklisted":
log.error(f"❌ 域名黑名单: {account['email']}")
domain = get_domain_from_email(account['email'])
add_domain_to_blacklist(domain)
with _tracker_lock:
remove_account_from_tracker(_tracker, team_name, account['email'])
save_team_tracker(_tracker)
else:
log.error(f"❌ 注册失败: {account['email']}")
with _tracker_lock:
update_account_status(_tracker, team_name, account['email'], "register_failed")
save_team_tracker(_tracker)
except Exception as e:
log.error(f"注册异常: {account.get('email', 'unknown')} - {e}")
log.success(f"阶段 1 完成: {len(registered_accounts)}/{len(need_register)} 注册成功")
# 将成功注册的账号加入授权列表
need_auth_only.extend(registered_accounts)
# ==================== 阶段 2: 串行授权 ====================
if need_auth_only and not _shutdown_requested:
log.section(f"阶段 2: 串行授权 {len(need_auth_only)} 个账号")
for i, account in enumerate(need_auth_only):
if _shutdown_requested: if _shutdown_requested:
log.warning("检测到中断请求,停止授权...")
break break
worker_id = i % actual_workers + 1 email = account["email"]
log.info(f"[Worker-{worker_id}] 启动任务: {account['email']}", icon="start") password = account["password"]
role = account.get("role", "member")
status = account.get("status", "")
future = executor.submit( log.info(f"[{i+1}/{len(need_auth_only)}] 授权: {email}", icon="auth")
_process_single_account_worker,
account, result = {
team_name, "team": team_name,
worker_id "email": email,
) "password": password,
future_to_account[future] = account "status": "failed",
"crs_id": ""
}
# 错开启动,最后一个不需要等待
if i < len(pending_accounts) - 1:
time.sleep(stagger_delay)
# 收集结果
for future in as_completed(future_to_account):
if _shutdown_requested:
log.warning("检测到中断请求,取消剩余任务...")
executor.shutdown(wait=False, cancel_futures=True)
break
account = future_to_account[future]
try: try:
result = future.result() with Timer(f"授权 {email}"):
results.append(result) # 判断授权方式
completed_count += 1 if status == "team_owner":
# Team Owner 使用 OTP
# 更新进度 log.info("Team Owner使用 OTP 登录...", icon="auth")
is_success = result["status"] in ("success", "completed") auth_success, codex_data = login_and_authorize_with_otp(email)
progress_account_done(result["email"], is_success) else:
# 普通账号使用密码登录授权
status_icon = "" if is_success else "" auth_success, codex_data = authorize_only(email, password)
log.info(f"[{completed_count}/{total}] {status_icon} {result['email']} ({result['status']})")
if auth_success:
with _tracker_lock:
update_account_status(_tracker, team_name, email, "authorized")
save_team_tracker(_tracker)
# 验证入库
if AUTH_PROVIDER == "s2a":
from s2a_service import s2a_verify_account_in_pool
verified, account_data = s2a_verify_account_in_pool(email)
if verified:
account_id = account_data.get("id", "")
result["status"] = "success"
result["crs_id"] = f"S2A-{account_id}"
with _tracker_lock:
update_account_status(_tracker, team_name, email, "completed")
save_team_tracker(_tracker)
log.success(f"✅ S2A 入库成功: {email} (ID: {account_id})")
else:
log.warning(f"⚠️ S2A 入库验证失败: {email}")
result["status"] = "partial"
with _tracker_lock:
update_account_status(_tracker, team_name, email, "partial")
save_team_tracker(_tracker)
elif AUTH_PROVIDER == "cpa":
result["status"] = "success"
result["crs_id"] = "CPA-AUTO"
with _tracker_lock:
update_account_status(_tracker, team_name, email, "completed")
save_team_tracker(_tracker)
log.success(f"✅ CPA 处理完成: {email}")
else:
# CRS 模式
if codex_data:
crs_result = crs_add_account(email, codex_data)
if crs_result:
crs_id = crs_result.get("id", "")
result["status"] = "success"
result["crs_id"] = crs_id
with _tracker_lock:
update_account_status(_tracker, team_name, email, "completed")
save_team_tracker(_tracker)
log.success(f"✅ CRS 入库成功: {email}")
else:
result["status"] = "partial"
with _tracker_lock:
update_account_status(_tracker, team_name, email, "partial")
save_team_tracker(_tracker)
else:
log.error(f"❌ 授权失败: {email}")
result["status"] = "auth_failed"
with _tracker_lock:
update_account_status(_tracker, team_name, email, "auth_failed")
save_team_tracker(_tracker)
except ShutdownRequested:
log.warning(f"用户请求停止: {email}")
with _tracker_lock:
save_team_tracker(_tracker)
break
except Exception as e: except Exception as e:
log.error(f"任务执行异常: {account.get('email', 'unknown')} - {e}") log.error(f"授权异常: {email} - {e}")
results.append({ result["status"] = "error"
"team": team_name, with _tracker_lock:
"email": account.get("email", ""), update_account_status(_tracker, team_name, email, "error")
"password": account.get("password", ""), save_team_tracker(_tracker)
"status": "error",
"crs_id": "" # 保存到 CSV
}) save_to_csv(
email=email,
password=password,
team_name=team_name,
status=result["status"],
crs_id=result.get("crs_id", "")
)
results.append(result)
# 更新进度
is_success = result["status"] in ("success", "completed")
progress_account_done(email, is_success)
# 授权间隔
if i < len(need_auth_only) - 1 and not _shutdown_requested:
time.sleep(1)
# 统计结果 # 统计结果
success_count = sum(1 for r in results if r["status"] in ("success", "completed")) success_count = sum(1 for r in results if r["status"] in ("success", "completed"))
log.success(f"并发处理完成: {success_count}/{len(results)} 成功") log.success(f"两阶段处理完成: {success_count}/{len(results)} 成功")
return results return results
def _register_single_account_worker(account: dict, team_name: str, worker_id: int) -> str:
"""单个账号注册工作函数 (用于阶段 1 并行注册)
Args:
account: 账号信息
team_name: Team 名称
worker_id: 工作线程 ID
Returns:
str: "success", "domain_blacklisted", or "failed"
"""
global _tracker, _shutdown_requested
email = account["email"]
password = account["password"]
# 检查中断请求
if _shutdown_requested:
return "failed"
# 检查邮箱域名黑名单
if is_email_blacklisted(email):
return "domain_blacklisted"
log.info(f"[Worker-{worker_id}] 开始注册: {email}", icon="account")
# 标记为处理中
with _tracker_lock:
update_account_status(_tracker, team_name, email, "processing")
save_team_tracker(_tracker)
try:
with Timer(f"[Worker-{worker_id}] 注册 {email}"):
result = register_only(email, password)
if result == "success":
with _tracker_lock:
update_account_status(_tracker, team_name, email, "registered")
save_team_tracker(_tracker)
return "success"
elif result == "domain_blacklisted":
return "domain_blacklisted"
else:
with _tracker_lock:
update_account_status(_tracker, team_name, email, "register_failed")
save_team_tracker(_tracker)
return "failed"
except ShutdownRequested:
return "failed"
except Exception as e:
log.error(f"[Worker-{worker_id}] 注册异常: {email} - {e}")
with _tracker_lock:
update_account_status(_tracker, team_name, email, "error")
save_team_tracker(_tracker)
return "failed"
def _print_system_config(): def _print_system_config():
"""打印当前系统配置""" """打印当前系统配置"""
from config import ( from config import (