482 lines
16 KiB
Python
482 lines
16 KiB
Python
# ==================== Telegram 通知模块 ====================
|
|
# 用于将日志和状态变更推送到 Telegram
|
|
|
|
import asyncio
import html
import time
from typing import Callable, List, Optional, Dict

from telegram import Bot, Message
from telegram.error import TelegramError

from config import (
    TELEGRAM_ADMIN_CHAT_IDS,
    TELEGRAM_NOTIFY_ON_COMPLETE,
    TELEGRAM_NOTIFY_ON_ERROR,
)
|
|
|
|
|
|
def make_progress_bar(current: int, total: int, width: int = 20) -> str:
    """Render a fixed-width text progress bar.

    Args:
        current: Number of finished items.
        total: Number of items overall.
        width: Bar width in characters.

    Returns:
        A string of ``width`` characters such as "▓▓▓▓░░░░░░". When
        ``total`` is zero or negative the bar is entirely empty.
    """
    if total <= 0:
        return "░" * width

    # Clamp on both sides: values above 1 would overfill the bar, and a
    # negative value would yield a negative fill count, which makes the
    # "empty" part longer than ``width``.
    fraction = max(0.0, min(current / total, 1.0))
    filled = int(width * fraction)
    return "▓" * filled + "░" * (width - filled)
|
|
|
|
|
|
class ProgressTracker:
    """Track per-team progress and mirror it into editable Telegram messages.

    ``start()`` sends one message per chat; later state changes edit those
    messages in place. ``update``, ``account_done`` and ``finish`` are
    synchronous and hand the Telegram calls over to the event loop given to
    ``start()`` via ``asyncio.run_coroutine_threadsafe``.
    """

    def __init__(self, bot: Bot, chat_ids: List[int], team_name: str, total: int,
                 team_index: int = 0, teams_total: int = 0, include_owner: bool = False):
        """Store the static team information and reset all counters.

        Args:
            bot: Telegram bot used to send and edit the progress messages.
            chat_ids: Chats that receive a progress message.
            team_name: Name shown in the message title.
            total: Number of accounts to process.
            team_index: 1-based index of the current team.
            teams_total: Number of teams; ``0`` hides the ``[i/n]`` counter.
            include_owner: Whether the total includes the owner account.
        """
        self.bot = bot
        self.chat_ids = chat_ids
        self.team_name = team_name
        self.total = total
        self.team_index = team_index
        self.teams_total = teams_total
        self.include_owner = include_owner
        self.current = 0
        self.success = 0
        self.failed = 0
        self.current_account = ""
        self.current_phase = ""  # phase label shown in brackets before the step
        self.current_step = ""
        self.current_role = ""  # "owner" adds a crown marker to the account line
        self.messages: Dict[int, Message] = {}  # chat_id -> sent progress message
        self._last_update = 0
        self._update_interval = 2  # minimum seconds between non-forced edits
        self._loop: Optional[asyncio.AbstractEventLoop] = None

    @staticmethod
    def _esc(value) -> str:
        """Escape ``<``, ``>`` and ``&`` so dynamic text is valid in HTML parse mode."""
        return html.escape(str(value), quote=False)

    def _get_progress_text(self) -> str:
        """Build the HTML text of the progress message from the current state."""
        bar = make_progress_bar(self.current, self.total, 20)
        team_name = self._esc(self.team_name)

        # The title carries the team counter only when the team count is known.
        if self.teams_total > 0:
            title = f"<b>◈ Team [{self.team_index}/{self.teams_total}]: {team_name}</b>"
        else:
            title = f"<b>◈ 正在处理: {team_name}</b>"

        owner_tag = " (含 Owner)" if self.include_owner else ""
        lines = [
            title,
            "",
            f"🔄 注册进度 {self.current}/{self.total}{owner_tag}",
            bar,
            "",
            f"✅ 成功: {self.success}",
            f"❌ 失败: {self.failed}",
        ]

        if self.current_account:
            lines.append("")
            role_tag = " 👑" if self.current_role == "owner" else ""
            lines.append(f"⏳ 正在处理: <code>{self._esc(self.current_account)}</code>{role_tag}")

        # The step line is independent of the account line so that the final
        # step set by finish() is still shown after the account was cleared.
        if self.current_phase and self.current_step:
            lines.append(f" ▸ [{self._esc(self.current_phase)}] {self._esc(self.current_step)}")
        elif self.current_step:
            lines.append(f" ▸ {self._esc(self.current_step)}")

        return "\n".join(lines)

    async def _send_initial_message(self):
        """Send the first progress message to every chat and remember it for edits."""
        text = self._get_progress_text()
        for chat_id in self.chat_ids:
            try:
                msg = await self.bot.send_message(
                    chat_id=chat_id,
                    text=text,
                    parse_mode="HTML",
                )
            except TelegramError as e:
                # Best-effort: one unreachable chat must not block the others.
                print(f"[ProgressTracker] 发送进度消息失败 (chat_id={chat_id}): {e}")
            else:
                self.messages[chat_id] = msg

    async def _update_messages(self):
        """Edit every known progress message so it shows the current state."""
        text = self._get_progress_text()
        # Iterate over a snapshot: _send_initial_message may still be adding
        # entries while this coroutine is suspended in an await, and mutating
        # a dict during iteration raises RuntimeError.
        for chat_id, msg in list(self.messages.items()):
            try:
                await self.bot.edit_message_text(
                    chat_id=chat_id,
                    message_id=msg.message_id,
                    text=text,
                    parse_mode="HTML",
                )
            except TelegramError:
                # Deliberately silent: a failed edit must never interrupt the
                # job whose progress is being displayed.
                pass

    def _schedule_update(self, force: bool = False):
        """Schedule a message refresh on the event loop, rate limited.

        Args:
            force: Refresh even if the minimum update interval has not elapsed.
        """
        now = time.time()
        if not force and now - self._last_update < self._update_interval:
            return

        self._last_update = now

        # Without a loop (start() not called yet) there is nothing to refresh.
        if self._loop:
            asyncio.run_coroutine_threadsafe(self._update_messages(), self._loop)

    def start(self, loop: asyncio.AbstractEventLoop):
        """Remember ``loop`` and schedule the initial progress message on it."""
        self._loop = loop
        asyncio.run_coroutine_threadsafe(self._send_initial_message(), loop)

    def update(self, current: Optional[int] = None, account: Optional[str] = None,
               phase: Optional[str] = None, step: Optional[str] = None,
               role: Optional[str] = None):
        """Update the displayed state; callable from synchronous code.

        Every argument left as ``None`` keeps its previous value.

        Args:
            current: Number of processed accounts.
            account: Account currently being processed.
            phase: Phase label of the current account.
            step: Description of the current step.
            role: Role of the current account ("owner" gets a marker).
        """
        if current is not None:
            self.current = current
        if account is not None:
            self.current_account = account
        if phase is not None:
            self.current_phase = phase
        if step is not None:
            self.current_step = step
        if role is not None:
            self.current_role = role

        self._schedule_update()

    def account_done(self, email: str, success: bool):
        """Count one finished account and clear the per-account display fields.

        Args:
            email: The finished account (not used for the counters).
            success: Whether the account was processed successfully.
        """
        self.current += 1
        if success:
            self.success += 1
        else:
            self.failed += 1
        self.current_account = ""
        self.current_phase = ""
        self.current_step = ""
        self.current_role = ""

        # Force the refresh for the last account so the bar reaches 100%
        # even when the rate limit would otherwise drop the update.
        is_last = self.current >= self.total
        self._schedule_update(force=is_last)

    def finish(self):
        """Mark the run as finished and force a final refresh."""
        self.current_step = "已完成!"
        self._schedule_update(force=True)
|
|
|
|
|
|
class BotNotifier:
    """Send notifications to a fixed list of Telegram chats.

    Messages are either sent directly (``notify`` and the ``notify_*``
    helpers) or queued with ``queue_message`` for the background worker that
    ``start()`` creates. The notifier also owns the currently active
    ``ProgressTracker``.
    """

    def __init__(self, bot: Bot, chat_ids: Optional[List[int]] = None):
        """Create the notifier; ``start()`` must be awaited before queueing.

        Args:
            bot: Telegram bot used for every send.
            chat_ids: Target chats. Falls back to ``TELEGRAM_ADMIN_CHAT_IDS``
                when omitted or empty.
        """
        self.bot = bot
        self.chat_ids = chat_ids or TELEGRAM_ADMIN_CHAT_IDS
        self._message_queue: Optional[asyncio.Queue] = None
        self._worker_task: Optional[asyncio.Task] = None
        self._current_progress: Optional[ProgressTracker] = None
        self._loop: Optional[asyncio.AbstractEventLoop] = None

    @staticmethod
    def _esc(value) -> str:
        """Escape ``<``, ``>`` and ``&`` so dynamic text is valid in HTML parse mode."""
        return html.escape(str(value), quote=False)

    @staticmethod
    def _put_nowait(queue: asyncio.Queue, item) -> None:
        """Put ``item`` on ``queue``, dropping it if the queue is full."""
        try:
            queue.put_nowait(item)
        except asyncio.QueueFull:
            pass

    async def start(self):
        """Create the message queue and start the background sender task."""
        # Must run inside the loop that will service the queue; this loop is
        # also the one synchronous helpers schedule their coroutines on.
        self._loop = asyncio.get_running_loop()
        self._message_queue = asyncio.Queue()
        self._worker_task = asyncio.create_task(self._message_worker())

    async def stop(self):
        """Cancel the background sender task and wait for it to exit."""
        if self._worker_task:
            self._worker_task.cancel()
            try:
                await self._worker_task
            except asyncio.CancelledError:
                pass

    async def _message_worker(self):
        """Send queued messages one by one until the task is cancelled."""
        while True:
            try:
                item = await self._message_queue.get()
            except asyncio.CancelledError:
                break

            try:
                message, _level = item
                await self._send_to_all(message)
            except asyncio.CancelledError:
                break
            except Exception as e:
                # One bad message must not kill the worker, but it should
                # not vanish without a trace either.
                print(f"[BotNotifier] 队列消息处理失败: {e}")
            finally:
                # Always balance get() so Queue.join() can never hang.
                self._message_queue.task_done()

    async def _send_to_all(self, message: str, parse_mode: str = "HTML"):
        """Send ``message`` to every configured chat, logging per-chat failures."""
        for chat_id in self.chat_ids:
            try:
                await self.bot.send_message(
                    chat_id=chat_id,
                    text=message,
                    parse_mode=parse_mode,
                )
            except TelegramError as e:
                print(f"[BotNotifier] 发送消息失败 (chat_id={chat_id}): {e}")

    async def _send_photo_to_all(self, photo_path: str, caption: str = ""):
        """Send the image at ``photo_path`` to every configured chat.

        The file is reopened for each chat so every upload reads it from the
        start. A missing file ends the whole operation, because it would be
        missing for every other chat as well.
        """
        for chat_id in self.chat_ids:
            try:
                with open(photo_path, 'rb') as photo:
                    await self.bot.send_photo(
                        chat_id=chat_id,
                        photo=photo,
                        caption=caption,
                        parse_mode="HTML",
                    )
            except TelegramError as e:
                print(f"[BotNotifier] 发送图片失败 (chat_id={chat_id}): {e}")
            except FileNotFoundError:
                print(f"[BotNotifier] 图片文件不存在: {photo_path}")
                return

    async def send_screenshot(self, photo_path: str, caption: str = ""):
        """Send a debug screenshot to every configured chat."""
        await self._send_photo_to_all(photo_path, caption)

    def queue_message(self, message: str, level: str = "info"):
        """Queue ``message`` for the background worker without blocking.

        Safe to call from the event loop thread and from other threads. The
        call is a no-op until ``start()`` has created the queue.

        Args:
            message: HTML text to send.
            level: Stored with the message; the worker does not use it.
        """
        queue = self._message_queue
        if queue is None:
            return

        item = (message, level)
        try:
            running = asyncio.get_running_loop()
        except RuntimeError:
            running = None

        loop = self._loop
        if loop is None or running is loop:
            self._put_nowait(queue, item)
            return

        # asyncio queues are not thread-safe: from a foreign thread the put
        # has to be executed by the loop that owns the queue.
        try:
            loop.call_soon_threadsafe(self._put_nowait, queue, item)
        except RuntimeError:
            # The loop is already closed; nothing could send the message.
            pass

    async def notify(self, message: str, level: str = "info"):
        """Send ``message`` right away and wait until every chat was tried."""
        await self._send_to_all(message)

    def create_progress(self, team_name: str, total: int, team_index: int = 0,
                        teams_total: int = 0, include_owner: bool = False) -> ProgressTracker:
        """Create a progress tracker and make it the current one.

        The tracker is started only if ``start()`` has already recorded the
        event loop.

        Args:
            team_name: Team name.
            total: Number of accounts.
            team_index: 1-based index of the current team.
            teams_total: Number of teams.
            include_owner: Whether the owner account is included.

        Returns:
            The new tracker.
        """
        self._current_progress = ProgressTracker(
            self.bot, self.chat_ids, team_name, total,
            team_index=team_index, teams_total=teams_total, include_owner=include_owner,
        )
        if self._loop:
            self._current_progress.start(self._loop)
        return self._current_progress

    def get_progress(self) -> Optional[ProgressTracker]:
        """Return the current progress tracker, or ``None`` if there is none."""
        return self._current_progress

    def clear_progress(self):
        """Finish the current progress tracker, if any, and forget it."""
        if self._current_progress:
            self._current_progress.finish()
            self._current_progress = None

    async def notify_task_started(self, team_name: str):
        """Announce that processing of ``team_name`` has started."""
        await self.notify(f"<b>🚀 任务开始</b>\nTeam: {self._esc(team_name)}")

    async def notify_task_completed(self, team_name: str, success_accounts: list, failed_accounts: list):
        """Announce a finished task; no-op unless ``TELEGRAM_NOTIFY_ON_COMPLETE`` is set.

        Args:
            team_name: Team name.
            success_accounts: Accounts that succeeded; listed in the message.
            failed_accounts: Accounts that failed; only their count is shown.
        """
        if not TELEGRAM_NOTIFY_ON_COMPLETE:
            return

        failed_count = len(failed_accounts)
        status = "全部成功" if failed_count == 0 else f"{failed_count} 个失败"

        parts = [
            "<b>✅ 任务完成</b>",
            f"Team: {self._esc(team_name)}",
            f"成功: {len(success_accounts)}",
            f"状态: {status}",
        ]
        if success_accounts:
            parts.append("")
            parts.append("<b>成功账号:</b>")
            parts.extend(f"• <code>{self._esc(email)}</code>" for email in success_accounts)

        await self.notify("\n".join(parts))

    async def notify_team_completed(self, team_name: str, results: list):
        """Announce the result of one team; no-op when ``results`` is empty.

        Args:
            team_name: Team name.
            results: One dict per account with at least "email" and "status";
                any status other than "success" counts as a failure.
        """
        if not results:
            return

        success_accounts = [r.get("email") for r in results if r.get("status") == "success"]
        failed_accounts = [r.get("email") for r in results if r.get("status") != "success"]
        success_count = len(success_accounts)
        failed_count = len(failed_accounts)

        if failed_count == 0:
            icon, status = "✅", "全部成功"
        elif success_count == 0:
            icon, status = "❌", "全部失败"
        else:
            icon, status = "⚠️", f"{failed_count} 个失败"

        parts = [
            f"<b>{icon} Team 处理完成</b>",
            f"Team: <code>{self._esc(team_name)}</code>",
            f"结果: {success_count}/{len(results)} 成功",
            f"状态: {status}",
        ]
        for heading, emails in (("<b>✓ 成功:</b>", success_accounts),
                                ("<b>✗ 失败:</b>", failed_accounts)):
            if emails:
                parts.append("")
                parts.append(heading)
                parts.extend(f" <code>{self._esc(email)}</code>" for email in emails)

        await self.notify("\n".join(parts))

    async def notify_error(self, message: str, details: str = ""):
        """Report an error; no-op unless ``TELEGRAM_NOTIFY_ON_ERROR`` is set.

        Args:
            message: Error summary, inserted as given.
            details: Extra text such as a traceback; truncated to 500
                characters and escaped, since tracebacks routinely contain
                ``<`` and ``>``.
        """
        if not TELEGRAM_NOTIFY_ON_ERROR:
            return
        text = f"<b>❌ 错误</b>\n{message}"
        if details:
            # Truncate before escaping so an entity is never cut in half.
            text += f"\n<code>{self._esc(details[:500])}</code>"
        await self.notify(text)

    async def notify_account_status(self, email: str, status: str, team_name: str = ""):
        """Queue a short status line for one account.

        Args:
            email: The account.
            status: Status key; unknown keys are shown upper-cased.
            team_name: Optional team name shown above the status line.
        """
        known = {
            "completed": "OK",
            "authorized": "AUTH",
            "registered": "REG",
            "failed": "FAIL",
        }
        icon = known[status] if status in known else self._esc(status.upper())

        text = f"[{icon}] {self._esc(email)}"
        if team_name:
            text = f"<b>{self._esc(team_name)}</b>\n{text}"
        self.queue_message(text, "info")
|
|
|
|
|
|
# 全局通知器实例 (在 telegram_bot.py 中初始化)
|
|
# Module-wide notifier instance; stays None until set_notifier() installs one.
_notifier: Optional[BotNotifier] = None
|
|
|
|
|
|
def set_notifier(notifier: BotNotifier):
    """Install ``notifier`` as the module-wide notifier used by the sync helpers."""
    global _notifier
    _notifier = notifier
|
|
|
|
|
|
def get_notifier() -> Optional[BotNotifier]:
    """Return the module-wide notifier, or ``None`` if none has been installed."""
    return _notifier
|
|
|
|
|
|
def notify_sync(message: str, level: str = "info"):
    """Queue a notification from synchronous code.

    Does nothing when no notifier has been installed.
    """
    if not _notifier:
        return
    _notifier.queue_message(message, level)
|
|
|
|
|
|
def send_screenshot_sync(photo_path: str, caption: str = ""):
    """Send a screenshot from synchronous code.

    The upload is scheduled on the notifier's event loop and not awaited.
    Does nothing when no notifier is installed or it has no loop yet.
    """
    notifier = _notifier
    if not (notifier and notifier._loop):
        return
    pending = notifier.send_screenshot(photo_path, caption)
    asyncio.run_coroutine_threadsafe(pending, notifier._loop)
|
|
|
|
|
|
# ==================== 进度更新接口 (供 run.py 使用) ====================
|
|
|
|
def progress_start(team_name: str, total: int, team_index: int = 0,
                   teams_total: int = 0, include_owner: bool = False) -> Optional[ProgressTracker]:
    """Begin progress tracking for one team.

    Args:
        team_name: Team name.
        total: Number of accounts.
        team_index: 1-based index of the current team.
        teams_total: Number of teams.
        include_owner: Whether the owner account is included.

    Returns:
        The tracker created by the notifier, or ``None`` when no notifier is
        installed.
    """
    if not _notifier:
        return None
    return _notifier.create_progress(team_name, total, team_index, teams_total, include_owner)
|
|
|
|
|
|
def progress_update(account: Optional[str] = None, phase: Optional[str] = None,
                    step: Optional[str] = None, role: Optional[str] = None):
    """Update the current progress display.

    Does nothing when no notifier is installed or no tracker is active.
    Arguments left as ``None`` keep their previous value.

    Args:
        account: Account currently being processed.
        phase: Phase label of the current account.
        step: Description of the current step.
        role: Role of the current account.
    """
    # Look the tracker up once: it can be cleared between two lookups, and
    # the second one would then hand back None.
    tracker = _notifier.get_progress() if _notifier else None
    if tracker:
        tracker.update(account=account, phase=phase, step=step, role=role)
|
|
|
|
|
|
def progress_account_done(email: str, success: bool):
    """Mark one account as finished on the current progress tracker.

    Does nothing when no notifier is installed or no tracker is active.

    Args:
        email: The finished account.
        success: Whether the account was processed successfully.
    """
    # Look the tracker up once: it can be cleared between two lookups, and
    # the second one would then hand back None.
    tracker = _notifier.get_progress() if _notifier else None
    if tracker:
        tracker.account_done(email, success)
|
|
|
|
|
|
def progress_finish():
    """Finish progress tracking by clearing the notifier's current tracker.

    Does nothing when no notifier has been installed.
    """
    if not _notifier:
        return
    _notifier.clear_progress()
|
|
|
|
|
|
def notify_team_completed_sync(team_name: str, results: list):
    """Report a finished team from synchronous code.

    The notification is scheduled on the notifier's event loop and not
    awaited. Does nothing when no notifier is installed or it has no loop yet.
    """
    notifier = _notifier
    if not (notifier and notifier._loop):
        return
    pending = notifier.notify_team_completed(team_name, results)
    asyncio.run_coroutine_threadsafe(pending, notifier._loop)
|