""" /status 命令处理器 - 查看任务状态 """ from telegram import Update from telegram.ext import ContextTypes from bot.middlewares.auth import require_auth from bot.services.task_manager import TaskStatus STATUS_EMOJI = { TaskStatus.PENDING: "⏳", TaskStatus.RUNNING: "🔄", TaskStatus.COMPLETED: "✅", TaskStatus.FAILED: "❌", TaskStatus.PARTIAL: "⚠️", } @require_auth async def status_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """ 处理 /status 命令 - 查看任务状态 用法: /status - 查看所有进行中的任务 /status - 查看指定任务详情 """ task_manager = context.bot_data["task_manager"] user_id = update.effective_user.id # 检查是否指定了任务 ID if context.args: task_id = context.args[0] task = task_manager.get_task(task_id) if not task: await update.message.reply_text(f"❌ 未找到任务: `{task_id}`", parse_mode="Markdown") return # 检查是否是自己的任务(或管理员) admin_users = context.bot_data.get("admin_users", set()) if task.user_id != user_id and user_id not in admin_users: await update.message.reply_text("❌ 你没有权限查看此任务") return # 显示任务详情 emoji = STATUS_EMOJI.get(task.status, "❓") progress_pct = (task.completed / task.total * 100) if task.total > 0 else 0 text = ( f"{emoji} **任务详情**\n\n" f"🆔 ID: `{task.task_id}`\n" f"📋 类型: {task.task_type}\n" f"📝 描述: {task.description}\n" f"📊 状态: {task.status.value}\n" f"📈 进度: {task.completed}/{task.total} ({progress_pct:.0f}%)\n" ) if task.current_item: text += f"🔄 当前: {task.current_item}\n" if task.result: text += f"\n📦 结果:\n```\n{task.result}\n```" await update.message.reply_text(text, parse_mode="Markdown") return # 获取用户的所有任务 admin_users = context.bot_data.get("admin_users", set()) is_admin = user_id in admin_users if is_admin: # 管理员可以看所有任务 tasks = task_manager.get_all_tasks() else: # 普通用户只能看自己的 tasks = task_manager.get_user_tasks(user_id) if not tasks: await update.message.reply_text( "📭 没有找到任务记录\n\n" "使用 `/register` 开始注册账号", parse_mode="Markdown" ) return # 构建任务列表 text = "📋 **任务列表**\n\n" # 先显示进行中的任务 running_tasks = [t for t in tasks if t.status in [TaskStatus.PENDING, TaskStatus.RUNNING]] completed_tasks = [t for t in tasks if t.status not in [TaskStatus.PENDING, TaskStatus.RUNNING]] if running_tasks: text += "**进行中:**\n" for task in running_tasks[:5]: emoji = STATUS_EMOJI.get(task.status, "❓") progress_pct = (task.completed / task.total * 100) if task.total > 0 else 0 text += f"{emoji} `{task.task_id}` - {task.description} ({progress_pct:.0f}%)\n" text += "\n" if completed_tasks: text += "**已完成:**\n" for task in completed_tasks[:5]: emoji = STATUS_EMOJI.get(task.status, "❓") text += f"{emoji} `{task.task_id}` - {task.description}\n" text += f"\n💡 使用 `/status ` 查看详情" await update.message.reply_text(text, parse_mode="Markdown")