"""JSON result-file generation and delivery service."""

import contextlib
import json
import os
import tempfile
from datetime import datetime
from typing import Any

from telegram import Update
from telegram.ext import ContextTypes
from telegram.helpers import escape_markdown


def _build_report(
    task_id: str,
    plan: str,
    results: list[dict[str, Any]],
    now: datetime,
) -> dict[str, Any]:
    """Build the JSON-serialisable report payload for a finished task.

    Every entry whose ``status`` is not exactly ``"success"`` is counted as
    failed; missing fields fall back to ``""`` (or ``"failed"`` for status).
    """
    success_count = sum(1 for r in results if r.get("status") == "success")
    return {
        "task_id": task_id,
        "plan": plan,
        "created_at": now.strftime("%Y-%m-%d %H:%M:%S"),
        "results": [
            {
                "email": r.get("email", ""),
                "password": r.get("password", ""),
                "url": r.get("checkout_url", ""),
                "checkout_session_id": r.get("checkout_session_id", ""),
                "status": r.get("status", "failed"),
            }
            for r in results
        ],
        "summary": {
            "total": len(results),
            "success": success_count,
            "failed": len(results) - success_count,
        },
    }


async def send_results_as_json(
    update: Update,
    context: ContextTypes.DEFAULT_TYPE,
    task_id: str,
    plan: str,
    results: list[dict[str, Any]],
) -> None:
    """Write the results to a JSON file and send it to the user.

    A short summary message (without any credentials) is sent first, followed
    by the JSON document itself. The temporary file is always removed, even
    when serialisation or sending fails.

    Args:
        update: Telegram Update object; its effective chat is the recipient.
        context: Telegram Context object providing the bot used for sending.
        task_id: Task ID, shown in the document caption and stored in the file.
        plan: Subscription plan name.
        results: Result entries (dicts) to serialise.

    Raises:
        AttributeError: If ``update.effective_chat`` is ``None``.
    """
    # Resolve the destination first so that nothing sensitive is written to
    # disk when there is no chat to deliver it to.
    chat_id = update.effective_chat.id

    now = datetime.now()
    data = _build_report(task_id, plan, results, now)
    summary = data["summary"]

    # User-visible file name; the on-disk temp name is random.
    timestamp = now.strftime("%Y%m%d_%H%M%S")
    filename = f"checkout_{timestamp}.json"

    temp_path = None
    try:
        # The file is created inside the try block so that a failure in
        # json.dump cannot leave a file containing credentials behind.
        with tempfile.NamedTemporaryFile(
            mode="w",
            suffix=".json",
            prefix="checkout_",
            delete=False,
            encoding="utf-8",
        ) as f:
            temp_path = f.name
            json.dump(data, f, indent=2, ensure_ascii=False)

        # Summary message (contains no sensitive information). The plan name
        # is escaped because characters such as "_" or "*" would otherwise
        # make Telegram reject the message in legacy Markdown mode.
        message_text = (
            f"✅ **任务完成**\n\n"
            f"📦 Plan: {escape_markdown(plan, version=1)}\n"
            f"✅ 成功: {summary['success']}\n"
            f"❌ 失败: {summary['failed']}\n"
        )
        await context.bot.send_message(
            chat_id=chat_id,
            text=message_text,
            parse_mode="Markdown",
        )

        # Send the JSON file itself.
        with open(temp_path, "rb") as f:
            await context.bot.send_document(
                chat_id=chat_id,
                document=f,
                filename=filename,
                caption=f"📎 任务 `{task_id}` 结果",
                parse_mode="Markdown",
            )
    finally:
        # Clean up the temporary file; it may already be gone.
        if temp_path is not None:
            with contextlib.suppress(FileNotFoundError):
                os.remove(temp_path)