重构机器人

This commit is contained in:
dela
2026-01-30 10:48:56 +08:00
parent 81577a3a59
commit 9b7ecb7b80
17 changed files with 1270 additions and 1 deletions

7
bot/services/__init__.py Normal file
View File

@@ -0,0 +1,7 @@
"""
Bot 服务模块
"""
from bot.services.task_manager import TaskManager, Task, TaskStatus
__all__ = ["TaskManager", "Task", "TaskStatus"]

101
bot/services/file_sender.py Normal file
View File

@@ -0,0 +1,101 @@
"""
JSON 文件生成与发送服务
"""
import json
import os
import tempfile
from datetime import datetime
from typing import Any
from telegram import Update
from telegram.ext import ContextTypes
async def send_results_as_json(
update: Update,
context: ContextTypes.DEFAULT_TYPE,
task_id: str,
plan: str,
results: list[dict[str, Any]],
) -> None:
"""
将结果生成为 JSON 文件并发送给用户
参数:
update: Telegram Update 对象
context: Telegram Context 对象
task_id: 任务 ID
plan: 订阅计划名称
results: 结果列表
"""
# 统计
success_count = sum(1 for r in results if r.get("status") == "success")
failed_count = len(results) - success_count
# 构建 JSON 数据
now = datetime.now()
data = {
"task_id": task_id,
"plan": plan,
"created_at": now.strftime("%Y-%m-%d %H:%M:%S"),
"results": [
{
"email": r.get("email", ""),
"password": r.get("password", ""),
"url": r.get("checkout_url", ""),
"checkout_session_id": r.get("checkout_session_id", ""),
"status": r.get("status", "failed"),
}
for r in results
],
"summary": {
"total": len(results),
"success": success_count,
"failed": failed_count,
},
}
# 生成文件名
timestamp = now.strftime("%Y%m%d_%H%M%S")
filename = f"checkout_{timestamp}.json"
# 写入临时文件并发送
with tempfile.NamedTemporaryFile(
mode="w",
suffix=".json",
prefix="checkout_",
delete=False,
encoding="utf-8",
) as f:
json.dump(data, f, indent=2, ensure_ascii=False)
temp_path = f.name
try:
# 发送消息(不含敏感信息)
message_text = (
f"✅ **任务完成**\n\n"
f"📦 Plan: {plan}\n"
f"✅ 成功: {success_count}\n"
f"❌ 失败: {failed_count}\n"
)
await context.bot.send_message(
chat_id=update.effective_chat.id,
text=message_text,
parse_mode="Markdown",
)
# 发送 JSON 文件
with open(temp_path, "rb") as f:
await context.bot.send_document(
chat_id=update.effective_chat.id,
document=f,
filename=filename,
caption=f"📎 任务 `{task_id}` 结果",
parse_mode="Markdown",
)
finally:
# 清理临时文件
if os.path.exists(temp_path):
os.remove(temp_path)

View File

@@ -0,0 +1,228 @@
"""
后台任务管理器
管理异步执行的注册、登录等任务的状态和进度
"""
import uuid
from datetime import datetime
from enum import Enum
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, field
class TaskStatus(Enum):
"""任务状态枚举"""
PENDING = "pending" # 等待执行
RUNNING = "running" # 执行中
COMPLETED = "completed" # 已完成
FAILED = "failed" # 失败
PARTIAL = "partial" # 部分成功
@dataclass
class Task:
"""任务数据类"""
task_id: str
user_id: int
task_type: str # register, login, checkout
status: TaskStatus
description: str
total: int = 0 # 总任务数
completed: int = 0 # 已完成数
current_item: str = "" # 当前处理项
result: Optional[Any] = None
error: Optional[str] = None
created_at: datetime = field(default_factory=datetime.now)
updated_at: datetime = field(default_factory=datetime.now)
class TaskManager:
"""
任务管理器
管理后台任务的创建、状态更新和查询
"""
def __init__(self, max_history: int = 100):
"""
初始化任务管理器
参数:
max_history: 保留的历史任务数量
"""
self._tasks: Dict[str, Task] = {}
self._max_history = max_history
def create_task(
self,
user_id: int,
task_type: str,
total: int = 1,
description: str = ""
) -> str:
"""
创建新任务
参数:
user_id: 用户 ID
task_type: 任务类型 (register, login, checkout)
total: 总任务数
description: 任务描述
返回:
任务 ID
"""
task_id = self._generate_task_id()
task = Task(
task_id=task_id,
user_id=user_id,
task_type=task_type,
status=TaskStatus.RUNNING,
description=description or f"{task_type} task",
total=total,
)
self._tasks[task_id] = task
self._cleanup_old_tasks()
return task_id
def update_progress(
self,
task_id: str,
completed: Optional[int] = None,
current_item: Optional[str] = None
) -> bool:
"""
更新任务进度
参数:
task_id: 任务 ID
completed: 已完成数量
current_item: 当前处理项描述
返回:
是否更新成功
"""
task = self._tasks.get(task_id)
if not task:
return False
if completed is not None:
task.completed = completed
if current_item is not None:
task.current_item = current_item
task.updated_at = datetime.now()
return True
def complete_task(
self,
task_id: str,
status: TaskStatus = TaskStatus.COMPLETED,
result: Any = None,
error: str = None
) -> bool:
"""
完成任务
参数:
task_id: 任务 ID
status: 最终状态
result: 任务结果
error: 错误信息(如果失败)
返回:
是否更新成功
"""
task = self._tasks.get(task_id)
if not task:
return False
task.status = status
task.result = result
task.error = error
task.completed = task.total
task.current_item = ""
task.updated_at = datetime.now()
return True
def get_task(self, task_id: str) -> Optional[Task]:
"""
获取任务
参数:
task_id: 任务 ID
返回:
Task 对象,如果不存在返回 None
"""
return self._tasks.get(task_id)
def get_user_tasks(self, user_id: int, limit: int = 10) -> List[Task]:
"""
获取用户的任务列表
参数:
user_id: 用户 ID
limit: 返回数量限制
返回:
任务列表(按创建时间倒序)
"""
user_tasks = [
task for task in self._tasks.values()
if task.user_id == user_id
]
user_tasks.sort(key=lambda t: t.created_at, reverse=True)
return user_tasks[:limit]
def get_all_tasks(self, limit: int = 20) -> List[Task]:
"""
获取所有任务(管理员用)
参数:
limit: 返回数量限制
返回:
任务列表(按创建时间倒序)
"""
all_tasks = list(self._tasks.values())
all_tasks.sort(key=lambda t: t.created_at, reverse=True)
return all_tasks[:limit]
def get_running_tasks(self) -> List[Task]:
"""
获取正在运行的任务
返回:
运行中的任务列表
"""
return [
task for task in self._tasks.values()
if task.status in [TaskStatus.PENDING, TaskStatus.RUNNING]
]
def _generate_task_id(self) -> str:
"""生成唯一任务 ID"""
return uuid.uuid4().hex[:8]
def _cleanup_old_tasks(self):
"""清理旧任务,保持历史记录在限制内"""
if len(self._tasks) <= self._max_history:
return
# 获取已完成的任务,按时间排序
completed_tasks = [
(task_id, task) for task_id, task in self._tasks.items()
if task.status not in [TaskStatus.PENDING, TaskStatus.RUNNING]
]
completed_tasks.sort(key=lambda x: x[1].created_at)
# 删除最旧的任务
to_remove = len(self._tasks) - self._max_history
for task_id, _ in completed_tasks[:to_remove]:
del self._tasks[task_id]