This commit is contained in:
2026-01-20 21:42:14 +08:00
parent 2f04569470
commit 7a7b503b3c

View File

@@ -1511,9 +1511,12 @@ class ProvisionerBot:
if self._import_progress_message is None:
return
# 取消超时任务
# 取消超时任务 (job_queue job)
if self._import_batch_timeout_task:
self._import_batch_timeout_task.cancel()
try:
self._import_batch_timeout_task.schedule_removal()
except Exception:
pass
self._import_batch_timeout_task = None
# 更新最终进度
@@ -1523,13 +1526,26 @@ class ProvisionerBot:
self._import_progress_message = None
self._reset_import_batch_stats()
async def _import_batch_timeout(self, chat_id: int, delay: float = 2.0):
"""批量导入超时处理 - 在一定时间后自动完成批次"""
try:
await asyncio.sleep(delay)
async def _import_batch_timeout_callback(self, context: ContextTypes.DEFAULT_TYPE):
    """Batch-import timeout callback, invoked by the job_queue.

    Fired once the scheduled delay elapses; finalizes the current import
    batch for the chat that was recorded in the job's data payload.

    Args:
        context: PTB callback context; ``context.job.data`` carries the
            ``{"chat_id": int}`` payload set by ``_schedule_import_finalize``.
    """
    # NOTE(review): the stray `except asyncio.CancelledError: pass` that
    # followed this body was leftover residue of the removed asyncio-task
    # implementation — an `except` with no `try` is a syntax error, so it
    # is dropped here. job_queue jobs are cancelled via schedule_removal(),
    # not via CancelledError.
    chat_id = context.job.data.get("chat_id")
    if chat_id:
        await self._finalize_import_batch(chat_id)
def _schedule_import_finalize(self, context: ContextTypes.DEFAULT_TYPE, chat_id: int, delay: float = 1.5):
    """Schedule (or reschedule) the batch-import finalize job.

    Any previously scheduled finalize job is removed first, then a fresh
    one-shot job is registered on the job_queue to fire after *delay*
    seconds, carrying the target chat id in its data payload.

    Args:
        context: PTB callback context providing ``job_queue``.
        chat_id: Chat whose import batch should be finalized.
        delay: Seconds to wait before firing the finalize callback.
    """
    # Drop the pending finalize job, if one exists, before re-arming.
    pending = self._import_batch_timeout_task
    if pending:
        pending.schedule_removal()
        self._import_batch_timeout_task = None

    # Arm a new one-shot timeout via the job_queue.
    self._import_batch_timeout_task = context.job_queue.run_once(
        self._import_batch_timeout_callback,
        when=delay,
        data={"chat_id": chat_id},
        name="import_batch_finalize",
    )
@admin_only
async def handle_json_file(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
@@ -1550,7 +1566,7 @@ class ProvisionerBot:
async with self._import_progress_lock:
# 取消之前的超时任务(如果有)
if self._import_batch_timeout_task:
self._import_batch_timeout_task.cancel()
self._import_batch_timeout_task.schedule_removal()
self._import_batch_timeout_task = None
# 更新统计
@@ -1593,10 +1609,14 @@ class ProvisionerBot:
# 更新进度
await self._update_import_progress(chat_id)
# 设置超时任务2秒后如果没有新文件则完成批次
self._import_batch_timeout_task = asyncio.create_task(
self._import_batch_timeout(chat_id, delay=2.0)
)
# 检查是否所有文件都已处理,如果是则调度完成任务
stats = self._import_batch_stats
if stats["processed_files"] >= stats["total_files"]:
# 所有文件处理完成,短延迟后完成批次(防止更多文件到来)
self._schedule_import_finalize(context, chat_id, delay=1.0)
else:
# 还有文件在处理,设置较长的超时
self._schedule_import_finalize(context, chat_id, delay=3.0)
except Exception as e:
async with self._import_progress_lock:
@@ -1606,10 +1626,12 @@ class ProvisionerBot:
await self._update_import_progress(chat_id)
# 设置超时任务
self._import_batch_timeout_task = asyncio.create_task(
self._import_batch_timeout(chat_id, delay=2.0)
)
# 调度完成任务
stats = self._import_batch_stats
if stats["processed_files"] >= stats["total_files"]:
self._schedule_import_finalize(context, chat_id, delay=1.0)
else:
self._schedule_import_finalize(context, chat_id, delay=3.0)
async def _process_import_json_batch(self, json_text: str) -> dict:
"""处理导入的 JSON 数据,保存到 team.json (批量版本,返回结果)