This commit is contained in:
2026-01-20 21:26:44 +08:00
parent 906aceff6e
commit 26438386c1
2 changed files with 178 additions and 3 deletions

View File

@@ -177,6 +177,111 @@ class ProgressTracker:
self._schedule_update(force=True) self._schedule_update(force=True)
class PreloadProgressTracker:
"""预加载进度跟踪器 - 用于显示 account_id 预加载进度"""
def __init__(self, bot: Bot, chat_ids: List[int], total: int):
self.bot = bot
self.chat_ids = chat_ids
self.total = total
self.current = 0
self.success = 0
self.failed = 0
self.current_team = ""
self.messages: Dict[int, Message] = {}
self._last_update = 0
self._update_interval = 1 # 预加载更新间隔更短
self._loop: asyncio.AbstractEventLoop = None
def _get_progress_text(self) -> str:
"""生成预加载进度消息文本"""
bar = make_progress_bar(self.current, self.total, 20)
lines = [
"<b>◈ 预加载 Account ID</b>",
"",
f"🔄 进度 {self.current}/{self.total}",
bar,
"",
f"✅ 成功: {self.success}",
f"❌ 失败: {self.failed}",
]
if self.current_team:
lines.append("")
lines.append(f"⏳ 正在获取: <code>{self.current_team}</code>")
return "\n".join(lines)
async def _send_initial_message(self):
"""发送初始进度消息"""
text = self._get_progress_text()
for chat_id in self.chat_ids:
try:
msg = await self.bot.send_message(
chat_id=chat_id,
text=text,
parse_mode="HTML"
)
self.messages[chat_id] = msg
except TelegramError:
pass
async def _update_messages(self):
"""更新所有进度消息"""
text = self._get_progress_text()
for chat_id, msg in self.messages.items():
try:
await self.bot.edit_message_text(
chat_id=chat_id,
message_id=msg.message_id,
text=text,
parse_mode="HTML"
)
except TelegramError:
pass
def _schedule_update(self, force: bool = False):
"""调度消息更新"""
now = time.time()
if not force and now - self._last_update < self._update_interval:
return
self._last_update = now
if self._loop:
asyncio.run_coroutine_threadsafe(self._update_messages(), self._loop)
def start(self, loop: asyncio.AbstractEventLoop):
"""启动进度跟踪"""
self._loop = loop
asyncio.run_coroutine_threadsafe(self._send_initial_message(), loop)
def update(self, team_name: str = None):
"""更新当前处理的 Team"""
if team_name is not None:
self.current_team = team_name
self._schedule_update()
def item_done(self, team_name: str, success: bool):
"""标记一个 Team 完成"""
self.current += 1
if success:
self.success += 1
else:
self.failed += 1
self.current_team = ""
is_last = self.current >= self.total
self._schedule_update(force=is_last)
def finish(self, stopped: bool = False):
"""完成预加载"""
if stopped:
self.current_team = "已停止"
else:
self.current_team = "已完成!"
self._schedule_update(force=True)
class BotNotifier: class BotNotifier:
"""Telegram 通知推送器""" """Telegram 通知推送器"""
@@ -186,6 +291,7 @@ class BotNotifier:
self._message_queue: asyncio.Queue = None self._message_queue: asyncio.Queue = None
self._worker_task: asyncio.Task = None self._worker_task: asyncio.Task = None
self._current_progress: Optional[ProgressTracker] = None self._current_progress: Optional[ProgressTracker] = None
self._current_preload_progress: Optional[PreloadProgressTracker] = None
self._loop: asyncio.AbstractEventLoop = None self._loop: asyncio.AbstractEventLoop = None
async def start(self): async def start(self):
@@ -288,6 +394,25 @@ class BotNotifier:
self._current_progress.finish() self._current_progress.finish()
self._current_progress = None self._current_progress = None
def create_preload_progress(self, total: int) -> PreloadProgressTracker:
"""创建预加载进度跟踪器"""
self._current_preload_progress = PreloadProgressTracker(
self.bot, self.chat_ids, total
)
if self._loop:
self._current_preload_progress.start(self._loop)
return self._current_preload_progress
def get_preload_progress(self) -> Optional[PreloadProgressTracker]:
"""获取当前预加载进度跟踪器"""
return self._current_preload_progress
def clear_preload_progress(self, stopped: bool = False):
"""清除预加载进度跟踪器"""
if self._current_preload_progress:
self._current_preload_progress.finish(stopped=stopped)
self._current_preload_progress = None
async def notify_task_started(self, team_name: str): async def notify_task_started(self, team_name: str):
"""通知任务开始""" """通知任务开始"""
await self.notify(f"<b>🚀 任务开始</b>\nTeam: {team_name}") await self.notify(f"<b>🚀 任务开始</b>\nTeam: {team_name}")
@@ -471,6 +596,33 @@ def progress_finish():
_notifier.clear_progress() _notifier.clear_progress()
# ==================== 预加载进度接口 (供 team_service.py 使用) ====================
def preload_progress_start(total: int) -> Optional[PreloadProgressTracker]:
"""开始预加载进度跟踪"""
if _notifier:
return _notifier.create_preload_progress(total)
return None
def preload_progress_update(team_name: str):
"""更新当前预加载的 Team"""
if _notifier and _notifier.get_preload_progress():
_notifier.get_preload_progress().update(team_name=team_name)
def preload_progress_done(team_name: str, success: bool):
"""标记一个 Team 预加载完成"""
if _notifier and _notifier.get_preload_progress():
_notifier.get_preload_progress().item_done(team_name, success)
def preload_progress_finish(stopped: bool = False):
"""完成预加载进度跟踪"""
if _notifier:
_notifier.clear_preload_progress(stopped=stopped)
def notify_team_completed_sync(team_name: str, results: list): def notify_team_completed_sync(team_name: str, results: list):
"""同步方式通知单个 Team 完成 (供 run.py 使用)""" """同步方式通知单个 Team 完成 (供 run.py 使用)"""
if _notifier and _notifier._loop: if _notifier and _notifier._loop:

View File

@@ -125,6 +125,20 @@ def preload_all_account_ids() -> tuple[int, int]:
import sys import sys
from concurrent.futures import ThreadPoolExecutor, as_completed from concurrent.futures import ThreadPoolExecutor, as_completed
# 导入 Telegram 进度显示接口
try:
from bot_notifier import (
preload_progress_start,
preload_progress_update,
preload_progress_done,
preload_progress_finish
)
except ImportError:
def preload_progress_start(total): pass
def preload_progress_update(team_name): pass
def preload_progress_done(team_name, success): pass
def preload_progress_finish(stopped=False): pass
success_count = 0 success_count = 0
fail_count = 0 fail_count = 0
@@ -143,6 +157,9 @@ def preload_all_account_ids() -> tuple[int, int]:
total = len(teams_need_fetch) total = len(teams_need_fetch)
log.info(f"并行预加载 {total} 个 Team 的 account_id...", icon="sync") log.info(f"并行预加载 {total} 个 Team 的 account_id...", icon="sync")
# 启动 Telegram 进度显示
preload_progress_start(total)
need_save = False need_save = False
failed_teams = [] failed_teams = []
stopped = False stopped = False
@@ -172,6 +189,7 @@ def preload_all_account_ids() -> tuple[int, int]:
team, account_id, status = future.result() team, account_id, status = future.result()
completed += 1 completed += 1
team_name = team['name']
if status == "stopped": if status == "stopped":
stopped = True stopped = True
@@ -180,15 +198,20 @@ def preload_all_account_ids() -> tuple[int, int]:
success_count += 1 success_count += 1
if team.get("format") == "new": if team.get("format") == "new":
need_save = True need_save = True
log.info(f"预加载 [{completed}/{total}] {team['name']}: ✓ {account_id}") log.info(f"预加载 [{completed}/{total}] {team_name}: ✓ {account_id}")
preload_progress_done(team_name, True)
else: else:
fail_count += 1 fail_count += 1
failed_teams.append(team['name']) failed_teams.append(team_name)
log.warning(f"预加载 [{completed}/{total}] {team['name']}: ✗ 失败") log.warning(f"预加载 [{completed}/{total}] {team_name}: ✗ 失败")
preload_progress_done(team_name, False)
# 加上已缓存的数量 # 加上已缓存的数量
success_count += cached_count success_count += cached_count
# 完成 Telegram 进度显示
preload_progress_finish(stopped=stopped)
# 持久化到 team.json # 持久化到 team.json
if need_save: if need_save:
if save_team_json(): if save_team_json():