多线程

This commit is contained in:
2026-01-27 09:08:34 +08:00
parent 8cb7a50bb9
commit 6cafaa4ab7
3 changed files with 343 additions and 10 deletions

View File

@@ -312,6 +312,7 @@ def reload_config() -> dict:
global PROXY_ENABLED, PROXIES
global S2A_API_BASE, S2A_ADMIN_KEY, S2A_ADMIN_TOKEN
global S2A_CONCURRENCY, S2A_PRIORITY, S2A_GROUP_NAMES, S2A_GROUP_IDS
global CONCURRENT_ENABLED, CONCURRENT_WORKERS
result = {
"success": True,
@@ -349,6 +350,11 @@ def reload_config() -> dict:
_account = _cfg.get("account", {})
ACCOUNTS_PER_TEAM = _account.get("accounts_per_team", 4)
# 并发配置
_concurrent = _cfg.get("concurrent", {})
CONCURRENT_ENABLED = _concurrent.get("enabled", False)
CONCURRENT_WORKERS = _concurrent.get("workers", 4)
# GPTMail 配置
_gptmail = _cfg.get("gptmail", {})
GPTMAIL_PREFIX = _gptmail.get("prefix", "")
@@ -600,6 +606,11 @@ _account = _cfg.get("account", {})
DEFAULT_PASSWORD = _account.get("default_password", "kfcvivo50")
ACCOUNTS_PER_TEAM = _account.get("accounts_per_team", 4)
# 并发处理配置
_concurrent = _cfg.get("concurrent", {})
CONCURRENT_ENABLED = _concurrent.get("enabled", False) # 是否启用并发处理
CONCURRENT_WORKERS = _concurrent.get("workers", 4) # 并发数量 (浏览器实例数)
# 注册
_reg = _cfg.get("register", {})
REGISTER_NAME = _reg.get("name", "test")

View File

@@ -162,6 +162,15 @@ default_password = "YourSecurePassword@2025"
# 每个 Team 下创建的账号数量
accounts_per_team = 4
# ==================== 并发处理配置 ====================
# 启用后可同时处理多个账号,大幅提升效率
[concurrent]
# 是否启用并发处理 (默认关闭)
enabled = false
# 并发数量 (同时运行的浏览器实例数)
# 建议根据机器配置设置: 4核8G内存建议设置为 2-4
workers = 4
# ==================== 注册配置 ====================
[register]
# 注册时使用的用户名 (实际会使用随机生成的英文名)

321
run.py
View File

@@ -5,9 +5,9 @@
# 1. 检查未完成账号 (自动恢复)
# 2. 批量创建邮箱 (4个)
# 3. 一次性邀请到 Team
# 4. 逐个注册 OpenAI 账号
# 4. 逐个注册 OpenAI 账号 (或并发处理)
# 5. 逐个 Codex 授权
# 6. 逐个添加到 CRS
# 6. 逐个添加到 CRS/S2A
# 7. 切换下一个 Team
import time
@@ -15,11 +15,14 @@ import random
import signal
import sys
import atexit
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from config import (
TEAMS, ACCOUNTS_PER_TEAM, DEFAULT_PASSWORD, AUTH_PROVIDER,
add_domain_to_blacklist, get_domain_from_email, is_email_blacklisted,
save_team_json, get_next_proxy
save_team_json, get_next_proxy,
CONCURRENT_ENABLED, CONCURRENT_WORKERS
)
from email_service import batch_create_emails, unified_create_email
from team_service import batch_invite_to_team, print_team_summary, check_available_seats, invite_single_to_team, preload_all_account_ids
@@ -58,6 +61,7 @@ except ImportError:
_tracker = None
_current_results = []
_shutdown_requested = False
_tracker_lock = threading.Lock() # 用于并发时保护 tracker 操作
def _save_state():
@@ -224,6 +228,15 @@ def process_single_team(team: dict, team_index: int = 0, teams_total: int = 0) -
# ========== 阶段 3: 处理所有账号 (注册 + Codex 授权 + 入库) ==========
if all_to_process:
# 根据配置选择串行或并发处理
if CONCURRENT_ENABLED and len(all_to_process) > 1:
log.section(f"阶段 3: 并发处理 {len(all_to_process)} 个账号 (并发数: {min(CONCURRENT_WORKERS, len(all_to_process))})")
all_results = process_accounts_concurrent(
all_to_process, team_name,
team_index=team_index, teams_total=teams_total,
include_owner=include_owner
)
else:
log.section(f"阶段 3: 逐个注册 OpenAI + Codex 授权 + 入库")
all_results = process_accounts(
all_to_process, team_name,
@@ -535,13 +548,307 @@ def process_accounts(accounts: list, team_name: str, team_index: int = 0,
return results
# ==================== 并发处理函数 ====================
def _process_single_account_worker(
account: dict,
team_name: str,
worker_id: int
) -> dict:
"""单个账号处理工作函数 (用于并发执行)
Args:
account: 账号信息 {"email", "password", "status", "role"}
team_name: Team 名称
worker_id: 工作线程 ID
Returns:
dict: 处理结果
"""
global _tracker, _shutdown_requested
email = account["email"]
password = account["password"]
role = account.get("role", "member")
account_status = account.get("status", "")
account_role = account.get("role", "member")
result = {
"team": team_name,
"email": email,
"password": password,
"status": "failed",
"crs_id": "",
"worker_id": worker_id
}
# 检查中断请求
if _shutdown_requested:
log.warning(f"[Worker-{worker_id}] 检测到中断请求,跳过: {email}")
return result
# 检查邮箱域名黑名单
if is_email_blacklisted(email):
domain = get_domain_from_email(email)
log.warning(f"[Worker-{worker_id}] 邮箱域名 {domain} 在黑名单中,跳过: {email}")
with _tracker_lock:
remove_account_from_tracker(_tracker, team_name, email)
save_team_tracker(_tracker)
return result
# 已完成的账号跳过
if account_status == "completed":
log.info(f"[Worker-{worker_id}] 账号已完成,跳过: {email}")
result["status"] = "completed"
return result
log.info(f"[Worker-{worker_id}] 开始处理: {email}", icon="account")
# 判断处理流程
is_team_owner_otp = account_status == "team_owner"
if AUTH_PROVIDER == "s2a":
need_crs_only = account_status == "authorized"
else:
need_crs_only = account_status in ["authorized", "partial"]
need_auth_only = (
account_status in ["registered", "auth_failed"]
or (AUTH_PROVIDER == "s2a" and account_status == "partial")
or (account_role == "owner" and account_status not in ["team_owner", "completed", "authorized", "partial"])
)
# 标记为处理中
with _tracker_lock:
update_account_status(_tracker, team_name, email, "processing")
save_team_tracker(_tracker)
try:
with Timer(f"[Worker-{worker_id}] 账号 {email}"):
if is_team_owner_otp:
log.info(f"[Worker-{worker_id}] Team Owner (OTP 登录)...", icon="auth")
auth_success, codex_data = login_and_authorize_with_otp(email)
register_success = auth_success
elif need_crs_only:
log.info(f"[Worker-{worker_id}] 已授权账号,跳过授权...", icon="auth")
register_success = True
codex_data = None
if AUTH_PROVIDER not in ("cpa", "s2a"):
auth_success, codex_data = authorize_only(email, password)
register_success = auth_success
elif need_auth_only:
log.info(f"[Worker-{worker_id}] 已注册账号,密码登录授权...", icon="auth")
auth_success, codex_data = authorize_only(email, password)
register_success = True
else:
log.info(f"[Worker-{worker_id}] 新账号,注册 + 授权...", icon="auth")
register_success, codex_data = register_and_authorize(email, password)
if register_success == "domain_blacklisted":
domain = get_domain_from_email(email)
log.error(f"[Worker-{worker_id}] 域名 {domain} 不被支持")
add_domain_to_blacklist(domain)
with _tracker_lock:
remove_account_from_tracker(_tracker, team_name, email)
save_team_tracker(_tracker)
return result
if register_success and register_success != "domain_blacklisted":
with _tracker_lock:
update_account_status(_tracker, team_name, email, "registered")
save_team_tracker(_tracker)
if AUTH_PROVIDER == "s2a":
from s2a_service import s2a_verify_account_in_pool
with _tracker_lock:
update_account_status(_tracker, team_name, email, "authorized")
save_team_tracker(_tracker)
log.step(f"[Worker-{worker_id}] 验证 S2A 入库状态...")
verified, account_data = s2a_verify_account_in_pool(email)
if verified:
account_id = account_data.get("id", "")
result["status"] = "success"
result["crs_id"] = f"S2A-{account_id}"
with _tracker_lock:
update_account_status(_tracker, team_name, email, "completed")
save_team_tracker(_tracker)
log.success(f"[Worker-{worker_id}] ✅ S2A 入库成功: {email} (ID: {account_id})")
else:
log.warning(f"[Worker-{worker_id}] ⚠️ S2A 入库验证失败: {email}")
result["status"] = "partial"
with _tracker_lock:
update_account_status(_tracker, team_name, email, "partial")
save_team_tracker(_tracker)
elif AUTH_PROVIDER == "cpa":
with _tracker_lock:
update_account_status(_tracker, team_name, email, "authorized")
save_team_tracker(_tracker)
result["status"] = "success"
result["crs_id"] = "CPA-AUTO"
with _tracker_lock:
update_account_status(_tracker, team_name, email, "completed")
save_team_tracker(_tracker)
log.success(f"[Worker-{worker_id}] ✅ CPA 处理完成: {email}")
else:
# CRS 模式
if codex_data:
with _tracker_lock:
update_account_status(_tracker, team_name, email, "authorized")
save_team_tracker(_tracker)
crs_result = crs_add_account(email, codex_data)
if crs_result:
crs_id = crs_result.get("id", "")
result["status"] = "success"
result["crs_id"] = crs_id
with _tracker_lock:
update_account_status(_tracker, team_name, email, "completed")
save_team_tracker(_tracker)
log.success(f"[Worker-{worker_id}] ✅ CRS 入库成功: {email}")
else:
result["status"] = "partial"
with _tracker_lock:
update_account_status(_tracker, team_name, email, "partial")
save_team_tracker(_tracker)
else:
result["status"] = "auth_failed"
with _tracker_lock:
update_account_status(_tracker, team_name, email, "auth_failed")
save_team_tracker(_tracker)
else:
log.error(f"[Worker-{worker_id}] 注册/授权失败: {email}")
with _tracker_lock:
update_account_status(_tracker, team_name, email, "register_failed")
save_team_tracker(_tracker)
except ShutdownRequested:
log.warning(f"[Worker-{worker_id}] 用户请求停止: {email}")
with _tracker_lock:
save_team_tracker(_tracker)
except Exception as e:
log.error(f"[Worker-{worker_id}] 处理异常: {email} - {e}")
with _tracker_lock:
update_account_status(_tracker, team_name, email, "error")
save_team_tracker(_tracker)
# 保存到 CSV
save_to_csv(
email=email,
password=password,
team_name=team_name,
status=result["status"],
crs_id=result.get("crs_id", "")
)
return result
def process_accounts_concurrent(
accounts: list,
team_name: str,
team_index: int = 0,
teams_total: int = 0,
include_owner: bool = False,
max_workers: int = None
) -> list:
"""并发处理账号列表
Args:
accounts: 账号列表 [{"email", "password", "status", "role"}]
team_name: Team 名称
team_index: 当前 Team 序号
teams_total: Team 总数
include_owner: 是否包含 Owner
max_workers: 最大并发数 (默认使用配置值)
Returns:
list: 处理结果
"""
global _shutdown_requested
if max_workers is None:
max_workers = CONCURRENT_WORKERS
# 过滤已完成的账号
pending_accounts = [acc for acc in accounts if acc.get("status") != "completed"]
if not pending_accounts:
log.info("所有账号已完成,无需处理")
return []
total = len(pending_accounts)
actual_workers = min(max_workers, total)
log.section(f"并发处理 {total} 个账号 (并发数: {actual_workers})")
# 启动进度跟踪
progress_start(team_name, total, team_index, teams_total, include_owner)
results = []
completed_count = 0
with ThreadPoolExecutor(max_workers=actual_workers) as executor:
# 提交所有任务
future_to_account = {
executor.submit(
_process_single_account_worker,
account,
team_name,
i % actual_workers + 1
): account
for i, account in enumerate(pending_accounts)
}
# 收集结果
for future in as_completed(future_to_account):
if _shutdown_requested:
log.warning("检测到中断请求,取消剩余任务...")
executor.shutdown(wait=False, cancel_futures=True)
break
account = future_to_account[future]
try:
result = future.result()
results.append(result)
completed_count += 1
# 更新进度
is_success = result["status"] in ("success", "completed")
progress_account_done(result["email"], is_success)
status_icon = "" if is_success else ""
log.info(f"[{completed_count}/{total}] {status_icon} {result['email']} ({result['status']})")
except Exception as e:
log.error(f"任务执行异常: {account.get('email', 'unknown')} - {e}")
results.append({
"team": team_name,
"email": account.get("email", ""),
"password": account.get("password", ""),
"status": "error",
"crs_id": ""
})
# 统计结果
success_count = sum(1 for r in results if r["status"] in ("success", "completed"))
log.success(f"并发处理完成: {success_count}/{len(results)} 成功")
return results
def _print_system_config():
"""打印当前系统配置"""
from config import (
EMAIL_PROVIDER, AUTH_PROVIDER, ACCOUNTS_PER_TEAM,
INCLUDE_TEAM_OWNERS, BROWSER_RANDOM_FINGERPRINT,
S2A_API_BASE, CPA_API_BASE, CRS_API_BASE,
PROXY_ENABLED, PROXIES
PROXY_ENABLED, PROXIES,
CONCURRENT_ENABLED, CONCURRENT_WORKERS
)
log.section("系统配置")
@@ -560,6 +867,12 @@ def _print_system_config():
log.info(f"Owner 入库: {'✓ 开启' if INCLUDE_TEAM_OWNERS else '✗ 关闭'}", icon="config")
log.info(f"随机指纹: {'✓ 开启' if BROWSER_RANDOM_FINGERPRINT else '✗ 关闭'}", icon="config")
# 并发配置
if CONCURRENT_ENABLED:
log.info(f"并发处理: ✓ 开启 ({CONCURRENT_WORKERS} 并发)", icon="config")
else:
log.info("并发处理: ✗ 关闭 (串行模式)", icon="config")
if PROXY_ENABLED and PROXIES:
log.info(f"代理: 已启用 ({len(PROXIES)} 个)", icon="proxy")
else: