diff --git a/browser_automation.py b/browser_automation.py
index b5794a8..09670ed 100644
--- a/browser_automation.py
+++ b/browser_automation.py
@@ -1079,64 +1079,136 @@ def is_logged_in(page, timeout: int = 5) -> bool:
return False
-def register_openai_account_api(email: str, password: str, proxy: str = None) -> bool:
+def register_openai_account_api(email: str, password: str, proxy: str = None,
+ team_name: str = None, max_email_retries: int = 3) -> bool:
"""使用协议模式 (API) 注册 OpenAI 账号
+ 如果验证码获取超时,会自动创建新邮箱重试(不进入浏览器模式)
+
Args:
email: 邮箱地址
password: 密码
proxy: 代理地址 (可选)
+ team_name: Team 名称 (用于邀请新邮箱)
+ max_email_retries: 验证码超时后最大重试次数 (创建新邮箱)
Returns:
bool: 是否成功
+ str: 如果返回 "new_email:xxx@xxx.com:password",表示使用了新邮箱
"""
if not API_MODE_AVAILABLE:
log.warning("协议模式不可用,回退到浏览器模式")
return None # 返回 None 表示需要回退
- log.info(f"[API模式] 开始注册 OpenAI 账号: {email}", icon="account")
+ current_email = email
+ current_password = password
- # 生成随机姓名和生日
- random_name = get_random_name()
- birthday = get_random_birthday()
- birthdate = f"{birthday['year']}-{birthday['month']}-{birthday['day']}"
-
- log.step(f"姓名: {random_name}, 生日: {birthdate}")
-
- # 定义获取验证码的函数
- def get_code(target_email):
- progress_update(phase="注册", step="等待验证码...")
- log.step("等待验证码邮件...")
- code, error, email_time = unified_get_verification_code(target_email)
- if code:
- log.success(f"获取到验证码: {code}")
- return code
-
- # 执行 API 注册
- try:
- result = api_register_account_only(
- email=email,
- password=password,
- real_name=random_name,
- birthdate=birthdate,
- get_verification_code_func=get_code,
- proxy=proxy,
- progress_callback=lambda msg: log.step(msg)
- )
-
- if result:
- log.success(f"[API模式] 注册完成: {email}")
- return True
- else:
- log.warning("[API模式] 注册失败,可能需要回退到浏览器模式")
- return False
+ for retry in range(max_email_retries):
+ if retry > 0:
+ log.warning(f"验证码超时,尝试创建新邮箱 (重试 {retry}/{max_email_retries - 1})...")
- except Exception as e:
- log.error(f"[API模式] 注册异常: {e}")
- return False
+ # 创建新邮箱
+ from email_service import unified_create_email
+ new_email, new_password = unified_create_email()
+
+ if not new_email:
+ log.error("创建新邮箱失败")
+ continue
+
+ # 如果有 team_name,邀请新邮箱到 Team
+ if team_name:
+ from team_service import invite_single_to_team
+ from config import TEAMS
+
+ # 查找 team 配置
+ team = None
+ for t in TEAMS:
+ if t.get("name") == team_name:
+ team = t
+ break
+
+ if team:
+ log.step(f"邀请新邮箱到 Team: {new_email}")
+ if not invite_single_to_team(new_email, team):
+ log.error("新邮箱邀请失败")
+ continue
+ log.success(f"新邮箱邀请成功: {new_email}")
+
+ current_email = new_email
+ current_password = new_password
+
+ log.info(f"[API模式] 开始注册 OpenAI 账号: {current_email}", icon="account")
+
+ # 生成随机姓名和生日
+ random_name = get_random_name()
+ birthday = get_random_birthday()
+ birthdate = f"{birthday['year']}-{birthday['month']}-{birthday['day']}"
+
+ log.step(f"姓名: {random_name}, 生日: {birthdate}")
+
+ # 验证码超时标志
+ verification_timeout = False
+
+ # 定义获取验证码的函数 (5秒超时)
+ def get_code(target_email):
+ nonlocal verification_timeout
+ progress_update(phase="注册", step="等待验证码...")
+ log.step("等待验证码邮件 (5秒超时)...")
+
+ # 使用较短的超时时间: 5 次快速重试,每次 1 秒
+ code, error, email_time = unified_get_verification_code(
+ target_email,
+ max_retries=5, # 5 次重试
+ interval=1 # 每次间隔 1 秒
+ )
+
+ if code:
+ log.success(f"获取到验证码: {code}")
+ return code
+ else:
+ verification_timeout = True
+ log.warning("验证码获取超时 (5秒)")
+ return None
+
+ # 执行 API 注册
+ try:
+ result = api_register_account_only(
+ email=current_email,
+ password=current_password,
+ real_name=random_name,
+ birthdate=birthdate,
+ get_verification_code_func=get_code,
+ proxy=proxy,
+ progress_callback=lambda msg: log.step(msg)
+ )
+
+ if result:
+ log.success(f"[API模式] 注册完成: {current_email}")
+ # 如果使用了新邮箱,返回特殊标记
+ if current_email != email:
+ return f"new_email:{current_email}:{current_password}"
+ return True
+ elif verification_timeout:
+ # 验证码超时,继续下一次重试(创建新邮箱)
+ log.warning("[API模式] 验证码超时,将创建新邮箱重试...")
+ continue
+ else:
+ log.warning("[API模式] 注册失败")
+ return False
+
+ except Exception as e:
+ log.error(f"[API模式] 注册异常: {e}")
+ if "验证码" in str(e) or "timeout" in str(e).lower():
+ # 验证码相关异常,继续重试
+ continue
+ return False
+
+ log.error(f"[API模式] 已重试 {max_email_retries} 次,全部失败")
+ return False
-def register_openai_account_auto(page, email: str, password: str, use_api: bool = True, proxy: str = None) -> bool:
+def register_openai_account_auto(page, email: str, password: str, use_api: bool = True,
+ proxy: str = None, team_name: str = None) -> bool:
"""自动选择模式注册 OpenAI 账号
优先使用 API 模式,失败则回退到浏览器模式
@@ -1147,15 +1219,20 @@ def register_openai_account_auto(page, email: str, password: str, use_api: bool
password: 密码
use_api: 是否优先使用 API 模式
proxy: 代理地址 (API 模式使用)
+ team_name: Team 名称 (用于验证码超时时邀请新邮箱)
Returns:
bool: 是否成功
+ str: 如果返回 "new_email:xxx@xxx.com:password",表示使用了新邮箱
"""
# 如果启用 API 模式且可用
if use_api and API_MODE_AVAILABLE:
- result = register_openai_account_api(email, password, proxy)
+ result = register_openai_account_api(email, password, proxy, team_name)
if result is True:
return True
+ elif isinstance(result, str) and result.startswith("new_email:"):
+ # 使用了新邮箱,返回新邮箱信息
+ return result
elif result is False:
log.warning("API 模式注册失败,回退到浏览器模式...")
# result is None 表示 API 模式不可用,直接使用浏览器模式
@@ -2241,19 +2318,22 @@ def register_only(email: str, password: str, use_api_register: bool = True) -> s
return "failed"
-def register_and_authorize(email: str, password: str, use_api_register: bool = True) -> tuple:
+def register_and_authorize(email: str, password: str, use_api_register: bool = True,
+ team_name: str = None) -> tuple:
"""完整流程: 注册 OpenAI + Codex 授权 (带重试机制)
Args:
email: 邮箱地址
password: 密码
use_api_register: 是否优先使用 API 模式注册 (默认 True)
+ team_name: Team 名称 (用于验证码超时时邀请新邮箱)
Returns:
- tuple: (register_success, codex_data)
+ tuple: (register_success, codex_data, new_email_info)
- register_success: True/False/"domain_blacklisted"
- CRS 模式: codex_data 包含 tokens
- CPA/S2A 模式: codex_data 为 None (后台自动处理)
+ - new_email_info: 如果使用了新邮箱,返回 {"email": "xxx", "password": "xxx"},否则为 None
"""
# 获取授权回调锁 (CPA/S2A 模式需要串行授权)
auth_lock = None
@@ -2264,25 +2344,42 @@ def register_and_authorize(email: str, password: str, use_api_register: bool = T
except (ImportError, AttributeError):
pass
+ # 用于跟踪是否使用了新邮箱
+ new_email_info = None
+ current_email = email
+ current_password = password
+
with browser_context_with_retry(max_browser_retries=2) as ctx:
for attempt in ctx.attempts():
try:
# 注册 OpenAI (优先使用 API 模式)
register_result = register_openai_account_auto(
- ctx.page, email, password,
- use_api=use_api_register
+ ctx.page, current_email, current_password,
+ use_api=use_api_register,
+ team_name=team_name
)
# 检查是否是域名黑名单错误
if register_result == "domain_blacklisted":
ctx.stop()
- return "domain_blacklisted", None
+ return "domain_blacklisted", None, None
+
+ # 检查是否使用了新邮箱
+ if isinstance(register_result, str) and register_result.startswith("new_email:"):
+ # 解析新邮箱信息: "new_email:xxx@xxx.com:password"
+ parts = register_result.split(":", 2)
+ if len(parts) >= 3:
+ current_email = parts[1]
+ current_password = parts[2]
+ new_email_info = {"email": current_email, "password": current_password}
+ log.success(f"使用新邮箱继续: {current_email}")
+ register_result = True
if not register_result:
if attempt < ctx.max_retries - 1:
log.warning("注册失败,准备重试...")
continue
- return False, None
+ return False, None, new_email_info
# 短暂等待确保注册完成
time.sleep(0.5)
@@ -2303,16 +2400,16 @@ def register_and_authorize(email: str, password: str, use_api_register: bool = T
# 根据配置选择授权方式
if AUTH_PROVIDER == "cpa":
# CPA 模式: 授权成功即完成,后台自动处理账号
- success = perform_cpa_authorization(ctx.page, email, password)
- return True, None if success else (True, None) # 注册成功,授权可能失败
+ success = perform_cpa_authorization(ctx.page, current_email, current_password)
+ return True, None, new_email_info if success else (True, None, new_email_info)
elif AUTH_PROVIDER == "s2a":
# S2A 模式: 授权成功即完成,后台自动处理账号
- success = perform_s2a_authorization(ctx.page, email, password)
- return True, None if success else (True, None) # 注册成功,授权可能失败
+ success = perform_s2a_authorization(ctx.page, current_email, current_password)
+ return True, None, new_email_info if success else (True, None, new_email_info)
else:
# CRS 模式: 需要 codex_data
- codex_data = perform_codex_authorization(ctx.page, email, password)
- return True, codex_data
+ codex_data = perform_codex_authorization(ctx.page, current_email, current_password)
+ return True, codex_data, new_email_info
finally:
if auth_lock and lock_acquired:
log.step("释放授权回调锁")
@@ -2321,9 +2418,9 @@ def register_and_authorize(email: str, password: str, use_api_register: bool = T
except Exception as e:
ctx.handle_error(e)
if ctx.current_attempt >= ctx.max_retries - 1:
- return False, None
+ return False, None, new_email_info
- return False, None
+ return False, None, new_email_info
def authorize_only(email: str, password: str) -> tuple[bool, dict]:
diff --git a/run.py b/run.py
index 67616b9..4b55890 100644
--- a/run.py
+++ b/run.py
@@ -407,7 +407,26 @@ def process_accounts(accounts: list, team_name: str, team_index: int = 0,
else:
# 新账号: 注册 + Codex 授权
progress_update(phase="注册", step="注册 OpenAI...")
- register_success, codex_data = register_and_authorize(email, password)
+ register_success, codex_data, new_email_info = register_and_authorize(email, password, team_name=team_name)
+
+ # 如果使用了新邮箱,更新 tracker
+ if new_email_info:
+ new_email = new_email_info["email"]
+ new_password = new_email_info["password"]
+ log.info(f"验证码超时,已切换到新邮箱: {new_email}")
+
+ # 从 tracker 中移除旧邮箱
+ remove_account_from_tracker(_tracker, team_name, email)
+
+ # 添加新邮箱到 tracker
+ add_account_with_password(_tracker, team_name, new_email, new_password, "registered")
+ save_team_tracker(_tracker)
+
+ # 更新当前处理的邮箱信息
+ email = new_email
+ password = new_password
+ result["email"] = email
+ result["password"] = password
# 检查是否是域名黑名单错误
if register_success == "domain_blacklisted":
@@ -643,7 +662,26 @@ def _process_single_account_worker(
register_success = True
else:
log.info(f"[Worker-{worker_id}] 新账号,注册 + 授权...", icon="auth")
- register_success, codex_data = register_and_authorize(email, password)
+ register_success, codex_data, new_email_info = register_and_authorize(email, password, team_name=team_name)
+
+ # 如果使用了新邮箱,更新 tracker
+ if new_email_info:
+ new_email = new_email_info["email"]
+ new_password = new_email_info["password"]
+ log.info(f"[Worker-{worker_id}] 验证码超时,已切换到新邮箱: {new_email}")
+
+ with _tracker_lock:
+ # 从 tracker 中移除旧邮箱
+ remove_account_from_tracker(_tracker, team_name, email)
+ # 添加新邮箱到 tracker
+ add_account_with_password(_tracker, team_name, new_email, new_password, "registered")
+ save_team_tracker(_tracker)
+
+ # 更新当前处理的邮箱信息
+ email = new_email
+ password = new_password
+ result["email"] = email
+ result["password"] = password
if register_success == "domain_blacklisted":
domain = get_domain_from_email(email)
diff --git a/telegram_bot.py b/telegram_bot.py
index cb5749f..7ddade4 100644
--- a/telegram_bot.py
+++ b/telegram_bot.py
@@ -136,6 +136,7 @@ class ProvisionerBot:
("logs_stop", self.cmd_logs_stop),
("dashboard", self.cmd_dashboard),
("import", self.cmd_import),
+ ("verify", self.cmd_verify),
("stock", self.cmd_stock),
("gptmail_keys", self.cmd_gptmail_keys),
("gptmail_add", self.cmd_gptmail_add),
@@ -269,6 +270,7 @@ class ProvisionerBot:
BotCommand("stock", "查看账号库存"),
BotCommand("s2a_config", "配置 S2A 参数"),
BotCommand("import", "导入账号到 team.json"),
+ BotCommand("verify", "验证账号并移除无效账号"),
# GPTMail
BotCommand("gptmail_keys", "查看 GPTMail API Keys"),
BotCommand("gptmail_add", "添加 GPTMail API Key"),
@@ -336,6 +338,7 @@ class ProvisionerBot:
📤 导入账号:
/import - 导入账号到 team.json
+/verify - 验证账号并移除无效账号
或直接发送 JSON 文件
📧 GPTMail 管理:
@@ -2093,7 +2096,7 @@ class ProvisionerBot:
await self.app.bot.edit_message_text(
chat_id=chat_id,
message_id=progress_msg.message_id,
- text=f"🔍 正在验证账号...\n\n⏳ 验证 {total} 个账号的 account_id...",
+ text=f"🔍 正在验证账号...\n\n⏳ 验证 {total} 个账号的 account_id (20 并发)...",
parse_mode="HTML"
)
@@ -2119,9 +2122,10 @@ class ProvisionerBot:
return idx, email, account_id
- # 使用线程池并行验证
- max_workers = min(10, total)
+ # 使用线程池并行验证 (20 并发)
+ max_workers = min(20, total)
completed_count = 0
+ last_update_time = 0
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {executor.submit(verify_account, item): item for item in accounts_to_verify}
@@ -2138,8 +2142,11 @@ class ProvisionerBot:
# 验证失败
failed_accounts.append({"idx": idx, "email": email})
- # 每处理 5 个更新一次进度
- if completed_count % 5 == 0 or completed_count == total:
+ # 每处理 10 个或间隔 1 秒更新一次进度
+ import time
+ current_time = time.time()
+ if completed_count % 10 == 0 or completed_count == total or current_time - last_update_time > 1:
+ last_update_time = current_time
try:
await self.app.bot.edit_message_text(
chat_id=chat_id,
@@ -2208,6 +2215,63 @@ class ProvisionerBot:
except Exception:
pass
+ @admin_only
+ async def cmd_verify(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
+ """手动验证 team.json 中的账号,获取 account_id 并移除无效账号"""
+ chat_id = update.effective_chat.id
+
+ # 检查是否有任务正在运行
+ if self.current_task and not self.current_task.done():
+ await update.message.reply_text(
+ f"⚠️ 有任务正在运行: {self.current_team}\n"
+ "请等待任务完成或使用 /stop 停止后再验证"
+ )
+ return
+
+ # 检查 team.json 是否存在
+ from pathlib import Path
+ team_json_path = Path(TEAM_JSON_FILE)
+ if not team_json_path.exists():
+ await update.message.reply_text("❌ team.json 不存在,请先导入账号")
+ return
+
+ # 统计需要验证的账号
+ import json
+ try:
+ with open(team_json_path, "r", encoding="utf-8") as f:
+ accounts = json.load(f)
+ if not isinstance(accounts, list):
+ accounts = [accounts]
+ except Exception as e:
+ await update.message.reply_text(f"❌ 读取 team.json 失败: {e}")
+ return
+
+ total_accounts = len(accounts)
+ need_verify = sum(1 for acc in accounts if acc.get("token") and not acc.get("account_id"))
+ already_verified = sum(1 for acc in accounts if acc.get("account_id"))
+
+ if need_verify == 0:
+ await update.message.reply_text(
+ f"✅ 无需验证\n\n"
+ f"team.json 共 {total_accounts} 个账号\n"
+ f"已验证: {already_verified}\n"
+ f"待验证: 0",
+ parse_mode="HTML"
+ )
+ return
+
+ await update.message.reply_text(
+ f"🔍 开始验证账号\n\n"
+ f"team.json 共 {total_accounts} 个账号\n"
+ f"已验证: {already_verified}\n"
+ f"待验证: {need_verify}\n\n"
+ f"⏳ 正在验证...",
+ parse_mode="HTML"
+ )
+
+ # 执行验证
+ await self._validate_and_cleanup_accounts(chat_id)
+
async def _import_batch_timeout_callback(self, context: ContextTypes.DEFAULT_TYPE):
"""批量导入超时回调 - 由 job_queue 调用"""
chat_id = context.job.data.get("chat_id")