# ==================== Telegram 通知模块 ==================== # 用于将日志和状态变更推送到 Telegram import asyncio import time from typing import Callable, List, Optional, Dict from telegram import Bot, Message from telegram.error import TelegramError from config import ( TELEGRAM_ADMIN_CHAT_IDS, TELEGRAM_NOTIFY_ON_COMPLETE, TELEGRAM_NOTIFY_ON_ERROR, ) def make_progress_bar(current: int, total: int, width: int = 20) -> str: """生成文本进度条 Args: current: 当前进度 total: 总数 width: 进度条宽度 (字符数) Returns: 进度条字符串,如 "▓▓▓▓░░░░░░" """ if total <= 0: return "░" * width percent = min(current / total, 1.0) filled = int(width * percent) empty = width - filled return "▓" * filled + "░" * empty class ProgressTracker: """进度跟踪器 - 用于实时更新 Telegram 消息显示进度""" def __init__(self, bot: Bot, chat_ids: List[int], team_name: str, total: int, team_index: int = 0, teams_total: int = 0, include_owner: bool = False): self.bot = bot self.chat_ids = chat_ids self.team_name = team_name self.total = total self.team_index = team_index # 当前 Team 序号 (从 1 开始) self.teams_total = teams_total # Team 总数 self.include_owner = include_owner # 是否包含 Owner self.current = 0 self.success = 0 self.failed = 0 self.current_account = "" self.current_phase = "" # 当前阶段 (注册/授权/验证) self.current_step = "" self.current_role = "" # 当前账号角色 (member/owner) self.messages: Dict[int, Message] = {} # chat_id -> Message self._last_update = 0 self._update_interval = 2 # 最小更新间隔 (秒) self._loop: asyncio.AbstractEventLoop = None def _get_progress_text(self) -> str: """生成进度消息文本""" bar = make_progress_bar(self.current, self.total, 20) # 标题行:显示 Team 序号 if self.teams_total > 0: title = f"◈ Team [{self.team_index}/{self.teams_total}]: {self.team_name}" else: title = f"◈ 正在处理: {self.team_name}" owner_tag = " (含 Owner)" if self.include_owner else "" lines = [ title, "", f"🔄 注册进度 {self.current}/{self.total}{owner_tag}", bar, "", f"✅ 成功: {self.success}", f"❌ 失败: {self.failed}", ] if self.current_account: lines.append("") role_tag = " 👑" if self.current_role == "owner" else "" lines.append(f"⏳ 正在处理: {self.current_account}{role_tag}") if self.current_phase and self.current_step: lines.append(f" ▸ [{self.current_phase}] {self.current_step}") elif self.current_step: lines.append(f" ▸ {self.current_step}") return "\n".join(lines) async def _send_initial_message(self): """发送初始进度消息""" text = self._get_progress_text() for chat_id in self.chat_ids: try: msg = await self.bot.send_message( chat_id=chat_id, text=text, parse_mode="HTML" ) self.messages[chat_id] = msg except TelegramError: pass async def _update_messages(self): """更新所有进度消息""" text = self._get_progress_text() for chat_id, msg in self.messages.items(): try: await self.bot.edit_message_text( chat_id=chat_id, message_id=msg.message_id, text=text, parse_mode="HTML" ) except TelegramError: pass def _schedule_update(self, force: bool = False): """调度消息更新 (限流) Args: force: 是否强制更新 (忽略限流) """ now = time.time() if not force and now - self._last_update < self._update_interval: return self._last_update = now if self._loop: asyncio.run_coroutine_threadsafe(self._update_messages(), self._loop) def start(self, loop: asyncio.AbstractEventLoop): """启动进度跟踪""" self._loop = loop asyncio.run_coroutine_threadsafe(self._send_initial_message(), loop) def update(self, current: int = None, account: str = None, phase: str = None, step: str = None, role: str = None): """更新进度 (供同步代码调用)""" if current is not None: self.current = current if account is not None: self.current_account = account if phase is not None: self.current_phase = phase if step is not None: self.current_step = step if role is not None: self.current_role = role self._schedule_update() def account_done(self, email: str, success: bool): """标记账号处理完成""" self.current += 1 if success: self.success += 1 else: self.failed += 1 self.current_account = "" self.current_phase = "" self.current_step = "" self.current_role = "" # 最后一个账号完成时强制更新,确保显示 100% is_last = self.current >= self.total self._schedule_update(force=is_last) def finish(self): """完成进度跟踪,发送最终状态""" self.current_step = "已完成!" # 强制更新最终状态 self._schedule_update(force=True) class PreloadProgressTracker: """预加载进度跟踪器 - 用于显示 account_id 预加载进度""" def __init__(self, bot: Bot, chat_ids: List[int], total: int): self.bot = bot self.chat_ids = chat_ids self.total = total self.current = 0 self.success = 0 self.failed = 0 self.current_team = "" self.messages: Dict[int, Message] = {} self._last_update = 0 self._update_interval = 1 # 预加载更新间隔更短 self._loop: asyncio.AbstractEventLoop = None def _get_progress_text(self) -> str: """生成预加载进度消息文本""" bar = make_progress_bar(self.current, self.total, 20) lines = [ "◈ 预加载 Account ID", "", f"🔄 进度 {self.current}/{self.total}", bar, "", f"✅ 成功: {self.success}", f"❌ 失败: {self.failed}", ] if self.current_team: lines.append("") lines.append(f"⏳ 正在获取: {self.current_team}") return "\n".join(lines) async def _send_initial_message(self): """发送初始进度消息""" text = self._get_progress_text() for chat_id in self.chat_ids: try: msg = await self.bot.send_message( chat_id=chat_id, text=text, parse_mode="HTML" ) self.messages[chat_id] = msg except TelegramError: pass async def _update_messages(self): """更新所有进度消息""" text = self._get_progress_text() for chat_id, msg in self.messages.items(): try: await self.bot.edit_message_text( chat_id=chat_id, message_id=msg.message_id, text=text, parse_mode="HTML" ) except TelegramError: pass def _schedule_update(self, force: bool = False): """调度消息更新""" now = time.time() if not force and now - self._last_update < self._update_interval: return self._last_update = now if self._loop: asyncio.run_coroutine_threadsafe(self._update_messages(), self._loop) def start(self, loop: asyncio.AbstractEventLoop): """启动进度跟踪""" self._loop = loop asyncio.run_coroutine_threadsafe(self._send_initial_message(), loop) def update(self, team_name: str = None): """更新当前处理的 Team""" if team_name is not None: self.current_team = team_name self._schedule_update() def item_done(self, team_name: str, success: bool): """标记一个 Team 完成""" self.current += 1 if success: self.success += 1 else: self.failed += 1 self.current_team = "" is_last = self.current >= self.total self._schedule_update(force=is_last) def finish(self, stopped: bool = False): """完成预加载""" if stopped: self.current_team = "已停止" else: self.current_team = "已完成!" self._schedule_update(force=True) class BotNotifier: """Telegram 通知推送器""" def __init__(self, bot: Bot, chat_ids: List[int] = None): self.bot = bot self.chat_ids = chat_ids or TELEGRAM_ADMIN_CHAT_IDS self._message_queue: asyncio.Queue = None self._worker_task: asyncio.Task = None self._current_progress: Optional[ProgressTracker] = None self._current_preload_progress: Optional[PreloadProgressTracker] = None self._loop: asyncio.AbstractEventLoop = None async def start(self): """启动消息发送队列""" self._message_queue = asyncio.Queue() self._worker_task = asyncio.create_task(self._message_worker()) self._loop = asyncio.get_event_loop() async def stop(self): """停止消息发送队列""" if self._worker_task: self._worker_task.cancel() try: await self._worker_task except asyncio.CancelledError: pass async def _message_worker(self): """后台消息发送工作线程""" while True: try: message, level = await self._message_queue.get() await self._send_to_all(message) self._message_queue.task_done() except asyncio.CancelledError: break except Exception: pass async def _send_to_all(self, message: str, parse_mode: str = "HTML"): """发送消息到所有管理员""" for chat_id in self.chat_ids: try: await self.bot.send_message( chat_id=chat_id, text=message, parse_mode=parse_mode ) except TelegramError as e: print(f"[BotNotifier] 发送消息失败 (chat_id={chat_id}): {e}") async def _send_photo_to_all(self, photo_path: str, caption: str = ""): """发送图片到所有管理员""" for chat_id in self.chat_ids: try: with open(photo_path, 'rb') as photo: await self.bot.send_photo( chat_id=chat_id, photo=photo, caption=caption, parse_mode="HTML" ) except TelegramError: pass except FileNotFoundError: pass async def send_screenshot(self, photo_path: str, caption: str = ""): """发送调试截图""" await self._send_photo_to_all(photo_path, caption) def queue_message(self, message: str, level: str = "info"): """将消息加入发送队列 (非阻塞)""" if self._message_queue: try: self._message_queue.put_nowait((message, level)) except asyncio.QueueFull: pass async def notify(self, message: str, level: str = "info"): """直接发送通知 (阻塞)""" await self._send_to_all(message) def create_progress(self, team_name: str, total: int, team_index: int = 0, teams_total: int = 0, include_owner: bool = False) -> ProgressTracker: """创建进度跟踪器 Args: team_name: Team 名称 total: 账号总数 team_index: 当前 Team 序号 (从 1 开始) teams_total: Team 总数 include_owner: 是否包含 Owner """ self._current_progress = ProgressTracker( self.bot, self.chat_ids, team_name, total, team_index=team_index, teams_total=teams_total, include_owner=include_owner ) if self._loop: self._current_progress.start(self._loop) return self._current_progress def get_progress(self) -> Optional[ProgressTracker]: """获取当前进度跟踪器""" return self._current_progress def clear_progress(self): """清除进度跟踪器""" if self._current_progress: self._current_progress.finish() self._current_progress = None def create_preload_progress(self, total: int) -> PreloadProgressTracker: """创建预加载进度跟踪器""" self._current_preload_progress = PreloadProgressTracker( self.bot, self.chat_ids, total ) if self._loop: self._current_preload_progress.start(self._loop) return self._current_preload_progress def get_preload_progress(self) -> Optional[PreloadProgressTracker]: """获取当前预加载进度跟踪器""" return self._current_preload_progress def clear_preload_progress(self, stopped: bool = False): """清除预加载进度跟踪器""" if self._current_preload_progress: self._current_preload_progress.finish(stopped=stopped) self._current_preload_progress = None async def notify_task_started(self, team_name: str): """通知任务开始""" await self.notify(f"🚀 任务开始\nTeam: {team_name}") async def notify_task_completed(self, team_name: str, success_accounts: list, failed_accounts: list): """通知任务完成 Args: team_name: Team 名称 success_accounts: 成功的账号列表 failed_accounts: 失败的账号列表 """ if not TELEGRAM_NOTIFY_ON_COMPLETE: return success_count = len(success_accounts) failed_count = len(failed_accounts) status = "全部成功" if failed_count == 0 else f"{failed_count} 个失败" # 构建消息 message = ( f"✅ 任务完成\n" f"Team: {team_name}\n" f"成功: {success_count}\n" f"状态: {status}" ) # 如果有成功的账号,列出来 if success_accounts: message += "\n\n成功账号:" for email in success_accounts: message += f"\n• {email}" await self.notify(message) async def notify_team_completed(self, team_name: str, results: list): """通知单个 Team 处理完成 Args: team_name: Team 名称 results: 处理结果列表 [{"email", "status", ...}] """ if not results: return success_accounts = [r.get("email") for r in results if r.get("status") == "success"] failed_accounts = [r.get("email") for r in results if r.get("status") != "success"] success_count = len(success_accounts) failed_count = len(failed_accounts) total = len(results) # 状态图标 if failed_count == 0: icon = "✅" status = "全部成功" elif success_count == 0: icon = "❌" status = "全部失败" else: icon = "⚠️" status = f"{failed_count} 个失败" # 构建消息 message = ( f"{icon} Team 处理完成\n" f"Team: {team_name}\n" f"结果: {success_count}/{total} 成功\n" f"状态: {status}" ) # 列出成功的账号 if success_accounts: message += "\n\n✓ 成功:" for email in success_accounts: message += f"\n {email}" # 列出失败的账号 if failed_accounts: message += "\n\n✗ 失败:" for email in failed_accounts: message += f"\n {email}" await self.notify(message) async def notify_error(self, message: str, details: str = ""): """通知错误""" if not TELEGRAM_NOTIFY_ON_ERROR: return text = f"❌ 错误\n{message}" if details: text += f"\n{details[:500]}" await self.notify(text) async def notify_account_status(self, email: str, status: str, team_name: str = ""): """通知账号状态变更""" icon = { "completed": "OK", "authorized": "AUTH", "registered": "REG", "failed": "FAIL", }.get(status, status.upper()) text = f"[{icon}] {email}" if team_name: text = f"{team_name}\n{text}" self.queue_message(text, "info") # 全局通知器实例 (在 telegram_bot.py 中初始化) _notifier: Optional[BotNotifier] = None def set_notifier(notifier: BotNotifier): """设置全局通知器""" global _notifier _notifier = notifier def get_notifier() -> Optional[BotNotifier]: """获取全局通知器""" return _notifier def notify_sync(message: str, level: str = "info"): """同步方式发送通知 (供非异步代码使用)""" if _notifier: _notifier.queue_message(message, level) def send_screenshot_sync(photo_path: str, caption: str = ""): """同步方式发送截图 (供非异步代码使用)""" if _notifier and _notifier._loop: import asyncio asyncio.run_coroutine_threadsafe( _notifier.send_screenshot(photo_path, caption), _notifier._loop ) # ==================== 进度更新接口 (供 run.py 使用) ==================== def progress_start(team_name: str, total: int, team_index: int = 0, teams_total: int = 0, include_owner: bool = False) -> Optional[ProgressTracker]: """开始进度跟踪 Args: team_name: Team 名称 total: 账号总数 team_index: 当前 Team 序号 (从 1 开始) teams_total: Team 总数 include_owner: 是否包含 Owner """ if _notifier: return _notifier.create_progress(team_name, total, team_index, teams_total, include_owner) return None def progress_update(account: str = None, phase: str = None, step: str = None, role: str = None): """更新当前进度 Args: account: 当前处理的账号 phase: 当前阶段 (注册/授权/验证) step: 当前步骤 role: 账号角色 """ if _notifier and _notifier.get_progress(): _notifier.get_progress().update(account=account, phase=phase, step=step, role=role) def progress_account_done(email: str, success: bool): """标记账号完成""" if _notifier and _notifier.get_progress(): _notifier.get_progress().account_done(email, success) def progress_finish(): """完成进度跟踪""" if _notifier: _notifier.clear_progress() # ==================== 预加载进度接口 (供 team_service.py 使用) ==================== def preload_progress_start(total: int) -> Optional[PreloadProgressTracker]: """开始预加载进度跟踪""" if _notifier: return _notifier.create_preload_progress(total) return None def preload_progress_update(team_name: str): """更新当前预加载的 Team""" if _notifier and _notifier.get_preload_progress(): _notifier.get_preload_progress().update(team_name=team_name) def preload_progress_done(team_name: str, success: bool): """标记一个 Team 预加载完成""" if _notifier and _notifier.get_preload_progress(): _notifier.get_preload_progress().item_done(team_name, success) def preload_progress_finish(stopped: bool = False): """完成预加载进度跟踪""" if _notifier: _notifier.clear_preload_progress(stopped=stopped) def notify_team_completed_sync(team_name: str, results: list): """同步方式通知单个 Team 完成 (供 run.py 使用)""" if _notifier and _notifier._loop: import asyncio asyncio.run_coroutine_threadsafe( _notifier.notify_team_completed(team_name, results), _notifier._loop )