From 64707768f85404c8953db0f03f4eba91329f6b79 Mon Sep 17 00:00:00 2001 From: kyx236 Date: Fri, 16 Jan 2026 00:32:23 +0800 Subject: [PATCH] 5 --- bot_notifier.py | 26 +++---- browser_automation.py | 177 +++++++++++++++++++++++++++++++++++++++++- run.py | 2 +- s2a_service.py | 88 ++++++++++++++++++++- 4 files changed, 273 insertions(+), 20 deletions(-) diff --git a/bot_notifier.py b/bot_notifier.py index 1a87c1b..0ad69d2 100644 --- a/bot_notifier.py +++ b/bot_notifier.py @@ -61,19 +61,19 @@ class ProgressTracker: bar = make_progress_bar(self.current, self.total, 12) lines = [ - f"Processing: {self.team_name}", + f"正在处理: {self.team_name}", "", - f"Progress: {bar}", - f"Accounts: {self.current}/{self.total}", - f"Success: {self.success} | Failed: {self.failed}", + f"进度: {bar}", + f"账号: {self.current}/{self.total}", + f"成功: {self.success} | 失败: {self.failed}", ] if self.current_account: lines.append("") - lines.append(f"Current: {self.current_account}") + lines.append(f"当前: {self.current_account}") if self.current_step: - lines.append(f"Step: {self.current_step}") + lines.append(f"步骤: {self.current_step}") return "\n".join(lines) @@ -145,7 +145,7 @@ class ProgressTracker: def finish(self): """完成进度跟踪,发送最终状态""" - self.current_step = "Completed!" + self.current_step = "已完成!" if self._loop: asyncio.run_coroutine_threadsafe(self._update_messages(), self._loop) @@ -253,25 +253,25 @@ class BotNotifier: async def notify_task_started(self, team_name: str): """通知任务开始""" - await self.notify(f"Task Started\nTeam: {team_name}") + await self.notify(f"🚀 任务开始\nTeam: {team_name}") async def notify_task_completed(self, team_name: str, success: int, failed: int): """通知任务完成""" if not TELEGRAM_NOTIFY_ON_COMPLETE: return - status = "All Success" if failed == 0 else f"{failed} Failed" + status = "全部成功" if failed == 0 else f"{failed} 个失败" await self.notify( - f"Task Completed\n" + f"✅ 任务完成\n" f"Team: {team_name}\n" - f"Success: {success}\n" - f"Status: {status}" + f"成功: {success}\n" + f"状态: {status}" ) async def notify_error(self, message: str, details: str = ""): """通知错误""" if not TELEGRAM_NOTIFY_ON_ERROR: return - text = f"Error\n{message}" + text = f"❌ 错误\n{message}" if details: text += f"\n{details[:500]}" await self.notify(text) diff --git a/browser_automation.py b/browser_automation.py index fdf587f..9cc2349 100644 --- a/browser_automation.py +++ b/browser_automation.py @@ -26,6 +26,10 @@ from cpa_service import ( cpa_poll_auth_status, is_cpa_callback_url ) +from s2a_service import ( + s2a_generate_auth_url, + s2a_create_account_from_oauth +) from logger import log @@ -1775,7 +1779,7 @@ def register_and_authorize(email: str, password: str) -> tuple: tuple: (register_success, codex_data) - register_success: True/False/"domain_blacklisted" - CRS 模式: codex_data 包含 tokens - - CPA 模式: codex_data 为 None (后台自动处理) + - CPA/S2A 模式: codex_data 为 None (后台自动处理) """ with browser_context_with_retry(max_browser_retries=2) as ctx: for attempt in ctx.attempts(): @@ -1802,6 +1806,10 @@ def register_and_authorize(email: str, password: str) -> tuple: # CPA 模式: 授权成功即完成,后台自动处理账号 success = perform_cpa_authorization(ctx.page, email, password) return True, None if success else (True, None) # 注册成功,授权可能失败 + elif AUTH_PROVIDER == "s2a": + # S2A 模式: 授权成功即完成,后台自动处理账号 + success = perform_s2a_authorization(ctx.page, email, password) + return True, None if success else (True, None) # 注册成功,授权可能失败 else: # CRS 模式: 需要 codex_data codex_data = perform_codex_authorization(ctx.page, email, password) @@ -1825,7 +1833,7 @@ def authorize_only(email: str, password: str) -> tuple[bool, dict]: Returns: tuple: (success, codex_data) - CRS 模式: codex_data 包含 tokens - - CPA 模式: codex_data 为 None (后台自动处理) + - CPA/S2A 模式: codex_data 为 None (后台自动处理) """ with browser_context_with_retry(max_browser_retries=2) as ctx: for attempt in ctx.attempts(): @@ -1841,6 +1849,16 @@ def authorize_only(email: str, password: str) -> tuple[bool, dict]: log.warning("CPA 授权失败,准备重试...") continue return False, None + elif AUTH_PROVIDER == "s2a": + log.info("已注册账号,使用 S2A 进行 Codex 授权...", icon="auth") + success = perform_s2a_authorization(ctx.page, email, password) + if success: + return True, None # S2A 模式不返回 codex_data + else: + if attempt < ctx.max_retries - 1: + log.warning("S2A 授权失败,准备重试...") + continue + return False, None else: # CRS 模式 log.info("已注册账号,直接进行 Codex 授权...", icon="auth") @@ -2244,6 +2262,161 @@ def perform_cpa_authorization_with_otp(page, email: str) -> bool: return False +# ==================== S2A 授权函数 ==================== + +def perform_s2a_authorization(page, email: str, password: str) -> bool: + """执行 S2A 授权流程 (密码登录) + + Args: + page: 浏览器实例 + email: 邮箱地址 + password: 密码 + + Returns: + bool: 授权是否成功 + """ + log.info(f"开始 S2A 授权: {email}", icon="code") + + # 生成授权 URL + auth_url, session_id = s2a_generate_auth_url() + if not auth_url or not session_id: + log.error("无法获取 S2A 授权 URL") + return False + + # 打开授权页面 + log.step("打开 S2A 授权页面...") + log.info(f"[URL] S2A授权URL: {auth_url}", icon="browser") + page.get(auth_url) + wait_for_page_stable(page, timeout=5) + log_current_url(page, "S2A授权页面加载完成", force=True) + + # 检测错误页面 + check_and_handle_error_page(page) + + try: + # 输入邮箱 + log.step("输入邮箱...") + email_input = wait_for_element(page, 'css:input[type="email"]', timeout=10) + if not email_input: + email_input = wait_for_element(page, 'css:input[name="email"]', timeout=5) + if email_input: + type_slowly(page, 'css:input[type="email"], input[name="email"]', email, base_delay=0.06) + + # 点击继续 + log.step("点击继续...") + continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5) + if continue_btn: + old_url = page.url + continue_btn.click() + wait_for_url_change(page, old_url, timeout=8) + log_url_change(page, old_url, "S2A-输入邮箱后点击继续") + except Exception as e: + log.warning(f"S2A 邮箱输入步骤异常: {e}") + + log_current_url(page, "S2A-邮箱步骤完成后") + + # 输入密码 + current_url = page.url + if "/password" in current_url: + try: + log.step("输入密码...") + password_input = wait_for_element(page, 'css:input[type="password"]', timeout=10) + + if password_input: + type_slowly(page, 'css:input[type="password"]', password, base_delay=0.06) + + log.step("点击继续...") + continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5) + if continue_btn: + old_url = page.url + continue_btn.click() + wait_for_url_change(page, old_url, timeout=8) + log_url_change(page, old_url, "S2A-输入密码后点击继续") + except Exception as e: + log.warning(f"S2A 密码输入步骤异常: {e}") + + log_current_url(page, "S2A-密码步骤完成后") + + # 等待授权回调 (S2A 使用 localhost 回调) + max_wait = 45 + start_time = time.time() + callback_url = None + progress_shown = False + last_url_in_loop = None + log.step(f"等待 S2A 授权回调 (最多 {max_wait}s)...") + + while time.time() - start_time < max_wait: + try: + current_url = page.url + + # 记录 URL 变化 + if current_url != last_url_in_loop: + log_current_url(page, "S2A等待回调中") + last_url_in_loop = current_url + + # 检查是否到达回调页面 (S2A 使用 localhost:1455 或类似端口) + if "localhost" in current_url and "code=" in current_url: + if progress_shown: + log.progress_clear() + progress_shown = False + + callback_url = current_url + log.success(f"捕获 S2A 回调 URL") + break + + # 检测错误 + check_and_handle_error(page) + + # 检查是否需要点击 Authorize + try: + auth_btn = page.ele('css:button[type="submit"]', timeout=0.5) + if auth_btn: + btn_text = auth_btn.text.lower() if auth_btn.text else "" + if 'authorize' in btn_text or '授权' in btn_text or 'continue' in btn_text: + log.step("点击授权按钮...") + old_url = page.url + auth_btn.click() + wait_for_url_change(page, old_url, timeout=5) + log_url_change(page, old_url, "S2A点击授权按钮后") + except Exception: + pass + + elapsed = int(time.time() - start_time) + log.progress_inline(f"[S2A等待中... {elapsed}s]") + progress_shown = True + time.sleep(1.5) + + except Exception as e: + if progress_shown: + log.progress_clear() + progress_shown = False + log.warning(f"S2A检查异常: {e}") + time.sleep(1.5) + + if progress_shown: + log.progress_clear() + + if not callback_url: + log.error("S2A 无法获取回调链接") + return False + + # 从回调 URL 中提取 code + code = extract_code_from_url(callback_url) + if not code: + log.error("S2A 无法从回调链接提取授权码") + return False + + # S2A 特有流程: 用授权码创建账号 (传入完整邮箱用于验证) + log.step("正在提交 S2A 授权码...") + result = s2a_create_account_from_oauth(code, session_id, name=email) + if result: + log.success("S2A 授权流程完成") + return True + else: + log.error("S2A 账号入库失败") + return False + + # ==================== 格式3专用: 登录获取 Session ==================== def login_and_get_session(page, email: str, password: str) -> dict: diff --git a/run.py b/run.py index 1c9c696..9450a7a 100644 --- a/run.py +++ b/run.py @@ -404,7 +404,7 @@ def process_accounts(accounts: list, team_name: str) -> list: update_account_status(_tracker, team_name, email, "completed") save_team_tracker(_tracker) - log.success(f"{AUTH_PROVIDER.upper()} 账号处理完成: {email}") + log.success(f"✅ {AUTH_PROVIDER.upper()} 账号处理完成: {email}") else: # CRS 模式: 原有逻辑 if codex_data: diff --git a/s2a_service.py b/s2a_service.py index 6bf08ac..857ff35 100644 --- a/s2a_service.py +++ b/s2a_service.py @@ -252,10 +252,10 @@ def s2a_generate_auth_url(proxy_id: Optional[int] = None) -> Tuple[Optional[str] session_id = data.get("session_id") if auth_url and session_id: - log.success(f"生成 S2A 授权 URL 成功 (Session: {session_id[:16]}...)") + log.success(f"生成 S2A 授权链接成功 (会话: {session_id[:16]}...)") return auth_url, session_id - log.error(f"生成 S2A 授权 URL 失败: HTTP {response.status_code}") + log.error(f"生成 S2A 授权链接失败: HTTP {response.status_code}") return None, None except Exception as e: @@ -263,6 +263,67 @@ def s2a_generate_auth_url(proxy_id: Optional[int] = None) -> Tuple[Optional[str] return None, None +def s2a_verify_account_in_pool(email: str, timeout: int = 10) -> Tuple[bool, Optional[Dict[str, Any]]]: + """验证账号是否已成功入库到 S2A 账号池 + + 通过请求 /admin/accounts 接口,检查第一个账号的 name 是否匹配邮箱 + + Args: + email: 要验证的邮箱地址 + timeout: 超时时间 (秒) + + Returns: + tuple: (是否成功, 账号数据或None) + """ + headers = build_s2a_headers() + + try: + # 使用 search 参数搜索该邮箱 + params = { + "page": 1, + "page_size": 20, + "platform": "", + "type": "", + "status": "", + "search": email, + "timezone": "Asia/Shanghai" + } + + response = http_session.get( + f"{S2A_API_BASE}/admin/accounts", + headers=headers, + params=params, + timeout=timeout + ) + + if response.status_code == 200: + result = response.json() + if result.get("code") == 0: + data = result.get("data", {}) + items = data.get("items", []) + + if items: + # 检查第一个账号的 name 是否匹配 + first_account = items[0] + account_name = first_account.get("name", "") + + # 邮箱匹配检查 (忽略大小写) + if email.lower() in account_name.lower() or account_name.lower() in email.lower(): + return True, first_account + + return False, None + else: + log.warning(f"S2A 验证账号失败: {result.get('message', '未知错误')}") + else: + log.warning(f"S2A 验证账号失败: HTTP {response.status_code}") + + return False, None + + except Exception as e: + log.warning(f"S2A 验证账号异常: {e}") + return False, None + + def s2a_create_account_from_oauth( code: str, session_id: str, @@ -288,6 +349,9 @@ def s2a_create_account_from_oauth( "priority": S2A_PRIORITY, } + # 获取完整邮箱用于后续验证 + full_email = name if "@" in name else "" + if name: payload["name"] = name if proxy_id is not None: @@ -298,6 +362,7 @@ def s2a_create_account_from_oauth( payload["group_ids"] = group_ids try: + log.step("正在提交授权码到 S2A...") response = http_session.post( f"{S2A_API_BASE}/admin/openai/create-from-oauth", headers=headers, @@ -311,10 +376,25 @@ def s2a_create_account_from_oauth( account_data = result.get("data", {}) account_id = account_data.get("id") account_name = account_data.get("name") - log.success(f"S2A 账号创建成功 (ID: {account_id}, Name: {account_name})") + log.success(f"S2A 授权成功 (ID: {account_id}, 名称: {account_name})") + + # 验证账号是否成功入库 + if full_email or account_name: + verify_email = full_email or account_name + log.step(f"正在验证账号入库状态...") + verified, verified_data = s2a_verify_account_in_pool(verify_email) + + if verified: + verified_id = verified_data.get("id", "") + verified_name = verified_data.get("name", "") + log.success(f"✅ 账号入库验证成功 (ID: {verified_id}, 名称: {verified_name})") + else: + log.warning(f"⚠️ 账号入库验证失败,但授权已成功") + return account_data else: - log.error(f"S2A 账号创建失败: {result.get('message', 'Unknown error')}") + error_msg = result.get('message', '未知错误') + log.error(f"S2A 账号创建失败: {error_msg}") else: log.error(f"S2A 账号创建失败: HTTP {response.status_code}")