From 7e92e12c79a7bfc741dea028bf38c127184c476e Mon Sep 17 00:00:00 2001 From: kyx236 Date: Sun, 18 Jan 2026 05:38:14 +0800 Subject: [PATCH] update --- bot_notifier.py | 142 +++++++++++++++++++++++++++++++++++------- browser_automation.py | 97 +++++++++++++++++++++++++---- run.py | 136 +++++++++++++++++++--------------------- team_service.py | 65 +++++++++++++------ telegram_bot.py | 130 +++++++++++++++++++++++++++++++++++--- 5 files changed, 438 insertions(+), 132 deletions(-) diff --git a/bot_notifier.py b/bot_notifier.py index ee03b27..7b3284d 100644 --- a/bot_notifier.py +++ b/bot_notifier.py @@ -41,16 +41,21 @@ def make_progress_bar(current: int, total: int, width: int = 10) -> str: class ProgressTracker: """进度跟踪器 - 用于实时更新 Telegram 消息显示进度""" - def __init__(self, bot: Bot, chat_ids: List[int], team_name: str, total: int): + def __init__(self, bot: Bot, chat_ids: List[int], team_name: str, total: int, + team_index: int = 0, teams_total: int = 0, include_owner: bool = False): self.bot = bot self.chat_ids = chat_ids self.team_name = team_name self.total = total + self.team_index = team_index # 当前 Team 序号 (从 1 开始) + self.teams_total = teams_total # Team 总数 + self.include_owner = include_owner # 是否包含 Owner self.current = 0 self.success = 0 self.failed = 0 self.current_account = "" self.current_step = "" + self.current_role = "" # 当前账号角色 (member/owner) self.messages: Dict[int, Message] = {} # chat_id -> Message self._last_update = 0 self._update_interval = 2 # 最小更新间隔 (秒) @@ -60,17 +65,24 @@ class ProgressTracker: """生成进度消息文本""" bar = make_progress_bar(self.current, self.total, 12) + # 标题行:显示 Team 序号 + if self.teams_total > 0: + title = f"📦 Team [{self.team_index}/{self.teams_total}]: {self.team_name}" + else: + title = f"📦 正在处理: {self.team_name}" + lines = [ - f"正在处理: {self.team_name}", + title, "", f"进度: {bar}", - f"账号: {self.current}/{self.total}", + f"账号: {self.current}/{self.total}" + (f" (含 Owner)" if self.include_owner else ""), f"成功: {self.success} | 失败: {self.failed}", ] if self.current_account: lines.append("") - lines.append(f"当前: {self.current_account}") + role_tag = " 👑" if self.current_role == "owner" else "" + lines.append(f"当前: {self.current_account}{role_tag}") if self.current_step: lines.append(f"步骤: {self.current_step}") @@ -105,10 +117,14 @@ class ProgressTracker: except TelegramError: pass - def _schedule_update(self): - """调度消息更新 (限流)""" + def _schedule_update(self, force: bool = False): + """调度消息更新 (限流) + + Args: + force: 是否强制更新 (忽略限流) + """ now = time.time() - if now - self._last_update < self._update_interval: + if not force and now - self._last_update < self._update_interval: return self._last_update = now @@ -121,7 +137,7 @@ class ProgressTracker: self._loop = loop asyncio.run_coroutine_threadsafe(self._send_initial_message(), loop) - def update(self, current: int = None, account: str = None, step: str = None): + def update(self, current: int = None, account: str = None, step: str = None, role: str = None): """更新进度 (供同步代码调用)""" if current is not None: self.current = current @@ -129,6 +145,8 @@ class ProgressTracker: self.current_account = account if step is not None: self.current_step = step + if role is not None: + self.current_role = role self._schedule_update() @@ -141,13 +159,16 @@ class ProgressTracker: self.failed += 1 self.current_account = "" self.current_step = "" - self._schedule_update() + self.current_role = "" + # 最后一个账号完成时强制更新,确保显示 100% + is_last = self.current >= self.total + self._schedule_update(force=is_last) def finish(self): """完成进度跟踪,发送最终状态""" self.current_step = "已完成!" - if self._loop: - asyncio.run_coroutine_threadsafe(self._update_messages(), self._loop) + # 强制更新最终状态 + self._schedule_update(force=True) class BotNotifier: @@ -197,8 +218,8 @@ class BotNotifier: text=message, parse_mode=parse_mode ) - except TelegramError: - pass + except TelegramError as e: + print(f"[BotNotifier] 发送消息失败 (chat_id={chat_id}): {e}") async def _send_photo_to_all(self, photo_path: str, caption: str = ""): """发送图片到所有管理员""" @@ -232,10 +253,20 @@ class BotNotifier: """直接发送通知 (阻塞)""" await self._send_to_all(message) - def create_progress(self, team_name: str, total: int) -> ProgressTracker: - """创建进度跟踪器""" + def create_progress(self, team_name: str, total: int, team_index: int = 0, + teams_total: int = 0, include_owner: bool = False) -> ProgressTracker: + """创建进度跟踪器 + + Args: + team_name: Team 名称 + total: 账号总数 + team_index: 当前 Team 序号 (从 1 开始) + teams_total: Team 总数 + include_owner: 是否包含 Owner + """ self._current_progress = ProgressTracker( - self.bot, self.chat_ids, team_name, total + self.bot, self.chat_ids, team_name, total, + team_index=team_index, teams_total=teams_total, include_owner=include_owner ) if self._loop: self._current_progress.start(self._loop) @@ -286,6 +317,56 @@ class BotNotifier: await self.notify(message) + async def notify_team_completed(self, team_name: str, results: list): + """通知单个 Team 处理完成 + + Args: + team_name: Team 名称 + results: 处理结果列表 [{"email", "status", ...}] + """ + if not results: + return + + success_accounts = [r.get("email") for r in results if r.get("status") == "success"] + failed_accounts = [r.get("email") for r in results if r.get("status") != "success"] + + success_count = len(success_accounts) + failed_count = len(failed_accounts) + total = len(results) + + # 状态图标 + if failed_count == 0: + icon = "✅" + status = "全部成功" + elif success_count == 0: + icon = "❌" + status = "全部失败" + else: + icon = "⚠️" + status = f"{failed_count} 个失败" + + # 构建消息 + message = ( + f"{icon} Team 处理完成\n" + f"Team: {team_name}\n" + f"结果: {success_count}/{total} 成功\n" + f"状态: {status}" + ) + + # 列出成功的账号 + if success_accounts: + message += "\n\n✓ 成功:" + for email in success_accounts: + message += f"\n {email}" + + # 列出失败的账号 + if failed_accounts: + message += "\n\n✗ 失败:" + for email in failed_accounts: + message += f"\n {email}" + + await self.notify(message) + async def notify_error(self, message: str, details: str = ""): """通知错误""" if not TELEGRAM_NOTIFY_ON_ERROR: @@ -343,17 +424,26 @@ def send_screenshot_sync(photo_path: str, caption: str = ""): # ==================== 进度更新接口 (供 run.py 使用) ==================== -def progress_start(team_name: str, total: int) -> Optional[ProgressTracker]: - """开始进度跟踪""" +def progress_start(team_name: str, total: int, team_index: int = 0, + teams_total: int = 0, include_owner: bool = False) -> Optional[ProgressTracker]: + """开始进度跟踪 + + Args: + team_name: Team 名称 + total: 账号总数 + team_index: 当前 Team 序号 (从 1 开始) + teams_total: Team 总数 + include_owner: 是否包含 Owner + """ if _notifier: - return _notifier.create_progress(team_name, total) + return _notifier.create_progress(team_name, total, team_index, teams_total, include_owner) return None -def progress_update(account: str = None, step: str = None): +def progress_update(account: str = None, step: str = None, role: str = None): """更新当前进度""" if _notifier and _notifier.get_progress(): - _notifier.get_progress().update(account=account, step=step) + _notifier.get_progress().update(account=account, step=step, role=role) def progress_account_done(email: str, success: bool): @@ -366,3 +456,13 @@ def progress_finish(): """完成进度跟踪""" if _notifier: _notifier.clear_progress() + + +def notify_team_completed_sync(team_name: str, results: list): + """同步方式通知单个 Team 完成 (供 run.py 使用)""" + if _notifier and _notifier._loop: + import asyncio + asyncio.run_coroutine_threadsafe( + _notifier.notify_team_completed(team_name, results), + _notifier._loop + ) diff --git a/browser_automation.py b/browser_automation.py index 0748054..7e5f229 100644 --- a/browser_automation.py +++ b/browser_automation.py @@ -35,6 +35,27 @@ from s2a_service import ( from logger import log +# ==================== 停止检查 ==================== +class ShutdownRequested(Exception): + """停止请求异常 - 用于中断浏览器操作""" + pass + + +def check_shutdown(): + """检查是否收到停止请求,如果是则抛出异常""" + try: + import run + if run._shutdown_requested: + log.warning("检测到停止请求,中断当前操作...") + raise ShutdownRequested("用户请求停止") + except ImportError: + pass + except ShutdownRequested: + raise + except Exception: + pass + + # ==================== 浏览器配置常量 ==================== BROWSER_MAX_RETRIES = 3 # 浏览器启动最大重试次数 BROWSER_RETRY_DELAY = 2 # 重试间隔 (秒) @@ -198,24 +219,49 @@ def cleanup_chrome_processes(): """清理残留的 Chrome 进程 (跨平台支持)""" try: if platform.system() == "Windows": - # Windows: 使用 tasklist 和 taskkill - result = subprocess.run( - ['tasklist', '/FI', 'IMAGENAME eq chrome.exe', '/FO', 'CSV'], - capture_output=True, text=True, timeout=5 - ) - - if 'chrome.exe' in result.stdout: + # Windows: 使用 taskkill 清理 chromedriver 和 chrome + try: subprocess.run( ['taskkill', '/F', '/IM', 'chromedriver.exe'], capture_output=True, timeout=5 ) - log.step("已清理 chromedriver 残留进程") + except Exception: + pass + + # 清理无头模式的 chrome 进程 (带 --headless 参数的) + try: + result = subprocess.run( + ['wmic', 'process', 'where', "name='chrome.exe' and commandline like '%--headless%'", 'get', 'processid'], + capture_output=True, text=True, timeout=5 + ) + for line in result.stdout.strip().split('\n'): + pid = line.strip() + if pid.isdigit(): + subprocess.run(['taskkill', '/F', '/PID', pid], capture_output=True, timeout=5) + except Exception: + pass + + log.step("已清理 Chrome 残留进程") else: # Linux/Mac: 使用 pkill - subprocess.run( - ['pkill', '-f', 'chromedriver'], - capture_output=True, timeout=5 - ) + try: + subprocess.run( + ['pkill', '-f', 'chromedriver'], + capture_output=True, timeout=5 + ) + except Exception: + pass + + # 清理无头模式的 chrome 进程 + try: + subprocess.run( + ['pkill', '-f', 'chrome.*--headless'], + capture_output=True, timeout=5 + ) + except Exception: + pass + + log.step("已清理 Chrome 残留进程") except Exception: pass # 静默处理,不影响主流程 @@ -484,6 +530,9 @@ class BrowserRetryContext: if not self._should_continue: break + # 检查停止请求 + check_shutdown() + self.current_attempt = attempt # 非首次尝试时的清理和等待 @@ -495,8 +544,12 @@ class BrowserRetryContext: # 初始化浏览器 try: + check_shutdown() # 再次检查 self.page = init_browser() yield attempt + except ShutdownRequested: + self._should_continue = False + raise except Exception as e: log.error(f"浏览器初始化失败: {e}") if attempt >= self.max_retries - 1: @@ -504,6 +557,11 @@ class BrowserRetryContext: def handle_error(self, error: Exception): """处理错误,决定是否继续重试""" + # 如果是停止请求,直接停止 + if isinstance(error, ShutdownRequested): + self._should_continue = False + return + log.error(f"流程异常: {error}") if self.current_attempt >= self.max_retries - 1: self._should_continue = False @@ -550,6 +608,9 @@ def wait_for_page_stable(page, timeout: int = 10, check_interval: float = 0.5) - stable_count = 0 while time.time() - start_time < timeout: + # 检查停止请求 + check_shutdown() + try: # 检查浏览器标签页是否还在加载(favicon 旋转动画) ready_state = page.run_js('return document.readyState', timeout=2) @@ -567,6 +628,8 @@ def wait_for_page_stable(page, timeout: int = 10, check_interval: float = 0.5) - stable_count = 0 last_html_len = current_len time.sleep(check_interval) + except ShutdownRequested: + raise except Exception: time.sleep(check_interval) @@ -626,11 +689,16 @@ def wait_for_element(page, selector: str, timeout: int = 10, visible: bool = Tru start_time = time.time() while time.time() - start_time < timeout: + # 检查停止请求 + check_shutdown() + try: element = page.ele(selector, timeout=1) if element: if not visible or (element.states.is_displayed if hasattr(element, 'states') else True): return element + except ShutdownRequested: + raise except Exception: pass time.sleep(0.3) @@ -653,11 +721,16 @@ def wait_for_url_change(page, old_url: str, timeout: int = 15, contains: str = N start_time = time.time() while time.time() - start_time < timeout: + # 检查停止请求 + check_shutdown() + try: current_url = page.url if current_url != old_url: if contains is None or contains in current_url: return True + except ShutdownRequested: + raise except Exception: pass time.sleep(0.5) diff --git a/run.py b/run.py index 0b9671c..91d88ac 100644 --- a/run.py +++ b/run.py @@ -26,7 +26,7 @@ from team_service import batch_invite_to_team, print_team_summary, check_availab from crs_service import crs_add_account, crs_sync_team_owners, crs_verify_token from cpa_service import cpa_verify_connection from s2a_service import s2a_verify_connection -from browser_automation import register_and_authorize, login_and_authorize_with_otp, authorize_only, login_and_authorize_team_owner +from browser_automation import register_and_authorize, login_and_authorize_with_otp, authorize_only, login_and_authorize_team_owner, ShutdownRequested from utils import ( save_to_csv, load_team_tracker, @@ -44,13 +44,14 @@ from logger import log # 进度更新 (Telegram Bot 使用,导入失败时忽略) try: - from bot_notifier import progress_start, progress_update, progress_account_done, progress_finish + from bot_notifier import progress_start, progress_update, progress_account_done, progress_finish, notify_team_completed_sync except ImportError: # 如果没有 bot_notifier,使用空函数 def progress_start(team_name, total): pass def progress_update(account=None, step=None): pass def progress_account_done(email, success): pass def progress_finish(): pass + def notify_team_completed_sync(team_name, results): pass # ==================== 全局状态 ==================== @@ -96,10 +97,13 @@ if threading.current_thread() is threading.main_thread(): def process_single_team(team: dict) -> tuple[list, list]: +def process_single_team(team: dict, team_index: int = 0, teams_total: int = 0) -> tuple[list, list]: """处理单个 Team 的完整流程 Args: team: Team 配置 + team_index: 当前 Team 序号 (从 1 开始) + teams_total: Team 总数 Returns: tuple: (处理结果列表, 待处理的 Owner 列表) @@ -202,21 +206,42 @@ def process_single_team(team: dict) -> tuple[list, list]: else: log.warning(f"Team {team_name} 没有可用席位,无法邀请新成员") - # ========== 阶段 3: 处理普通成员 (注册 + Codex 授权 + CRS) ========== - if invited_accounts: - log.section(f"阶段 3: 逐个注册 OpenAI + Codex 授权 + CRS 入库") - member_results = process_accounts(invited_accounts, team_name) - results.extend(member_results) + # ========== 合并成员和 Owner 一起处理 ========== + all_to_process = invited_accounts.copy() + + # 添加未完成的 Owner 到处理列表 + from config import INCLUDE_TEAM_OWNERS + include_owner = INCLUDE_TEAM_OWNERS and len(owner_accounts) > 0 + + if include_owner: + for owner in owner_accounts: + all_to_process.append({ + "email": owner["email"], + "password": owner.get("password", DEFAULT_PASSWORD), + "status": owner.get("status", ""), + "role": "owner" + }) + log.info(f"包含 {len(owner_accounts)} 个 Owner 账号一起处理") - # Owner 不在这里处理,统一放到所有 Team 处理完后 + # ========== 阶段 3: 处理所有账号 (注册 + Codex 授权 + 入库) ========== + if all_to_process: + log.section(f"阶段 3: 逐个注册 OpenAI + Codex 授权 + 入库") + all_results = process_accounts( + all_to_process, team_name, + team_index=team_index, teams_total=teams_total, + include_owner=include_owner + ) + results.extend(all_results) # ========== Team 处理完成 ========== success_count = sum(1 for r in results if r["status"] == "success") if results: - log.success(f"{team_name} 成员处理完成: {success_count}/{len(results)} 成功") + log.success(f"{team_name} 处理完成: {success_count}/{len(results)} 成功") + # 发送 Team 完成报告到 Telegram + notify_team_completed_sync(team_name, results) - # 返回未完成的 Owner 列表供后续统一处理 - return results, owner_accounts + # 返回空列表,因为 Owner 已经在这里处理了 + return results, [] def _get_team_by_name(team_name: str) -> dict: @@ -227,12 +252,16 @@ def _get_team_by_name(team_name: str) -> dict: return {} -def process_accounts(accounts: list, team_name: str) -> list: +def process_accounts(accounts: list, team_name: str, team_index: int = 0, + teams_total: int = 0, include_owner: bool = False) -> list: """处理账号列表 (注册/授权/CRS) Args: accounts: 账号列表 [{"email", "password", "status", "role"}] team_name: Team 名称 + team_index: 当前 Team 序号 (从 1 开始) + teams_total: Team 总数 + include_owner: 是否包含 Owner Returns: list: 处理结果 @@ -242,7 +271,7 @@ def process_accounts(accounts: list, team_name: str) -> list: results = [] # 启动进度跟踪 (Telegram Bot) - progress_start(team_name, len(accounts)) + progress_start(team_name, len(accounts), team_index, teams_total, include_owner) for i, account in enumerate(accounts): if _shutdown_requested: @@ -291,7 +320,7 @@ def process_accounts(accounts: list, team_name: str) -> list: log.separator("#", 50) # 更新进度: 当前账号 - progress_update(account=email, step="Starting...") + progress_update(account=email, step="Starting...", role=role) result = { "team": team_name, @@ -331,10 +360,11 @@ def process_accounts(accounts: list, team_name: str) -> list: update_account_status(_tracker, team_name, email, "processing") save_team_tracker(_tracker) - with Timer(f"账号 {email}"): - if is_team_owner_otp: - # 旧格式 Team Owner: 使用 OTP 登录授权 - log.info("Team Owner 账号 (旧格式),使用一次性验证码登录...", icon="auth") + try: + with Timer(f"账号 {email}"): + if is_team_owner_otp: + # 旧格式 Team Owner: 使用 OTP 登录授权 + log.info("Team Owner 账号 (旧格式),使用一次性验证码登录...", icon="auth") progress_update(step="OTP Login...") auth_success, codex_data = login_and_authorize_with_otp(email) register_success = auth_success @@ -469,6 +499,13 @@ def process_accounts(accounts: list, team_name: str) -> list: update_account_status(_tracker, team_name, email, "register_failed") save_team_tracker(_tracker) + except ShutdownRequested: + # 用户请求停止,保存当前状态并退出 + log.warning(f"用户请求停止,当前账号: {email}") + # 不改变账号状态,保持中断前的状态,下次继续处理 + save_team_tracker(_tracker) + break + # 保存到 CSV save_to_csv( email=email, @@ -514,10 +551,10 @@ def run_all_teams(): log.warning(f"发现 {total_incomplete} 个未完成账号,将优先处理") _current_results = [] - all_pending_owners = [] # 收集所有待处理的 Owner + teams_total = len(TEAMS) with Timer("全部流程"): - # ========== 第一阶段: 处理所有 Team 的普通成员 ========== + # ========== 处理所有 Team (成员 + Owner 一起) ========== for i, team in enumerate(TEAMS): if _shutdown_requested: log.warning("检测到中断请求,停止处理...") @@ -525,51 +562,18 @@ def run_all_teams(): log.separator("★", 60) team_email = team.get('account') or team.get('owner_email', '') - log.highlight(f"Team {i + 1}/{len(TEAMS)}: {team['name']} ({team_email})", icon="team") + log.highlight(f"Team {i + 1}/{teams_total}: {team['name']} ({team_email})", icon="team") log.separator("★", 60) - results, pending_owners = process_single_team(team) - - # 收集待处理的 Owner - if pending_owners: - for owner in pending_owners: - all_pending_owners.append({ - "team_name": team["name"], - "email": owner["email"], - "password": owner.get("password", DEFAULT_PASSWORD), - "status": owner.get("status", "team_owner"), - "role": "owner" - }) + # 传递 Team 序号信息 + results, _ = process_single_team(team, team_index=i + 1, teams_total=teams_total) + _current_results.extend(results) # Team 之间的间隔 - if i < len(TEAMS) - 1 and not _shutdown_requested: + if i < teams_total - 1 and not _shutdown_requested: wait_time = 3 log.countdown(wait_time, "下一个 Team") - # ========== 第二阶段: 统一处理所有 Team Owner 的 CRS 授权 ========== - if all_pending_owners and not _shutdown_requested: - log.separator("★", 60) - log.header(f"统一处理 Team Owner CRS 授权 ({len(all_pending_owners)} 个)") - log.separator("★", 60) - - for i, owner in enumerate(all_pending_owners): - if _shutdown_requested: - log.warning("检测到中断请求,停止处理...") - break - - log.separator("#", 50) - log.info(f"Owner {i + 1}/{len(all_pending_owners)}: {owner['email']} ({owner['team_name']})", icon="account") - log.separator("#", 50) - - owner_results = process_accounts([owner], owner["team_name"]) - _current_results.extend(owner_results) - - # Owner 之间的间隔 - if i < len(all_pending_owners) - 1 and not _shutdown_requested: - wait_time = random.randint(5, 15) - log.info(f"等待 {wait_time}s 后处理下一个 Owner...", icon="wait") - time.sleep(wait_time) - # 打印总结 print_summary(_current_results) @@ -592,22 +596,10 @@ def run_single_team(team_index: int = 0): log.info(f"单 Team 模式: {team['name']}", icon="start") _current_results = [] - results, pending_owners = process_single_team(team) + # 单 Team 模式:序号为 1/1 + results, _ = process_single_team(team, team_index=1, teams_total=1) _current_results.extend(results) - # 单 Team 模式下也处理 Owner - if pending_owners: - log.section(f"处理 Team Owner ({len(pending_owners)} 个)") - for owner in pending_owners: - owner_data = { - "email": owner["email"], - "password": owner.get("password", DEFAULT_PASSWORD), - "status": owner.get("status", "team_owner"), - "role": "owner" - } - owner_results = process_accounts([owner_data], team["name"]) - _current_results.extend(owner_results) - print_summary(_current_results) return _current_results diff --git a/team_service.py b/team_service.py index fefe7e8..78b9a2a 100644 --- a/team_service.py +++ b/team_service.py @@ -113,7 +113,7 @@ def preload_all_account_ids() -> tuple[int, int]: Returns: tuple: (success_count, fail_count) """ - from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskProgressColumn + import sys success_count = 0 fail_count = 0 @@ -127,42 +127,71 @@ def preload_all_account_ids() -> tuple[int, int]: log.success(f"所有 Team account_id 已缓存 ({len(teams_with_token)} 个)") return len(teams_with_token), 0 + total = len(teams_with_token) log.info(f"预加载 {len(teams_need_fetch)} 个 Team 的 account_id...", icon="sync") need_save = False failed_teams = [] - with Progress( - SpinnerColumn(), - TextColumn("[progress.description]{task.description}"), - BarColumn(), - TaskProgressColumn(), - TextColumn("{task.fields[status]}"), - ) as progress: - task = progress.add_task("加载中", total=len(teams_with_token), status="") + # 检测是否为 TTY 环境 + is_tty = sys.stdout.isatty() - for team in teams_with_token: - progress.update(task, description=f"[cyan]{team['name']}", status="") + if is_tty: + # TTY 环境: 使用 rich 进度条 + from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskProgressColumn + + with Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + BarColumn(), + TaskProgressColumn(), + TextColumn("{task.fields[status]}"), + ) as progress: + task = progress.add_task("加载中", total=total, status="") + + for team in teams_with_token: + progress.update(task, description=f"[cyan]{team['name']}", status="") + + if team.get("account_id"): + success_count += 1 + progress.update(task, advance=1, status="[green]✓ 已缓存") + continue + + account_id = fetch_account_id(team, silent=True) + if account_id: + success_count += 1 + progress.update(task, advance=1, status="[green]✓") + if team.get("format") == "new": + need_save = True + else: + fail_count += 1 + failed_teams.append(team['name']) + progress.update(task, advance=1, status="[red]✗") + else: + # 非 TTY 环境 (systemd/journalctl): 使用普通日志输出 + for idx, team in enumerate(teams_with_token, 1): + team_name = team['name'] if team.get("account_id"): success_count += 1 - progress.update(task, advance=1, status="[green]✓ 已缓存") + log.info(f"预加载 [{idx}/{total}] {team_name}: ✓ 已缓存") continue account_id = fetch_account_id(team, silent=True) if account_id: success_count += 1 - progress.update(task, advance=1, status="[green]✓") + log.info(f"预加载 [{idx}/{total}] {team_name}: ✓ {account_id}") if team.get("format") == "new": need_save = True else: fail_count += 1 - failed_teams.append(team['name']) - progress.update(task, advance=1, status="[red]✗") + failed_teams.append(team_name) + log.warning(f"预加载 [{idx}/{total}] {team_name}: ✗ 失败") - # 输出失败的 team - for name in failed_teams: - log.warning(f"Team {name}: 获取 account_id 失败") + # 输出失败的 team (仅 TTY 环境,非 TTY 已在循环中输出) + if is_tty: + for name in failed_teams: + log.warning(f"Team {name}: 获取 account_id 失败") # 持久化到 team.json if need_save: diff --git a/telegram_bot.py b/telegram_bot.py index eaaf2f6..ccd9631 100644 --- a/telegram_bot.py +++ b/telegram_bot.py @@ -44,7 +44,7 @@ from config import ( S2A_GROUP_IDS, S2A_ADMIN_KEY, ) -from utils import load_team_tracker +from utils import load_team_tracker, get_all_incomplete_accounts from bot_notifier import BotNotifier, set_notifier, progress_finish from s2a_service import s2a_get_dashboard_stats, format_dashboard_stats from email_service import gptmail_service, unified_create_email @@ -98,6 +98,7 @@ class ProvisionerBot: ("fingerprint", self.cmd_fingerprint), ("run", self.cmd_run), ("run_all", self.cmd_run_all), + ("resume", self.cmd_resume), ("stop", self.cmd_stop), ("logs", self.cmd_logs), ("logs_live", self.cmd_logs_live), @@ -173,6 +174,7 @@ class ProvisionerBot: BotCommand("config", "查看系统配置"), BotCommand("run", "处理指定 Team"), BotCommand("run_all", "处理所有 Team"), + BotCommand("resume", "继续处理未完成账号"), BotCommand("stop", "停止当前任务"), BotCommand("logs", "查看最近日志"), BotCommand("dashboard", "查看 S2A 仪表盘"), @@ -209,6 +211,7 @@ class ProvisionerBot: 🚀 任务控制: /run <n> - 开始处理第 n 个 Team /run_all - 开始处理所有 Team +/resume - 继续处理未完成账号 /stop - 停止当前任务 ⚙️ 配置管理: @@ -729,6 +732,53 @@ class ProvisionerBot: self.current_task = asyncio.ensure_future(self._wrap_task(self.current_task, "全部")) + @admin_only + async def cmd_resume(self, update: Update, context: ContextTypes.DEFAULT_TYPE): + """继续处理未完成的账号""" + if self.current_task and not self.current_task.done(): + await update.message.reply_text( + f"⚠️ 任务正在运行: {self.current_team}\n使用 /stop 停止" + ) + return + + # 检查是否有未完成的账号 + tracker = load_team_tracker() + all_incomplete = get_all_incomplete_accounts(tracker) + + if not all_incomplete: + await update.message.reply_text("✅ 没有待处理的账号,所有任务已完成") + return + + # 统计未完成账号 + total_incomplete = sum(len(accs) for accs in all_incomplete.values()) + teams_count = len(all_incomplete) + + # 构建消息 + lines = [ + f"⏳ 发现 {total_incomplete} 个未完成账号", + f"涉及 {teams_count} 个 Team:", + "" + ] + + for team_name, accounts in all_incomplete.items(): + lines.append(f" • {team_name}: {len(accounts)} 个") + + lines.append("") + lines.append("🚀 开始继续处理...") + + await update.message.reply_text("\n".join(lines), parse_mode="HTML") + + # 启动任务 (run_all_teams 会自动处理未完成的账号) + self.current_team = "继续处理" + + loop = asyncio.get_event_loop() + self.current_task = loop.run_in_executor( + self.executor, + self._run_all_teams_task + ) + + self.current_task = asyncio.ensure_future(self._wrap_task(self.current_task, "继续处理")) + async def _wrap_task(self, task, team_name: str): """包装任务以处理完成通知""" try: @@ -736,8 +786,10 @@ class ProvisionerBot: # 收集成功和失败的账号 success_accounts = [r.get("email") for r in (result or []) if r.get("status") == "success"] failed_accounts = [r.get("email") for r in (result or []) if r.get("status") != "success"] + log.info(f"任务完成: {team_name}, 成功: {len(success_accounts)}, 失败: {len(failed_accounts)}") await self.notifier.notify_task_completed(team_name, success_accounts, failed_accounts) except Exception as e: + log.error(f"任务异常: {team_name}, 错误: {e}") await self.notifier.notify_error(f"任务失败: {team_name}", str(e)) finally: self.current_team = None @@ -748,11 +800,31 @@ class ProvisionerBot: """执行单个 Team 任务 (在线程池中运行)""" # 延迟导入避免循环依赖 from run import run_single_team + from team_service import preload_all_account_ids + from utils import load_team_tracker, save_team_tracker, add_team_owners_to_tracker + from config import DEFAULT_PASSWORD + + # 预加载 account_id + preload_all_account_ids() + _tracker = load_team_tracker() + add_team_owners_to_tracker(_tracker, DEFAULT_PASSWORD) + save_team_tracker(_tracker) + return run_single_team(team_idx) def _run_all_teams_task(self): """执行所有 Team 任务 (在线程池中运行)""" from run import run_all_teams + from team_service import preload_all_account_ids + from utils import load_team_tracker, save_team_tracker, add_team_owners_to_tracker + from config import DEFAULT_PASSWORD + + # 预加载 account_id + preload_all_account_ids() + _tracker = load_team_tracker() + add_team_owners_to_tracker(_tracker, DEFAULT_PASSWORD) + save_team_tracker(_tracker) + return run_all_teams() @admin_only @@ -770,8 +842,10 @@ class ProvisionerBot: try: import run run._shutdown_requested = True + # 获取当前运行结果 + current_results = run._current_results.copy() if run._current_results else [] except Exception: - pass + current_results = [] # 2. 取消 asyncio 任务 if self.current_task and not self.current_task.done(): @@ -797,13 +871,51 @@ class ProvisionerBot: # 清理进度跟踪 progress_finish() - await update.message.reply_text( - f"✅ 任务已强制停止\n\n" - f"已停止: {task_name}\n" - f"已清理浏览器进程\n\n" - f"使用 /status 查看状态", - parse_mode="HTML" - ) + # 6. 生成停止报告 + report_lines = [ + f"🛑 任务已停止", + f"任务: {task_name}", + "", + ] + + # 本次运行结果 + if current_results: + success_count = sum(1 for r in current_results if r.get("status") == "success") + failed_count = len(current_results) - success_count + report_lines.append(f"📊 本次运行结果:") + report_lines.append(f" 成功: {success_count}") + report_lines.append(f" 失败: {failed_count}") + report_lines.append("") + + # 获取未完成账号信息 + tracker = load_team_tracker() + all_incomplete = get_all_incomplete_accounts(tracker) + + if all_incomplete: + total_incomplete = sum(len(accs) for accs in all_incomplete.values()) + report_lines.append(f"⏳ 待继续处理: {total_incomplete} 个账号") + + # 显示每个 Team 的未完成账号 (最多显示 3 个 Team) + shown_teams = 0 + for team_name, accounts in all_incomplete.items(): + if shown_teams >= 3: + remaining = len(all_incomplete) - 3 + report_lines.append(f" ... 还有 {remaining} 个 Team") + break + + report_lines.append(f" {team_name}: {len(accounts)} 个") + # 显示第一个待处理账号 + if accounts: + first_acc = accounts[0] + report_lines.append(f" 下一个: {first_acc['email']}") + shown_teams += 1 + + report_lines.append("") + report_lines.append("💡 使用 /resume 继续处理") + else: + report_lines.append("✅ 没有待处理的账号") + + await update.message.reply_text("\n".join(report_lines), parse_mode="HTML") except Exception as e: await update.message.reply_text(f"❌ 停止任务时出错: {e}")