From 7a7b503b3c98e1e644e49a6f1260211b65089422 Mon Sep 17 00:00:00 2001 From: kyx236 Date: Tue, 20 Jan 2026 21:42:14 +0800 Subject: [PATCH] 111 --- telegram_bot.py | 56 ++++++++++++++++++++++++++++++++++--------------- 1 file changed, 39 insertions(+), 17 deletions(-) diff --git a/telegram_bot.py b/telegram_bot.py index b63b52a..745cbb3 100644 --- a/telegram_bot.py +++ b/telegram_bot.py @@ -1511,9 +1511,12 @@ class ProvisionerBot: if self._import_progress_message is None: return - # 取消超时任务 + # 取消超时任务 (job_queue job) if self._import_batch_timeout_task: - self._import_batch_timeout_task.cancel() + try: + self._import_batch_timeout_task.schedule_removal() + except Exception: + pass self._import_batch_timeout_task = None # 更新最终进度 @@ -1523,13 +1526,26 @@ class ProvisionerBot: self._import_progress_message = None self._reset_import_batch_stats() - async def _import_batch_timeout(self, chat_id: int, delay: float = 2.0): - """批量导入超时处理 - 在一定时间后自动完成批次""" - try: - await asyncio.sleep(delay) + async def _import_batch_timeout_callback(self, context: ContextTypes.DEFAULT_TYPE): + """批量导入超时回调 - 由 job_queue 调用""" + chat_id = context.job.data.get("chat_id") + if chat_id: await self._finalize_import_batch(chat_id) - except asyncio.CancelledError: - pass + + def _schedule_import_finalize(self, context: ContextTypes.DEFAULT_TYPE, chat_id: int, delay: float = 1.5): + """调度批量导入完成任务""" + # 取消之前的超时任务 + if self._import_batch_timeout_task: + self._import_batch_timeout_task.schedule_removal() + self._import_batch_timeout_task = None + + # 使用 job_queue 调度新的超时任务 + self._import_batch_timeout_task = context.job_queue.run_once( + self._import_batch_timeout_callback, + when=delay, + data={"chat_id": chat_id}, + name="import_batch_finalize" + ) @admin_only async def handle_json_file(self, update: Update, context: ContextTypes.DEFAULT_TYPE): @@ -1550,7 +1566,7 @@ class ProvisionerBot: async with self._import_progress_lock: # 取消之前的超时任务(如果有) if self._import_batch_timeout_task: - self._import_batch_timeout_task.cancel() + self._import_batch_timeout_task.schedule_removal() self._import_batch_timeout_task = None # 更新统计 @@ -1593,10 +1609,14 @@ class ProvisionerBot: # 更新进度 await self._update_import_progress(chat_id) - # 设置超时任务(2秒后如果没有新文件则完成批次) - self._import_batch_timeout_task = asyncio.create_task( - self._import_batch_timeout(chat_id, delay=2.0) - ) + # 检查是否所有文件都已处理,如果是则调度完成任务 + stats = self._import_batch_stats + if stats["processed_files"] >= stats["total_files"]: + # 所有文件处理完成,短延迟后完成批次(防止更多文件到来) + self._schedule_import_finalize(context, chat_id, delay=1.0) + else: + # 还有文件在处理,设置较长的超时 + self._schedule_import_finalize(context, chat_id, delay=3.0) except Exception as e: async with self._import_progress_lock: @@ -1606,10 +1626,12 @@ class ProvisionerBot: await self._update_import_progress(chat_id) - # 设置超时任务 - self._import_batch_timeout_task = asyncio.create_task( - self._import_batch_timeout(chat_id, delay=2.0) - ) + # 调度完成任务 + stats = self._import_batch_stats + if stats["processed_files"] >= stats["total_files"]: + self._schedule_import_finalize(context, chat_id, delay=1.0) + else: + self._schedule_import_finalize(context, chat_id, delay=3.0) async def _process_import_json_batch(self, json_text: str) -> dict: """处理导入的 JSON 数据,保存到 team.json (批量版本,返回结果)