# ==================== Telegram notification module ====================
# Pushes log messages and status changes to Telegram.

import asyncio
import html
import time
from typing import Callable, List, Optional, Dict

from telegram import Bot, Message
from telegram.error import TelegramError

from config import (
    TELEGRAM_ADMIN_CHAT_IDS,
    TELEGRAM_NOTIFY_ON_COMPLETE,
    TELEGRAM_NOTIFY_ON_ERROR,
)


def _escape_html(value: object) -> str:
    """Return *value* as text that is safe to embed in an HTML-mode message.

    Every message in this module is sent with ``parse_mode="HTML"``, so
    ``<``, ``>`` and ``&`` in plain data (team names, e-mails, error details)
    must be turned into entities before interpolation.
    """
    return html.escape(str(value), quote=False)


def make_progress_bar(current: int, total: int, width: int = 10) -> str:
    """Render a textual progress bar.

    Args:
        current: Amount of work done so far.
        total: Total amount of work.
        width: Bar width in characters.

    Returns:
        The bar followed by a percentage, e.g. ``"████████░░ 80%"``.
        When ``total <= 0`` an empty bar with ``0%`` is returned.
    """
    if total <= 0:
        return "░" * width + " 0%"

    # Clamp to [0, 1]: values above 1 would overfill the bar, and a negative
    # ``current`` would make ``width - filled`` exceed ``width``.
    percent = max(0.0, min(current / total, 1.0))
    filled = int(width * percent)
    bar = "█" * filled + "░" * (width - filled)
    return f"{bar} {int(percent * 100)}%"


class ProgressTracker:
    """Keeps one Telegram message per chat up to date with task progress.

    The mutating methods (``update``, ``account_done``, ``finish``) are plain
    synchronous functions; they hand the actual Telegram calls to the event
    loop given to ``start`` via ``asyncio.run_coroutine_threadsafe``.
    """

    def __init__(self, bot: Bot, chat_ids: List[int], team_name: str, total: int):
        """Store the target chats and reset all counters.

        Args:
            bot: Telegram bot used to send and edit messages.
            chat_ids: Chats that receive the progress message.
            team_name: Name shown in the message header.
            total: Total number of accounts to process.
        """
        self.bot = bot
        self.chat_ids = chat_ids
        self.team_name = team_name
        self.total = total
        self.current = 0
        self.success = 0
        self.failed = 0
        self.current_account = ""
        self.current_step = ""
        self.messages: Dict[int, Message] = {}  # chat_id -> sent progress message
        # Monotonic timestamp of the last scheduled edit; -inf so that the
        # very first update is never throttled.
        self._last_update: float = float("-inf")
        self._update_interval = 2  # minimum seconds between two edits
        self._loop: Optional[asyncio.AbstractEventLoop] = None

    def _get_progress_text(self) -> str:
        """Build the progress message text from the current state."""
        bar = make_progress_bar(self.current, self.total, 12)
        lines = [
            f"正在处理: {_escape_html(self.team_name)}",
            "",
            f"进度: {bar}",
            f"账号: {self.current}/{self.total}",
            f"成功: {self.success} | 失败: {self.failed}",
        ]
        if self.current_account:
            lines.append("")
            lines.append(f"当前: {_escape_html(self.current_account)}")
        if self.current_step:
            lines.append(f"步骤: {_escape_html(self.current_step)}")
        return "\n".join(lines)

    async def _send_initial_message(self):
        """Send the first progress message to every chat and remember it."""
        text = self._get_progress_text()
        for chat_id in self.chat_ids:
            try:
                msg = await self.bot.send_message(
                    chat_id=chat_id,
                    text=text,
                    parse_mode="HTML",
                )
            except TelegramError:
                # Best effort: a chat that cannot be reached simply gets no
                # progress message.
                continue
            self.messages[chat_id] = msg

    async def _update_messages(self):
        """Edit every previously sent progress message with the current text."""
        text = self._get_progress_text()
        # Iterate over a snapshot: ``_send_initial_message`` may still be
        # inserting entries while this coroutine is suspended in ``await``,
        # and mutating a dict during iteration raises RuntimeError.
        for chat_id, msg in list(self.messages.items()):
            try:
                await self.bot.edit_message_text(
                    chat_id=chat_id,
                    message_id=msg.message_id,
                    text=text,
                    parse_mode="HTML",
                )
            except TelegramError:
                # Best effort: a failed edit is ignored.
                pass

    def _schedule_update(self):
        """Schedule a message edit unless one was scheduled too recently."""
        # A monotonic clock keeps the throttle correct across wall-clock jumps.
        now = time.monotonic()
        if now - self._last_update < self._update_interval:
            return
        self._last_update = now
        if self._loop:
            asyncio.run_coroutine_threadsafe(self._update_messages(), self._loop)

    def start(self, loop: asyncio.AbstractEventLoop):
        """Remember *loop* and schedule the initial progress message on it."""
        self._loop = loop
        asyncio.run_coroutine_threadsafe(self._send_initial_message(), loop)

    def update(
        self,
        current: Optional[int] = None,
        account: Optional[str] = None,
        step: Optional[str] = None,
    ):
        """Update progress fields (callable from synchronous code).

        Only arguments that are not ``None`` overwrite the stored value.
        The Telegram edit itself is rate limited by ``_schedule_update``.
        """
        if current is not None:
            self.current = current
        if account is not None:
            self.current_account = account
        if step is not None:
            self.current_step = step
        self._schedule_update()

    def account_done(self, email: str, success: bool):
        """Count one finished account and clear the current account/step.

        Args:
            email: Address of the finished account (currently not used).
            success: Whether the account was processed successfully.
        """
        self.current += 1
        if success:
            self.success += 1
        else:
            self.failed += 1
        self.current_account = ""
        self.current_step = ""
        self._schedule_update()

    def finish(self):
        """Mark the run as finished and push the final state.

        The final edit bypasses the rate limit so it is never dropped.
        """
        self.current_step = "已完成!"
        if self._loop:
            asyncio.run_coroutine_threadsafe(self._update_messages(), self._loop)


class BotNotifier:
    """Sends notifications to the configured Telegram chats."""

    def __init__(self, bot: Bot, chat_ids: Optional[List[int]] = None):
        """Create a notifier.

        Args:
            bot: Telegram bot used for sending.
            chat_ids: Target chats; falls back to ``TELEGRAM_ADMIN_CHAT_IDS``
                when omitted or empty.
        """
        self.bot = bot
        self.chat_ids = chat_ids or TELEGRAM_ADMIN_CHAT_IDS
        self._message_queue: Optional[asyncio.Queue] = None
        self._worker_task: Optional[asyncio.Task] = None
        self._current_progress: Optional[ProgressTracker] = None
        self._loop: Optional[asyncio.AbstractEventLoop] = None

    async def start(self):
        """Create the message queue and start the background sender task."""
        # We are inside a coroutine, so the running loop is the one the queue
        # and the worker belong to.
        self._loop = asyncio.get_running_loop()
        self._message_queue = asyncio.Queue()
        self._worker_task = asyncio.create_task(self._message_worker())

    async def stop(self):
        """Cancel the background sender task and wait for it to end."""
        if self._worker_task:
            self._worker_task.cancel()
            try:
                await self._worker_task
            except asyncio.CancelledError:
                pass

    async def _message_worker(self):
        """Background task: send queued messages one by one until cancelled."""
        while True:
            try:
                message, _level = await self._message_queue.get()
            except asyncio.CancelledError:
                break
            try:
                await self._send_to_all(message)
            except asyncio.CancelledError:
                break
            except Exception:
                # Best effort: one failed message must not stop the worker.
                # NOTE(review): failures are dropped silently; if logging is
                # added here, make sure it cannot be routed back into this
                # notifier.
                pass
            finally:
                # Always balance ``get()`` so ``Queue.join()`` cannot hang.
                self._message_queue.task_done()

    async def _send_to_all(self, message: str, parse_mode: str = "HTML"):
        """Send *message* to every chat in ``chat_ids``; failures are ignored."""
        for chat_id in self.chat_ids:
            try:
                await self.bot.send_message(
                    chat_id=chat_id,
                    text=message,
                    parse_mode=parse_mode,
                )
            except TelegramError:
                pass

    async def _send_photo_to_all(self, photo_path: str, caption: str = ""):
        """Send the image at *photo_path* to every chat in ``chat_ids``.

        Telegram errors and a missing file are ignored.
        """
        for chat_id in self.chat_ids:
            try:
                # Re-open per chat so every upload reads from the start.
                with open(photo_path, "rb") as photo:
                    await self.bot.send_photo(
                        chat_id=chat_id,
                        photo=photo,
                        caption=caption,
                        parse_mode="HTML",
                    )
            except TelegramError:
                pass
            except FileNotFoundError:
                pass

    async def send_screenshot(self, photo_path: str, caption: str = ""):
        """Send a debug screenshot to all chats."""
        await self._send_photo_to_all(photo_path, caption)

    def _enqueue(self, item: tuple):
        """Put *item* on the queue; must run on the event-loop thread."""
        try:
            self._message_queue.put_nowait(item)
        except asyncio.QueueFull:
            pass

    def queue_message(self, message: str, level: str = "info"):
        """Queue *message* for sending without blocking.

        Does nothing until ``start`` has created the queue. May be called
        from the event-loop thread or from any other thread.
        """
        if self._message_queue is None:
            return
        item = (message, level)
        loop = self._loop
        try:
            on_loop_thread = loop is None or asyncio.get_running_loop() is loop
        except RuntimeError:
            # No running loop in this thread: we are in synchronous code.
            on_loop_thread = False

        if on_loop_thread:
            self._enqueue(item)
            return
        # asyncio.Queue is not thread-safe; hand the put over to the loop.
        try:
            loop.call_soon_threadsafe(self._enqueue, item)
        except RuntimeError:
            # The loop is already closed; drop the message (best effort).
            pass

    async def notify(self, message: str, level: str = "info"):
        """Send *message* to all chats right away, awaiting completion.

        *message* is sent as-is in HTML parse mode; *level* is accepted but
        not used here.
        """
        await self._send_to_all(message)

    def create_progress(self, team_name: str, total: int) -> ProgressTracker:
        """Create a new progress tracker and make it the current one.

        The tracker is started immediately when the notifier already knows
        its event loop.
        """
        self._current_progress = ProgressTracker(
            self.bot, self.chat_ids, team_name, total
        )
        if self._loop:
            self._current_progress.start(self._loop)
        return self._current_progress

    def get_progress(self) -> Optional[ProgressTracker]:
        """Return the current progress tracker, if any."""
        return self._current_progress

    def clear_progress(self):
        """Finish and drop the current progress tracker, if any."""
        if self._current_progress:
            self._current_progress.finish()
            self._current_progress = None

    async def notify_task_started(self, team_name: str):
        """Announce that processing of *team_name* has started."""
        await self.notify(f"🚀 任务开始\nTeam: {_escape_html(team_name)}")

    async def notify_task_completed(
        self, team_name: str, success_accounts: list, failed_accounts: list
    ):
        """Announce that a task has completed.

        Nothing is sent when ``TELEGRAM_NOTIFY_ON_COMPLETE`` is falsy.

        Args:
            team_name: Team name.
            success_accounts: Accounts that succeeded; each one is listed.
            failed_accounts: Accounts that failed; only their count is shown.
        """
        if not TELEGRAM_NOTIFY_ON_COMPLETE:
            return

        success_count = len(success_accounts)
        failed_count = len(failed_accounts)
        status = "全部成功" if failed_count == 0 else f"{failed_count} 个失败"

        message = (
            f"✅ 任务完成\n"
            f"Team: {_escape_html(team_name)}\n"
            f"成功: {success_count}\n"
            f"状态: {status}"
        )

        # List every successful account on its own bullet line.
        if success_accounts:
            message += "\n\n成功账号:"
            message += "".join(
                f"\n• {_escape_html(email)}" for email in success_accounts
            )

        await self.notify(message)

    async def notify_error(self, message: str, details: str = ""):
        """Send an error notification.

        Nothing is sent when ``TELEGRAM_NOTIFY_ON_ERROR`` is falsy.

        Args:
            message: Error summary, sent as-is.
            details: Optional extra text; truncated to 500 characters and
                HTML-escaped, since it is treated as plain text.
        """
        if not TELEGRAM_NOTIFY_ON_ERROR:
            return

        text = f"❌ 错误\n{message}"
        if details:
            # Truncate first so an entity is never cut in half.
            text += f"\n{_escape_html(details[:500])}"
        await self.notify(text)

    async def notify_account_status(self, email: str, status: str, team_name: str = ""):
        """Queue a short status line for one account.

        Known statuses map to a short tag; any other status is upper-cased.
        """
        icon = {
            "completed": "OK",
            "authorized": "AUTH",
            "registered": "REG",
            "failed": "FAIL",
        }.get(status, status.upper())

        text = f"[{_escape_html(icon)}] {_escape_html(email)}"
        if team_name:
            text = f"{_escape_html(team_name)}\n{text}"

        self.queue_message(text, "info")


# Global notifier instance (initialised in telegram_bot.py)
_notifier: Optional[BotNotifier] = None


def set_notifier(notifier: BotNotifier):
    """Install *notifier* as the global notifier."""
    global _notifier
    _notifier = notifier


def get_notifier() -> Optional[BotNotifier]:
    """Return the global notifier, or ``None`` when none is installed."""
    return _notifier


def notify_sync(message: str, level: str = "info"):
    """Queue a notification from synchronous code; no-op without a notifier."""
    if _notifier:
        _notifier.queue_message(message, level)


def send_screenshot_sync(photo_path: str, caption: str = ""):
    """Schedule sending a screenshot from synchronous code.

    Does nothing unless a notifier with a known event loop is installed.
    """
    if _notifier and _notifier._loop:
        asyncio.run_coroutine_threadsafe(
            _notifier.send_screenshot(photo_path, caption),
            _notifier._loop,
        )


# ==================== Progress API (used by run.py) ====================

def progress_start(team_name: str, total: int) -> Optional[ProgressTracker]:
    """Start progress tracking; returns ``None`` without a notifier."""
    if _notifier:
        return _notifier.create_progress(team_name, total)
    return None


def progress_update(account: Optional[str] = None, step: Optional[str] = None):
    """Update the account/step shown by the current progress tracker."""
    # Read the tracker once so it cannot change between check and use.
    tracker = _notifier.get_progress() if _notifier else None
    if tracker:
        tracker.update(account=account, step=step)


def progress_account_done(email: str, success: bool):
    """Record one finished account on the current progress tracker."""
    tracker = _notifier.get_progress() if _notifier else None
    if tracker:
        tracker.account_done(email, success)


def progress_finish():
    """Finish and clear the current progress tracker."""
    if _notifier:
        _notifier.clear_progress()