Files
autoPlus/bot/services/file_sender.py
2026-01-30 10:48:56 +08:00

102 lines
2.7 KiB
Python

"""
JSON 文件生成与发送服务
"""
import json
import os
import tempfile
from datetime import datetime
from typing import Any
from telegram import Update
from telegram.ext import ContextTypes
async def send_results_as_json(
update: Update,
context: ContextTypes.DEFAULT_TYPE,
task_id: str,
plan: str,
results: list[dict[str, Any]],
) -> None:
"""
将结果生成为 JSON 文件并发送给用户
参数:
update: Telegram Update 对象
context: Telegram Context 对象
task_id: 任务 ID
plan: 订阅计划名称
results: 结果列表
"""
# 统计
success_count = sum(1 for r in results if r.get("status") == "success")
failed_count = len(results) - success_count
# 构建 JSON 数据
now = datetime.now()
data = {
"task_id": task_id,
"plan": plan,
"created_at": now.strftime("%Y-%m-%d %H:%M:%S"),
"results": [
{
"email": r.get("email", ""),
"password": r.get("password", ""),
"url": r.get("checkout_url", ""),
"checkout_session_id": r.get("checkout_session_id", ""),
"status": r.get("status", "failed"),
}
for r in results
],
"summary": {
"total": len(results),
"success": success_count,
"failed": failed_count,
},
}
# 生成文件名
timestamp = now.strftime("%Y%m%d_%H%M%S")
filename = f"checkout_{timestamp}.json"
# 写入临时文件并发送
with tempfile.NamedTemporaryFile(
mode="w",
suffix=".json",
prefix="checkout_",
delete=False,
encoding="utf-8",
) as f:
json.dump(data, f, indent=2, ensure_ascii=False)
temp_path = f.name
try:
# 发送消息(不含敏感信息)
message_text = (
f"✅ **任务完成**\n\n"
f"📦 Plan: {plan}\n"
f"✅ 成功: {success_count}\n"
f"❌ 失败: {failed_count}\n"
)
await context.bot.send_message(
chat_id=update.effective_chat.id,
text=message_text,
parse_mode="Markdown",
)
# 发送 JSON 文件
with open(temp_path, "rb") as f:
await context.bot.send_document(
chat_id=update.effective_chat.id,
document=f,
filename=filename,
caption=f"📎 任务 `{task_id}` 结果",
parse_mode="Markdown",
)
finally:
# 清理临时文件
if os.path.exists(temp_path):
os.remove(temp_path)