diff --git a/bot_notifier.py b/bot_notifier.py
index 1a87c1b..0ad69d2 100644
--- a/bot_notifier.py
+++ b/bot_notifier.py
@@ -61,19 +61,19 @@ class ProgressTracker:
bar = make_progress_bar(self.current, self.total, 12)
lines = [
- f"Processing: {self.team_name}",
+ f"正在处理: {self.team_name}",
"",
- f"Progress: {bar}",
- f"Accounts: {self.current}/{self.total}",
- f"Success: {self.success} | Failed: {self.failed}",
+ f"进度: {bar}",
+ f"账号: {self.current}/{self.total}",
+ f"成功: {self.success} | 失败: {self.failed}",
]
if self.current_account:
lines.append("")
- lines.append(f"Current: {self.current_account}")
+ lines.append(f"当前: {self.current_account}")
if self.current_step:
- lines.append(f"Step: {self.current_step}")
+ lines.append(f"步骤: {self.current_step}")
return "\n".join(lines)
@@ -145,7 +145,7 @@ class ProgressTracker:
def finish(self):
"""完成进度跟踪,发送最终状态"""
- self.current_step = "Completed!"
+ self.current_step = "已完成!"
if self._loop:
asyncio.run_coroutine_threadsafe(self._update_messages(), self._loop)
@@ -253,25 +253,25 @@ class BotNotifier:
async def notify_task_started(self, team_name: str):
"""通知任务开始"""
- await self.notify(f"Task Started\nTeam: {team_name}")
+ await self.notify(f"🚀 任务开始\nTeam: {team_name}")
async def notify_task_completed(self, team_name: str, success: int, failed: int):
"""通知任务完成"""
if not TELEGRAM_NOTIFY_ON_COMPLETE:
return
- status = "All Success" if failed == 0 else f"{failed} Failed"
+ status = "全部成功" if failed == 0 else f"{failed} 个失败"
await self.notify(
- f"Task Completed\n"
+ f"✅ 任务完成\n"
f"Team: {team_name}\n"
- f"Success: {success}\n"
- f"Status: {status}"
+ f"成功: {success}\n"
+ f"状态: {status}"
)
async def notify_error(self, message: str, details: str = ""):
"""通知错误"""
if not TELEGRAM_NOTIFY_ON_ERROR:
return
- text = f"Error\n{message}"
+ text = f"❌ 错误\n{message}"
if details:
text += f"\n{details[:500]}"
await self.notify(text)
diff --git a/browser_automation.py b/browser_automation.py
index fdf587f..9cc2349 100644
--- a/browser_automation.py
+++ b/browser_automation.py
@@ -26,6 +26,10 @@ from cpa_service import (
cpa_poll_auth_status,
is_cpa_callback_url
)
+from s2a_service import (
+ s2a_generate_auth_url,
+ s2a_create_account_from_oauth
+)
from logger import log
@@ -1775,7 +1779,7 @@ def register_and_authorize(email: str, password: str) -> tuple:
tuple: (register_success, codex_data)
- register_success: True/False/"domain_blacklisted"
- CRS 模式: codex_data 包含 tokens
- - CPA 模式: codex_data 为 None (后台自动处理)
+ - CPA/S2A 模式: codex_data 为 None (后台自动处理)
"""
with browser_context_with_retry(max_browser_retries=2) as ctx:
for attempt in ctx.attempts():
@@ -1802,6 +1806,10 @@ def register_and_authorize(email: str, password: str) -> tuple:
# CPA 模式: 授权成功即完成,后台自动处理账号
success = perform_cpa_authorization(ctx.page, email, password)
return True, None if success else (True, None) # 注册成功,授权可能失败
+ elif AUTH_PROVIDER == "s2a":
+ # S2A 模式: 授权成功即完成,后台自动处理账号
+ success = perform_s2a_authorization(ctx.page, email, password)
+ return True, None if success else (True, None) # 注册成功,授权可能失败
else:
# CRS 模式: 需要 codex_data
codex_data = perform_codex_authorization(ctx.page, email, password)
@@ -1825,7 +1833,7 @@ def authorize_only(email: str, password: str) -> tuple[bool, dict]:
Returns:
tuple: (success, codex_data)
- CRS 模式: codex_data 包含 tokens
- - CPA 模式: codex_data 为 None (后台自动处理)
+ - CPA/S2A 模式: codex_data 为 None (后台自动处理)
"""
with browser_context_with_retry(max_browser_retries=2) as ctx:
for attempt in ctx.attempts():
@@ -1841,6 +1849,16 @@ def authorize_only(email: str, password: str) -> tuple[bool, dict]:
log.warning("CPA 授权失败,准备重试...")
continue
return False, None
+ elif AUTH_PROVIDER == "s2a":
+ log.info("已注册账号,使用 S2A 进行 Codex 授权...", icon="auth")
+ success = perform_s2a_authorization(ctx.page, email, password)
+ if success:
+ return True, None # S2A 模式不返回 codex_data
+ else:
+ if attempt < ctx.max_retries - 1:
+ log.warning("S2A 授权失败,准备重试...")
+ continue
+ return False, None
else:
# CRS 模式
log.info("已注册账号,直接进行 Codex 授权...", icon="auth")
@@ -2244,6 +2262,161 @@ def perform_cpa_authorization_with_otp(page, email: str) -> bool:
return False
+# ==================== S2A 授权函数 ====================
+
+def perform_s2a_authorization(page, email: str, password: str) -> bool:
+ """执行 S2A 授权流程 (密码登录)
+
+ Args:
+ page: 浏览器实例
+ email: 邮箱地址
+ password: 密码
+
+ Returns:
+ bool: 授权是否成功
+ """
+ log.info(f"开始 S2A 授权: {email}", icon="code")
+
+ # 生成授权 URL
+ auth_url, session_id = s2a_generate_auth_url()
+ if not auth_url or not session_id:
+ log.error("无法获取 S2A 授权 URL")
+ return False
+
+ # 打开授权页面
+ log.step("打开 S2A 授权页面...")
+ log.info(f"[URL] S2A授权URL: {auth_url}", icon="browser")
+ page.get(auth_url)
+ wait_for_page_stable(page, timeout=5)
+ log_current_url(page, "S2A授权页面加载完成", force=True)
+
+ # 检测错误页面
+ check_and_handle_error_page(page)
+
+ try:
+ # 输入邮箱
+ log.step("输入邮箱...")
+ email_input = wait_for_element(page, 'css:input[type="email"]', timeout=10)
+ if not email_input:
+ email_input = wait_for_element(page, 'css:input[name="email"]', timeout=5)
+ if email_input:
+ type_slowly(page, 'css:input[type="email"], input[name="email"]', email, base_delay=0.06)
+
+ # 点击继续
+ log.step("点击继续...")
+ continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5)
+ if continue_btn:
+ old_url = page.url
+ continue_btn.click()
+ wait_for_url_change(page, old_url, timeout=8)
+ log_url_change(page, old_url, "S2A-输入邮箱后点击继续")
+ except Exception as e:
+ log.warning(f"S2A 邮箱输入步骤异常: {e}")
+
+ log_current_url(page, "S2A-邮箱步骤完成后")
+
+ # 输入密码
+ current_url = page.url
+ if "/password" in current_url:
+ try:
+ log.step("输入密码...")
+ password_input = wait_for_element(page, 'css:input[type="password"]', timeout=10)
+
+ if password_input:
+ type_slowly(page, 'css:input[type="password"]', password, base_delay=0.06)
+
+ log.step("点击继续...")
+ continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5)
+ if continue_btn:
+ old_url = page.url
+ continue_btn.click()
+ wait_for_url_change(page, old_url, timeout=8)
+ log_url_change(page, old_url, "S2A-输入密码后点击继续")
+ except Exception as e:
+ log.warning(f"S2A 密码输入步骤异常: {e}")
+
+ log_current_url(page, "S2A-密码步骤完成后")
+
+ # 等待授权回调 (S2A 使用 localhost 回调)
+ max_wait = 45
+ start_time = time.time()
+ callback_url = None
+ progress_shown = False
+ last_url_in_loop = None
+ log.step(f"等待 S2A 授权回调 (最多 {max_wait}s)...")
+
+ while time.time() - start_time < max_wait:
+ try:
+ current_url = page.url
+
+ # 记录 URL 变化
+ if current_url != last_url_in_loop:
+ log_current_url(page, "S2A等待回调中")
+ last_url_in_loop = current_url
+
+ # 检查是否到达回调页面 (S2A 使用 localhost:1455 或类似端口)
+ if "localhost" in current_url and "code=" in current_url:
+ if progress_shown:
+ log.progress_clear()
+ progress_shown = False
+
+ callback_url = current_url
+ log.success(f"捕获 S2A 回调 URL")
+ break
+
+ # 检测错误
+ check_and_handle_error(page)
+
+ # 检查是否需要点击 Authorize
+ try:
+ auth_btn = page.ele('css:button[type="submit"]', timeout=0.5)
+ if auth_btn:
+ btn_text = auth_btn.text.lower() if auth_btn.text else ""
+ if 'authorize' in btn_text or '授权' in btn_text or 'continue' in btn_text:
+ log.step("点击授权按钮...")
+ old_url = page.url
+ auth_btn.click()
+ wait_for_url_change(page, old_url, timeout=5)
+ log_url_change(page, old_url, "S2A点击授权按钮后")
+ except Exception:
+ pass
+
+ elapsed = int(time.time() - start_time)
+ log.progress_inline(f"[S2A等待中... {elapsed}s]")
+ progress_shown = True
+ time.sleep(1.5)
+
+ except Exception as e:
+ if progress_shown:
+ log.progress_clear()
+ progress_shown = False
+ log.warning(f"S2A检查异常: {e}")
+ time.sleep(1.5)
+
+ if progress_shown:
+ log.progress_clear()
+
+ if not callback_url:
+ log.error("S2A 无法获取回调链接")
+ return False
+
+ # 从回调 URL 中提取 code
+ code = extract_code_from_url(callback_url)
+ if not code:
+ log.error("S2A 无法从回调链接提取授权码")
+ return False
+
+ # S2A 特有流程: 用授权码创建账号 (传入完整邮箱用于验证)
+ log.step("正在提交 S2A 授权码...")
+ result = s2a_create_account_from_oauth(code, session_id, name=email)
+ if result:
+ log.success("S2A 授权流程完成")
+ return True
+ else:
+ log.error("S2A 账号入库失败")
+ return False
+
+
# ==================== 格式3专用: 登录获取 Session ====================
def login_and_get_session(page, email: str, password: str) -> dict:
diff --git a/run.py b/run.py
index 1c9c696..9450a7a 100644
--- a/run.py
+++ b/run.py
@@ -404,7 +404,7 @@ def process_accounts(accounts: list, team_name: str) -> list:
update_account_status(_tracker, team_name, email, "completed")
save_team_tracker(_tracker)
- log.success(f"{AUTH_PROVIDER.upper()} 账号处理完成: {email}")
+ log.success(f"✅ {AUTH_PROVIDER.upper()} 账号处理完成: {email}")
else:
# CRS 模式: 原有逻辑
if codex_data:
diff --git a/s2a_service.py b/s2a_service.py
index 6bf08ac..857ff35 100644
--- a/s2a_service.py
+++ b/s2a_service.py
@@ -252,10 +252,10 @@ def s2a_generate_auth_url(proxy_id: Optional[int] = None) -> Tuple[Optional[str]
session_id = data.get("session_id")
if auth_url and session_id:
- log.success(f"生成 S2A 授权 URL 成功 (Session: {session_id[:16]}...)")
+ log.success(f"生成 S2A 授权链接成功 (会话: {session_id[:16]}...)")
return auth_url, session_id
- log.error(f"生成 S2A 授权 URL 失败: HTTP {response.status_code}")
+ log.error(f"生成 S2A 授权链接失败: HTTP {response.status_code}")
return None, None
except Exception as e:
@@ -263,6 +263,67 @@ def s2a_generate_auth_url(proxy_id: Optional[int] = None) -> Tuple[Optional[str]
return None, None
+def s2a_verify_account_in_pool(email: str, timeout: int = 10) -> Tuple[bool, Optional[Dict[str, Any]]]:
+ """验证账号是否已成功入库到 S2A 账号池
+
+ 通过请求 /admin/accounts 接口,检查第一个账号的 name 是否匹配邮箱
+
+ Args:
+ email: 要验证的邮箱地址
+ timeout: 超时时间 (秒)
+
+ Returns:
+ tuple: (是否成功, 账号数据或None)
+ """
+ headers = build_s2a_headers()
+
+ try:
+ # 使用 search 参数搜索该邮箱
+ params = {
+ "page": 1,
+ "page_size": 20,
+ "platform": "",
+ "type": "",
+ "status": "",
+ "search": email,
+ "timezone": "Asia/Shanghai"
+ }
+
+ response = http_session.get(
+ f"{S2A_API_BASE}/admin/accounts",
+ headers=headers,
+ params=params,
+ timeout=timeout
+ )
+
+ if response.status_code == 200:
+ result = response.json()
+ if result.get("code") == 0:
+ data = result.get("data", {})
+ items = data.get("items", [])
+
+ if items:
+ # 检查第一个账号的 name 是否匹配
+ first_account = items[0]
+ account_name = first_account.get("name", "")
+
+ # 邮箱匹配检查 (忽略大小写)
+ if email.lower() in account_name.lower() or account_name.lower() in email.lower():
+ return True, first_account
+
+ return False, None
+ else:
+ log.warning(f"S2A 验证账号失败: {result.get('message', '未知错误')}")
+ else:
+ log.warning(f"S2A 验证账号失败: HTTP {response.status_code}")
+
+ return False, None
+
+ except Exception as e:
+ log.warning(f"S2A 验证账号异常: {e}")
+ return False, None
+
+
def s2a_create_account_from_oauth(
code: str,
session_id: str,
@@ -288,6 +349,9 @@ def s2a_create_account_from_oauth(
"priority": S2A_PRIORITY,
}
+ # 获取完整邮箱用于后续验证
+ full_email = name if "@" in name else ""
+
if name:
payload["name"] = name
if proxy_id is not None:
@@ -298,6 +362,7 @@ def s2a_create_account_from_oauth(
payload["group_ids"] = group_ids
try:
+ log.step("正在提交授权码到 S2A...")
response = http_session.post(
f"{S2A_API_BASE}/admin/openai/create-from-oauth",
headers=headers,
@@ -311,10 +376,25 @@ def s2a_create_account_from_oauth(
account_data = result.get("data", {})
account_id = account_data.get("id")
account_name = account_data.get("name")
- log.success(f"S2A 账号创建成功 (ID: {account_id}, Name: {account_name})")
+ log.success(f"S2A 授权成功 (ID: {account_id}, 名称: {account_name})")
+
+ # 验证账号是否成功入库
+ if full_email or account_name:
+ verify_email = full_email or account_name
+ log.step(f"正在验证账号入库状态...")
+ verified, verified_data = s2a_verify_account_in_pool(verify_email)
+
+ if verified:
+ verified_id = verified_data.get("id", "")
+ verified_name = verified_data.get("name", "")
+ log.success(f"✅ 账号入库验证成功 (ID: {verified_id}, 名称: {verified_name})")
+ else:
+ log.warning(f"⚠️ 账号入库验证失败,但授权已成功")
+
return account_data
else:
- log.error(f"S2A 账号创建失败: {result.get('message', 'Unknown error')}")
+ error_msg = result.get('message', '未知错误')
+ log.error(f"S2A 账号创建失败: {error_msg}")
else:
log.error(f"S2A 账号创建失败: HTTP {response.status_code}")