""" 后台任务管理器 管理异步执行的注册、登录等任务的状态和进度 """ import uuid from datetime import datetime from enum import Enum from typing import Dict, List, Optional, Any from dataclasses import dataclass, field class TaskStatus(Enum): """任务状态枚举""" PENDING = "pending" # 等待执行 RUNNING = "running" # 执行中 COMPLETED = "completed" # 已完成 FAILED = "failed" # 失败 PARTIAL = "partial" # 部分成功 @dataclass class Task: """任务数据类""" task_id: str user_id: int task_type: str # register, login, checkout status: TaskStatus description: str total: int = 0 # 总任务数 completed: int = 0 # 已完成数 current_item: str = "" # 当前处理项 result: Optional[Any] = None error: Optional[str] = None created_at: datetime = field(default_factory=datetime.now) updated_at: datetime = field(default_factory=datetime.now) class TaskManager: """ 任务管理器 管理后台任务的创建、状态更新和查询 """ def __init__(self, max_history: int = 100): """ 初始化任务管理器 参数: max_history: 保留的历史任务数量 """ self._tasks: Dict[str, Task] = {} self._max_history = max_history def create_task( self, user_id: int, task_type: str, total: int = 1, description: str = "" ) -> str: """ 创建新任务 参数: user_id: 用户 ID task_type: 任务类型 (register, login, checkout) total: 总任务数 description: 任务描述 返回: 任务 ID """ task_id = self._generate_task_id() task = Task( task_id=task_id, user_id=user_id, task_type=task_type, status=TaskStatus.RUNNING, description=description or f"{task_type} task", total=total, ) self._tasks[task_id] = task self._cleanup_old_tasks() return task_id def update_progress( self, task_id: str, completed: Optional[int] = None, current_item: Optional[str] = None ) -> bool: """ 更新任务进度 参数: task_id: 任务 ID completed: 已完成数量 current_item: 当前处理项描述 返回: 是否更新成功 """ task = self._tasks.get(task_id) if not task: return False if completed is not None: task.completed = completed if current_item is not None: task.current_item = current_item task.updated_at = datetime.now() return True def complete_task( self, task_id: str, status: TaskStatus = TaskStatus.COMPLETED, result: Any = None, error: str = None ) -> bool: """ 完成任务 参数: task_id: 任务 ID status: 最终状态 result: 任务结果 error: 错误信息(如果失败) 返回: 是否更新成功 """ task = self._tasks.get(task_id) if not task: return False task.status = status task.result = result task.error = error task.completed = task.total task.current_item = "" task.updated_at = datetime.now() return True def get_task(self, task_id: str) -> Optional[Task]: """ 获取任务 参数: task_id: 任务 ID 返回: Task 对象,如果不存在返回 None """ return self._tasks.get(task_id) def get_user_tasks(self, user_id: int, limit: int = 10) -> List[Task]: """ 获取用户的任务列表 参数: user_id: 用户 ID limit: 返回数量限制 返回: 任务列表(按创建时间倒序) """ user_tasks = [ task for task in self._tasks.values() if task.user_id == user_id ] user_tasks.sort(key=lambda t: t.created_at, reverse=True) return user_tasks[:limit] def get_all_tasks(self, limit: int = 20) -> List[Task]: """ 获取所有任务(管理员用) 参数: limit: 返回数量限制 返回: 任务列表(按创建时间倒序) """ all_tasks = list(self._tasks.values()) all_tasks.sort(key=lambda t: t.created_at, reverse=True) return all_tasks[:limit] def get_running_tasks(self) -> List[Task]: """ 获取正在运行的任务 返回: 运行中的任务列表 """ return [ task for task in self._tasks.values() if task.status in [TaskStatus.PENDING, TaskStatus.RUNNING] ] def _generate_task_id(self) -> str: """生成唯一任务 ID""" return uuid.uuid4().hex[:8] def _cleanup_old_tasks(self): """清理旧任务,保持历史记录在限制内""" if len(self._tasks) <= self._max_history: return # 获取已完成的任务,按时间排序 completed_tasks = [ (task_id, task) for task_id, task in self._tasks.items() if task.status not in [TaskStatus.PENDING, TaskStatus.RUNNING] ] completed_tasks.sort(key=lambda x: x[1].created_at) # 删除最旧的任务 to_remove = len(self._tasks) - self._max_history for task_id, _ in completed_tasks[:to_remove]: del self._tasks[task_id]