Files
autoPlus/bot/services/task_manager.py
2026-01-30 10:48:56 +08:00

229 lines
5.7 KiB
Python

"""
后台任务管理器
管理异步执行的注册、登录等任务的状态和进度
"""
import uuid
from datetime import datetime
from enum import Enum
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, field
class TaskStatus(Enum):
"""任务状态枚举"""
PENDING = "pending" # 等待执行
RUNNING = "running" # 执行中
COMPLETED = "completed" # 已完成
FAILED = "failed" # 失败
PARTIAL = "partial" # 部分成功
@dataclass
class Task:
"""任务数据类"""
task_id: str
user_id: int
task_type: str # register, login, checkout
status: TaskStatus
description: str
total: int = 0 # 总任务数
completed: int = 0 # 已完成数
current_item: str = "" # 当前处理项
result: Optional[Any] = None
error: Optional[str] = None
created_at: datetime = field(default_factory=datetime.now)
updated_at: datetime = field(default_factory=datetime.now)
class TaskManager:
"""
任务管理器
管理后台任务的创建、状态更新和查询
"""
def __init__(self, max_history: int = 100):
"""
初始化任务管理器
参数:
max_history: 保留的历史任务数量
"""
self._tasks: Dict[str, Task] = {}
self._max_history = max_history
def create_task(
self,
user_id: int,
task_type: str,
total: int = 1,
description: str = ""
) -> str:
"""
创建新任务
参数:
user_id: 用户 ID
task_type: 任务类型 (register, login, checkout)
total: 总任务数
description: 任务描述
返回:
任务 ID
"""
task_id = self._generate_task_id()
task = Task(
task_id=task_id,
user_id=user_id,
task_type=task_type,
status=TaskStatus.RUNNING,
description=description or f"{task_type} task",
total=total,
)
self._tasks[task_id] = task
self._cleanup_old_tasks()
return task_id
def update_progress(
self,
task_id: str,
completed: Optional[int] = None,
current_item: Optional[str] = None
) -> bool:
"""
更新任务进度
参数:
task_id: 任务 ID
completed: 已完成数量
current_item: 当前处理项描述
返回:
是否更新成功
"""
task = self._tasks.get(task_id)
if not task:
return False
if completed is not None:
task.completed = completed
if current_item is not None:
task.current_item = current_item
task.updated_at = datetime.now()
return True
def complete_task(
self,
task_id: str,
status: TaskStatus = TaskStatus.COMPLETED,
result: Any = None,
error: str = None
) -> bool:
"""
完成任务
参数:
task_id: 任务 ID
status: 最终状态
result: 任务结果
error: 错误信息(如果失败)
返回:
是否更新成功
"""
task = self._tasks.get(task_id)
if not task:
return False
task.status = status
task.result = result
task.error = error
task.completed = task.total
task.current_item = ""
task.updated_at = datetime.now()
return True
def get_task(self, task_id: str) -> Optional[Task]:
"""
获取任务
参数:
task_id: 任务 ID
返回:
Task 对象,如果不存在返回 None
"""
return self._tasks.get(task_id)
def get_user_tasks(self, user_id: int, limit: int = 10) -> List[Task]:
"""
获取用户的任务列表
参数:
user_id: 用户 ID
limit: 返回数量限制
返回:
任务列表(按创建时间倒序)
"""
user_tasks = [
task for task in self._tasks.values()
if task.user_id == user_id
]
user_tasks.sort(key=lambda t: t.created_at, reverse=True)
return user_tasks[:limit]
def get_all_tasks(self, limit: int = 20) -> List[Task]:
"""
获取所有任务(管理员用)
参数:
limit: 返回数量限制
返回:
任务列表(按创建时间倒序)
"""
all_tasks = list(self._tasks.values())
all_tasks.sort(key=lambda t: t.created_at, reverse=True)
return all_tasks[:limit]
def get_running_tasks(self) -> List[Task]:
"""
获取正在运行的任务
返回:
运行中的任务列表
"""
return [
task for task in self._tasks.values()
if task.status in [TaskStatus.PENDING, TaskStatus.RUNNING]
]
def _generate_task_id(self) -> str:
"""生成唯一任务 ID"""
return uuid.uuid4().hex[:8]
def _cleanup_old_tasks(self):
"""清理旧任务,保持历史记录在限制内"""
if len(self._tasks) <= self._max_history:
return
# 获取已完成的任务,按时间排序
completed_tasks = [
(task_id, task) for task_id, task in self._tasks.items()
if task.status not in [TaskStatus.PENDING, TaskStatus.RUNNING]
]
completed_tasks.sort(key=lambda x: x[1].created_at)
# 删除最旧的任务
to_remove = len(self._tasks) - self._max_history
for task_id, _ in completed_tasks[:to_remove]:
del self._tasks[task_id]