diff --git a/bot_notifier.py b/bot_notifier.py index 3ccb6c1..952a7b4 100644 --- a/bot_notifier.py +++ b/bot_notifier.py @@ -177,6 +177,111 @@ class ProgressTracker: self._schedule_update(force=True) +class PreloadProgressTracker: + """预加载进度跟踪器 - 用于显示 account_id 预加载进度""" + + def __init__(self, bot: Bot, chat_ids: List[int], total: int): + self.bot = bot + self.chat_ids = chat_ids + self.total = total + self.current = 0 + self.success = 0 + self.failed = 0 + self.current_team = "" + self.messages: Dict[int, Message] = {} + self._last_update = 0 + self._update_interval = 1 # 预加载更新间隔更短 + self._loop: asyncio.AbstractEventLoop = None + + def _get_progress_text(self) -> str: + """生成预加载进度消息文本""" + bar = make_progress_bar(self.current, self.total, 20) + + lines = [ + "◈ 预加载 Account ID", + "", + f"🔄 进度 {self.current}/{self.total}", + bar, + "", + f"✅ 成功: {self.success}", + f"❌ 失败: {self.failed}", + ] + + if self.current_team: + lines.append("") + lines.append(f"⏳ 正在获取: {self.current_team}") + + return "\n".join(lines) + + async def _send_initial_message(self): + """发送初始进度消息""" + text = self._get_progress_text() + for chat_id in self.chat_ids: + try: + msg = await self.bot.send_message( + chat_id=chat_id, + text=text, + parse_mode="HTML" + ) + self.messages[chat_id] = msg + except TelegramError: + pass + + async def _update_messages(self): + """更新所有进度消息""" + text = self._get_progress_text() + for chat_id, msg in self.messages.items(): + try: + await self.bot.edit_message_text( + chat_id=chat_id, + message_id=msg.message_id, + text=text, + parse_mode="HTML" + ) + except TelegramError: + pass + + def _schedule_update(self, force: bool = False): + """调度消息更新""" + now = time.time() + if not force and now - self._last_update < self._update_interval: + return + + self._last_update = now + if self._loop: + asyncio.run_coroutine_threadsafe(self._update_messages(), self._loop) + + def start(self, loop: asyncio.AbstractEventLoop): + """启动进度跟踪""" + self._loop = loop + asyncio.run_coroutine_threadsafe(self._send_initial_message(), loop) + + def update(self, team_name: str = None): + """更新当前处理的 Team""" + if team_name is not None: + self.current_team = team_name + self._schedule_update() + + def item_done(self, team_name: str, success: bool): + """标记一个 Team 完成""" + self.current += 1 + if success: + self.success += 1 + else: + self.failed += 1 + self.current_team = "" + is_last = self.current >= self.total + self._schedule_update(force=is_last) + + def finish(self, stopped: bool = False): + """完成预加载""" + if stopped: + self.current_team = "已停止" + else: + self.current_team = "已完成!" + self._schedule_update(force=True) + + class BotNotifier: """Telegram 通知推送器""" @@ -186,6 +291,7 @@ class BotNotifier: self._message_queue: asyncio.Queue = None self._worker_task: asyncio.Task = None self._current_progress: Optional[ProgressTracker] = None + self._current_preload_progress: Optional[PreloadProgressTracker] = None self._loop: asyncio.AbstractEventLoop = None async def start(self): @@ -288,6 +394,25 @@ class BotNotifier: self._current_progress.finish() self._current_progress = None + def create_preload_progress(self, total: int) -> PreloadProgressTracker: + """创建预加载进度跟踪器""" + self._current_preload_progress = PreloadProgressTracker( + self.bot, self.chat_ids, total + ) + if self._loop: + self._current_preload_progress.start(self._loop) + return self._current_preload_progress + + def get_preload_progress(self) -> Optional[PreloadProgressTracker]: + """获取当前预加载进度跟踪器""" + return self._current_preload_progress + + def clear_preload_progress(self, stopped: bool = False): + """清除预加载进度跟踪器""" + if self._current_preload_progress: + self._current_preload_progress.finish(stopped=stopped) + self._current_preload_progress = None + async def notify_task_started(self, team_name: str): """通知任务开始""" await self.notify(f"🚀 任务开始\nTeam: {team_name}") @@ -471,6 +596,33 @@ def progress_finish(): _notifier.clear_progress() +# ==================== 预加载进度接口 (供 team_service.py 使用) ==================== + +def preload_progress_start(total: int) -> Optional[PreloadProgressTracker]: + """开始预加载进度跟踪""" + if _notifier: + return _notifier.create_preload_progress(total) + return None + + +def preload_progress_update(team_name: str): + """更新当前预加载的 Team""" + if _notifier and _notifier.get_preload_progress(): + _notifier.get_preload_progress().update(team_name=team_name) + + +def preload_progress_done(team_name: str, success: bool): + """标记一个 Team 预加载完成""" + if _notifier and _notifier.get_preload_progress(): + _notifier.get_preload_progress().item_done(team_name, success) + + +def preload_progress_finish(stopped: bool = False): + """完成预加载进度跟踪""" + if _notifier: + _notifier.clear_preload_progress(stopped=stopped) + + def notify_team_completed_sync(team_name: str, results: list): """同步方式通知单个 Team 完成 (供 run.py 使用)""" if _notifier and _notifier._loop: diff --git a/team_service.py b/team_service.py index 200a8bf..2af51d0 100644 --- a/team_service.py +++ b/team_service.py @@ -125,6 +125,20 @@ def preload_all_account_ids() -> tuple[int, int]: import sys from concurrent.futures import ThreadPoolExecutor, as_completed + # 导入 Telegram 进度显示接口 + try: + from bot_notifier import ( + preload_progress_start, + preload_progress_update, + preload_progress_done, + preload_progress_finish + ) + except ImportError: + def preload_progress_start(total): pass + def preload_progress_update(team_name): pass + def preload_progress_done(team_name, success): pass + def preload_progress_finish(stopped=False): pass + success_count = 0 fail_count = 0 @@ -143,6 +157,9 @@ def preload_all_account_ids() -> tuple[int, int]: total = len(teams_need_fetch) log.info(f"并行预加载 {total} 个 Team 的 account_id...", icon="sync") + # 启动 Telegram 进度显示 + preload_progress_start(total) + need_save = False failed_teams = [] stopped = False @@ -172,6 +189,7 @@ def preload_all_account_ids() -> tuple[int, int]: team, account_id, status = future.result() completed += 1 + team_name = team['name'] if status == "stopped": stopped = True @@ -180,15 +198,20 @@ def preload_all_account_ids() -> tuple[int, int]: success_count += 1 if team.get("format") == "new": need_save = True - log.info(f"预加载 [{completed}/{total}] {team['name']}: ✓ {account_id}") + log.info(f"预加载 [{completed}/{total}] {team_name}: ✓ {account_id}") + preload_progress_done(team_name, True) else: fail_count += 1 - failed_teams.append(team['name']) - log.warning(f"预加载 [{completed}/{total}] {team['name']}: ✗ 失败") + failed_teams.append(team_name) + log.warning(f"预加载 [{completed}/{total}] {team_name}: ✗ 失败") + preload_progress_done(team_name, False) # 加上已缓存的数量 success_count += cached_count + # 完成 Telegram 进度显示 + preload_progress_finish(stopped=stopped) + # 持久化到 team.json if need_save: if save_team_json():