From 4d5fa36183218b5463c1525447b36e09397d52a3 Mon Sep 17 00:00:00 2001 From: kyx236 Date: Tue, 10 Feb 2026 02:38:45 +0800 Subject: [PATCH] feat: add configurable timed scheduler to the Telegram bot for automated tasks. --- config.py | 23 ++ config.toml.example | 18 ++ telegram_bot.py | 720 +++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 760 insertions(+), 1 deletion(-) diff --git a/config.py b/config.py index 0caa04c..5529fc8 100644 --- a/config.py +++ b/config.py @@ -314,6 +314,9 @@ def reload_config() -> dict: global S2A_API_BASE, S2A_ADMIN_KEY, S2A_ADMIN_TOKEN global S2A_CONCURRENCY, S2A_PRIORITY, S2A_GROUP_NAMES, S2A_GROUP_IDS, S2A_API_MODE global CONCURRENT_ENABLED, CONCURRENT_WORKERS + global SCHEDULER_ENABLED, SCHEDULER_START_HOUR, SCHEDULER_END_HOUR + global SCHEDULER_BATCH_SIZE, SCHEDULER_COOLDOWN_MINUTES, SCHEDULER_OUTPUT_TYPE + global SCHEDULER_MAX_CONSECUTIVE_FAILURES result = { "success": True, @@ -389,6 +392,16 @@ def reload_config() -> dict: S2A_GROUP_IDS = _s2a.get("group_ids", []) S2A_API_MODE = _s2a.get("api_mode", False) + # 定时调度器配置 + _scheduler = _cfg.get("scheduler", {}) + SCHEDULER_ENABLED = _scheduler.get("enabled", False) + SCHEDULER_START_HOUR = _scheduler.get("start_hour", 8) + SCHEDULER_END_HOUR = _scheduler.get("end_hour", 14) + SCHEDULER_BATCH_SIZE = _scheduler.get("batch_size", 50) + SCHEDULER_COOLDOWN_MINUTES = _scheduler.get("cooldown_minutes", 5) + SCHEDULER_OUTPUT_TYPE = _scheduler.get("output_type", "team") + SCHEDULER_MAX_CONSECUTIVE_FAILURES = _scheduler.get("max_consecutive_failures", 3) + except Exception as e: errors.append(f"config.toml: {e}") @@ -677,6 +690,16 @@ TELEGRAM_NOTIFY_ON_ERROR = _telegram.get("notify_on_error", True) TELEGRAM_CHECK_INTERVAL = _telegram.get("check_interval", 3600) # 默认1小时检查一次 TELEGRAM_LOW_STOCK_THRESHOLD = _telegram.get("low_stock_threshold", 10) # 低库存阈值 +# 定时调度器配置 +_scheduler = _cfg.get("scheduler", {}) +SCHEDULER_ENABLED = _scheduler.get("enabled", False) # 是否启用定时调度 +SCHEDULER_START_HOUR = _scheduler.get("start_hour", 8) # 开始时间 (小时, 0-23) +SCHEDULER_END_HOUR = _scheduler.get("end_hour", 14) # 结束时间 (小时, 0-23) +SCHEDULER_BATCH_SIZE = _scheduler.get("batch_size", 50) # 每轮注册数量 +SCHEDULER_COOLDOWN_MINUTES = _scheduler.get("cooldown_minutes", 5) # 轮次间冷却 (分钟) +SCHEDULER_OUTPUT_TYPE = _scheduler.get("output_type", "team") # 输出方式: team / json +SCHEDULER_MAX_CONSECUTIVE_FAILURES = _scheduler.get("max_consecutive_failures", 3) # 连续失败N轮后暂停 + # 代理 # 注意: proxy_enabled 和 proxies 可能在顶层或被误放在 browser section 下 _proxy_enabled_top = _cfg.get("proxy_enabled") diff --git a/config.toml.example b/config.toml.example index e4c7ce0..462939c 100644 --- a/config.toml.example +++ b/config.toml.example @@ -235,6 +235,24 @@ check_interval = 3600 # 低库存预警阈值 (正常账号数低于此值时预警) low_stock_threshold = 10 +# ==================== 定时调度器配置 ==================== +# 时间窗口内自动循环执行: 注册 → run_all → 冷却 → 重复 +# 通过 Telegram Bot 的 /schedule 命令开启/关闭 +[scheduler] +# 是否启用定时调度 (也可通过 /schedule on 命令开启) +enabled = false +# 时间窗口: 仅在此时间段内运行 (24小时制) +start_hour = 8 +end_hour = 14 +# 每轮注册的 GPT Team 账号数量 +batch_size = 50 +# 每轮完成后的冷却时间 (分钟) +cooldown_minutes = 5 +# 注册输出方式: "team" (写入 team.json 供 run_all 处理) +output_type = "team" +# 连续失败 N 轮后自动暂停调度器并发送告警 +max_consecutive_failures = 3 + # ==================== AutoGPTPlus 配置 ==================== # 独立的 ChatGPT 订阅自动化脚本配置 [autogptplus] diff --git a/telegram_bot.py b/telegram_bot.py index cdb25ab..90e9113 100644 --- a/telegram_bot.py +++ b/telegram_bot.py @@ -19,6 +19,11 @@ from telegram.ext import ( ContextTypes, ) +from datetime import datetime, timedelta, timezone, time as dt_time + +# 北京时间 UTC+8 +BEIJING_TZ = timezone(timedelta(hours=8)) + from config import ( TELEGRAM_BOT_TOKEN, TELEGRAM_ADMIN_CHAT_IDS, @@ -51,6 +56,13 @@ from config import ( S2A_API_MODE, BROWSER_RANDOM_FINGERPRINT, batch_remove_teams_by_names, + SCHEDULER_ENABLED, + SCHEDULER_START_HOUR, + SCHEDULER_END_HOUR, + SCHEDULER_BATCH_SIZE, + SCHEDULER_COOLDOWN_MINUTES, + SCHEDULER_OUTPUT_TYPE, + SCHEDULER_MAX_CONSECUTIVE_FAILURES, ) from utils import load_team_tracker, get_all_incomplete_accounts, save_team_tracker, get_completed_teams, batch_remove_completed_teams from bot_notifier import BotNotifier, set_notifier, progress_finish @@ -105,6 +117,20 @@ class ProvisionerBot: "errors": [] } self._import_batch_timeout_task = None # 超时任务 + # ==================== 调度器状态 ==================== + self._scheduler_active = False # 调度器是否正在运行 + self._scheduler_task: Optional[asyncio.Task] = None # 调度器 asyncio.Task + self._scheduler_stop_event = asyncio.Event() # 用于通知调度器停止 + self._scheduler_round = 0 # 当前轮次 + self._scheduler_stats = { # 累计统计 + "total_rounds": 0, + "total_registered": 0, + "total_ingested": 0, + "total_failed": 0, + "consecutive_failures": 0, + "start_time": None, + } + self._scheduler_suspended_date = None # /stop 后挂起当天调度 (date 对象) async def start(self): """启动 Bot""" @@ -168,6 +194,9 @@ class ProvisionerBot: ("keys_usage", self.cmd_keys_usage), ("autogptplus", self.cmd_autogptplus), ("update_token", self.cmd_update_token), + ("schedule", self.cmd_schedule), + ("schedule_config", self.cmd_schedule_config), + ("schedule_status", self.cmd_schedule_status), ] for cmd, handler in handlers: self.app.add_handler(CommandHandler(cmd, handler)) @@ -228,6 +257,16 @@ class ProvisionerBot: ) log.info(f"Stock check scheduled every {TELEGRAM_CHECK_INTERVAL}s") + # 注册定时调度器 (每天 start_hour 触发) + if SCHEDULER_ENABLED: + trigger_time = dt_time(hour=SCHEDULER_START_HOUR, minute=0, second=0, tzinfo=BEIJING_TZ) + self.app.job_queue.run_daily( + self._scheduler_daily_trigger, + time=trigger_time, + name="scheduler_daily" + ) + log.info(f"Scheduler registered: daily at {SCHEDULER_START_HOUR:02d}:00 - {SCHEDULER_END_HOUR:02d}:00 (Beijing Time)") + # 启动通知器 await self.notifier.start() @@ -1395,7 +1434,13 @@ class ProvisionerBot: @admin_only async def cmd_stop(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """强制停止当前任务""" - if not self.current_task or self.current_task.done(): + # 如果调度器正在运行 + scheduler_was_active = self._scheduler_active + if self._scheduler_active: + self._scheduler_stop_event.set() + self._scheduler_suspended_date = datetime.now().date() # 挂起今天的调度 + + if (not self.current_task or self.current_task.done()) and not scheduler_was_active: await update.message.reply_text("📭 当前没有运行中的任务") return @@ -1477,6 +1522,16 @@ class ProvisionerBot: await update.message.reply_text("\n".join(report_lines), parse_mode="HTML") + # 如果调度器被停止,额外通知 + if scheduler_was_active: + await update.message.reply_text( + "⏰ 调度器已停止\n\n" + "调度器已禁用,不会继续运行\n" + "明天 08:00 将自动重新启动\n\n" + "如需手动恢复: /schedule on", + parse_mode="HTML" + ) + except Exception as e: await update.message.reply_text(f"❌ 停止任务时出错: {e}") @@ -6263,6 +6318,669 @@ class ProvisionerBot: except Exception as e: await update.message.reply_text(f"❌ 添加 IBAN 失败: {e}") + # ==================== 定时调度器 ==================== + + @admin_only + async def cmd_schedule(self, update: Update, context: ContextTypes.DEFAULT_TYPE): + """开启/关闭定时调度器""" + import config as cfg + + args = context.args + if not args: + # 显示当前状态 + status = "✅ 运行中" if self._scheduler_active else ("⏸️ 已启用 (等待触发)" if cfg.SCHEDULER_ENABLED else "❌ 已关闭") + text = ( + f"⏰ 定时调度器\n\n" + f"状态: {status}\n" + f"时间窗口: {cfg.SCHEDULER_START_HOUR:02d}:00 - {cfg.SCHEDULER_END_HOUR:02d}:00\n" + f"每轮注册: {cfg.SCHEDULER_BATCH_SIZE} 个\n" + f"冷却时间: {cfg.SCHEDULER_COOLDOWN_MINUTES} 分钟\n" + f"连续失败上限: {cfg.SCHEDULER_MAX_CONSECUTIVE_FAILURES} 轮\n\n" + f"用法:\n" + f" /schedule on - 立即开启\n" + f" /schedule off - 关闭\n" + f" /schedule_config - 配置参数\n" + f" /schedule_status - 查看运行状态" + ) + await update.message.reply_text(text, parse_mode="HTML") + return + + action = args[0].lower() + + if action == "on": + if self._scheduler_active: + await update.message.reply_text("⚠️ 调度器已经在运行中") + return + + if self.current_task and not self.current_task.done(): + await update.message.reply_text( + f"⚠️ 有任务正在运行: {self.current_team}\n" + "请先等待任务完成或使用 /stop 停止" + ) + return + + # 更新 config + cfg.SCHEDULER_ENABLED = True + + # 检查是否在时间窗口内 + now = datetime.now() + if cfg.SCHEDULER_START_HOUR <= now.hour < cfg.SCHEDULER_END_HOUR: + # 在时间窗口内,立即启动 + self._start_scheduler() + await update.message.reply_text( + f"✅ 调度器已启动\n\n" + f"当前在时间窗口内,立即开始\n" + f"窗口: {cfg.SCHEDULER_START_HOUR:02d}:00 - {cfg.SCHEDULER_END_HOUR:02d}:00\n" + f"每轮: 注册 {cfg.SCHEDULER_BATCH_SIZE} 个 → run_all → 冷却 {cfg.SCHEDULER_COOLDOWN_MINUTES} 分钟", + parse_mode="HTML" + ) + else: + # 不在时间窗口内,注册 daily job + self._register_scheduler_daily_job() + await update.message.reply_text( + f"✅ 调度器已启用\n\n" + f"当前不在时间窗口内\n" + f"将在每天 {cfg.SCHEDULER_START_HOUR:02d}:00 自动启动\n" + f"窗口: {cfg.SCHEDULER_START_HOUR:02d}:00 - {cfg.SCHEDULER_END_HOUR:02d}:00", + parse_mode="HTML" + ) + + elif action == "off": + cfg.SCHEDULER_ENABLED = False + if self._scheduler_active: + self._scheduler_stop_event.set() + await update.message.reply_text( + "🛑 调度器正在停止\n\n" + "当前轮次完成后将停止调度器\n" + "如需立即停止任务,请使用 /stop", + parse_mode="HTML" + ) + else: + # 移除 daily job + self._remove_scheduler_daily_job() + await update.message.reply_text( + "❌ 调度器已关闭", + parse_mode="HTML" + ) + else: + await update.message.reply_text( + "用法: /schedule on/schedule off", + parse_mode="HTML" + ) + + @admin_only + async def cmd_schedule_config(self, update: Update, context: ContextTypes.DEFAULT_TYPE): + """配置调度器参数""" + import config as cfg + import tomli_w + + args = context.args + if not args: + text = ( + f"⚙️ 调度器配置\n\n" + f"start_hour = {cfg.SCHEDULER_START_HOUR} (开始时间)\n" + f"end_hour = {cfg.SCHEDULER_END_HOUR} (结束时间)\n" + f"batch_size = {cfg.SCHEDULER_BATCH_SIZE} (每轮注册数)\n" + f"cooldown = {cfg.SCHEDULER_COOLDOWN_MINUTES} (冷却分钟)\n" + f"max_failures = {cfg.SCHEDULER_MAX_CONSECUTIVE_FAILURES} (连续失败上限)\n\n" + f"修改示例:\n" + f" /schedule_config batch_size 30\n" + f" /schedule_config start_hour 9\n" + f" /schedule_config cooldown 10" + ) + await update.message.reply_text(text, parse_mode="HTML") + return + + if len(args) < 2: + await update.message.reply_text("❌ 用法: /schedule_config 参数名 值", parse_mode="HTML") + return + + key = args[0].lower() + try: + value = int(args[1]) + except ValueError: + await update.message.reply_text("❌ 值必须为整数") + return + + # 参数映射 + config_map = { + "start_hour": ("start_hour", 0, 23), + "end_hour": ("end_hour", 0, 23), + "batch_size": ("batch_size", 1, 200), + "cooldown": ("cooldown_minutes", 1, 60), + "max_failures": ("max_consecutive_failures", 1, 20), + } + + if key not in config_map: + valid_keys = ", ".join(config_map.keys()) + await update.message.reply_text(f"❌ 无效参数。可选: {valid_keys}") + return + + toml_key, min_val, max_val = config_map[key] + if not (min_val <= value <= max_val): + await update.message.reply_text(f"❌ 值必须在 {min_val}-{max_val} 之间") + return + + # 更新内存中的配置 + attr_name = f"SCHEDULER_{toml_key.upper()}" + old_value = getattr(cfg, attr_name) + setattr(cfg, attr_name, value) + + # 持久化到 config.toml + try: + import tomllib + with open(CONFIG_FILE, "rb") as f: + config = tomllib.load(f) + + if "scheduler" not in config: + config["scheduler"] = {} + config["scheduler"][toml_key] = value + + with open(CONFIG_FILE, "wb") as f: + tomli_w.dump(config, f) + + await update.message.reply_text( + f"✅ 配置已更新\n\n" + f"{key}: {old_value} → {value}\n\n" + f"💡 如调度器正在运行,下轮生效", + parse_mode="HTML" + ) + except ImportError: + await update.message.reply_text( + f"⚠️ 内存配置已更新 ({key}: {old_value} → {value})\n" + "但无法持久化: 缺少 tomli_w 依赖" + ) + except Exception as e: + await update.message.reply_text( + f"⚠️ 内存配置已更新 ({key}: {old_value} → {value})\n" + f"但持久化失败: {e}" + ) + + @admin_only + async def cmd_schedule_status(self, update: Update, context: ContextTypes.DEFAULT_TYPE): + """查看调度器运行状态""" + import config as cfg + + if not self._scheduler_active: + status = "⏸️ 已启用 (等待触发)" if cfg.SCHEDULER_ENABLED else "❌ 未启用" + await update.message.reply_text( + f"📊 调度器状态\n\n" + f"状态: {status}\n\n" + f"使用 /schedule on 启动", + parse_mode="HTML" + ) + return + + stats = self._scheduler_stats + elapsed = "" + if stats["start_time"]: + delta = datetime.now() - stats["start_time"] + hours, remainder = divmod(int(delta.total_seconds()), 3600) + minutes, seconds = divmod(remainder, 60) + elapsed = f"{hours}h{minutes}m{seconds}s" + + text = ( + f"📊 调度器运行状态\n\n" + f"🟢 运行中 | 第 {self._scheduler_round} 轮\n" + f"⏱️ 已运行: {elapsed}\n" + f"⏰ 窗口: {cfg.SCHEDULER_START_HOUR:02d}:00 - {cfg.SCHEDULER_END_HOUR:02d}:00\n\n" + f"累计统计:\n" + f" 📝 注册: {stats['total_registered']} 个\n" + f" 📥 入库: {stats['total_ingested']} 个\n" + f" ❌ 失败轮次: {stats['total_failed']}\n" + f" 🔄 总轮次: {stats['total_rounds']}\n" + f" ⚠️ 连续失败: {stats['consecutive_failures']}/{cfg.SCHEDULER_MAX_CONSECUTIVE_FAILURES}" + ) + + if self.current_team: + text += f"\n\n🏃 当前任务: {self.current_team}" + + await update.message.reply_text(text, parse_mode="HTML") + + def _register_scheduler_daily_job(self): + """注册每日定时触发 Job""" + import config as cfg + + # 先移除旧的 job + self._remove_scheduler_daily_job() + + if self.app and self.app.job_queue: + trigger_time = dt_time(hour=cfg.SCHEDULER_START_HOUR, minute=0, second=0, tzinfo=BEIJING_TZ) + self.app.job_queue.run_daily( + self._scheduler_daily_trigger, + time=trigger_time, + name="scheduler_daily" + ) + log.info(f"Scheduler daily job registered at {cfg.SCHEDULER_START_HOUR:02d}:00 (Beijing Time)") + + def _remove_scheduler_daily_job(self): + """移除每日定时触发 Job""" + if self.app and self.app.job_queue: + jobs = self.app.job_queue.get_jobs_by_name("scheduler_daily") + for job in jobs: + job.schedule_removal() + + async def _scheduler_daily_trigger(self, context: ContextTypes.DEFAULT_TYPE): + """每日定时触发回调 - 由 job_queue 调用""" + import config as cfg + + if not cfg.SCHEDULER_ENABLED: + return + + # 检查是否被 /stop 挂起了今天的调度 + today = datetime.now().date() + if self._scheduler_suspended_date == today: + log.info(f"Scheduler suspended for today ({today}), skipping") + return + + if self._scheduler_active: + log.info("Scheduler already active, skipping daily trigger") + return + + if self.current_task and not self.current_task.done(): + log.warning("Task already running, skipping scheduler trigger") + # 通知管理员 + for chat_id in TELEGRAM_ADMIN_CHAT_IDS: + try: + await context.bot.send_message( + chat_id, + f"⚠️ 调度器触发跳过\n\n" + f"当前有任务在运行: {self.current_team}\n" + f"请任务完成后手动使用 /schedule on 启动", + parse_mode="HTML" + ) + except Exception: + pass + return + + # 启动调度器 + self._start_scheduler() + + # 通知管理员 + for chat_id in TELEGRAM_ADMIN_CHAT_IDS: + try: + await context.bot.send_message( + chat_id, + f"⏰ 定时调度器已启动\n\n" + f"时间窗口: {cfg.SCHEDULER_START_HOUR:02d}:00 - {cfg.SCHEDULER_END_HOUR:02d}:00\n" + f"每轮: 注册 {cfg.SCHEDULER_BATCH_SIZE} 个 → run_all → 冷却 {cfg.SCHEDULER_COOLDOWN_MINUTES} 分钟", + parse_mode="HTML" + ) + except Exception: + pass + + def _start_scheduler(self): + """启动调度器循环""" + self._scheduler_active = True + self._scheduler_stop_event.clear() + self._scheduler_round = 0 + self._scheduler_stats = { + "total_rounds": 0, + "total_registered": 0, + "total_ingested": 0, + "total_failed": 0, + "consecutive_failures": 0, + "start_time": datetime.now(), + } + self._scheduler_task = asyncio.create_task(self._scheduler_loop()) + + async def _scheduler_loop(self): + """调度器主循环: 注册 → run_all → 冷却 → 重复""" + import config as cfg + + log.section("定时调度器启动") + chat_id = TELEGRAM_ADMIN_CHAT_IDS[0] if TELEGRAM_ADMIN_CHAT_IDS else None + + try: + while not self._scheduler_stop_event.is_set(): + # ===== 时间窗口检查 ===== + now = datetime.now() + if not (cfg.SCHEDULER_START_HOUR <= now.hour < cfg.SCHEDULER_END_HOUR): + log.info(f"超出时间窗口 ({cfg.SCHEDULER_START_HOUR:02d}:00 - {cfg.SCHEDULER_END_HOUR:02d}:00),调度结束") + break + + self._scheduler_round += 1 + round_num = self._scheduler_round + log.section(f"调度器 - 第 {round_num} 轮") + + # ===== Phase 1: Provisioning (注册 GPT Team 账号) ===== + log.info(f"[Phase 1] 注册 {cfg.SCHEDULER_BATCH_SIZE} 个 GPT Team 账号...") + + if chat_id: + try: + await self.app.bot.send_message( + chat_id, + f"🔄 第 {round_num} 轮开始\n\n" + f"📝 Phase 1: 注册 {cfg.SCHEDULER_BATCH_SIZE} 个账号...\n" + f"⏰ {now.strftime('%H:%M:%S')}", + parse_mode="HTML" + ) + except Exception: + pass + + # 重置停止标志 + try: + import run + run._shutdown_requested = False + except Exception: + pass + + self.current_team = f"调度器-第{round_num}轮-注册" + + # 执行注册 + reg_success = 0 + reg_fail = 0 + try: + reg_success, reg_fail = await self._scheduler_run_registration( + chat_id, cfg.SCHEDULER_BATCH_SIZE, cfg.SCHEDULER_OUTPUT_TYPE + ) + except Exception as e: + log.error(f"调度器注册阶段异常: {e}") + reg_fail = cfg.SCHEDULER_BATCH_SIZE + + self._scheduler_stats["total_registered"] += reg_success + + # 检查中断 + if self._scheduler_stop_event.is_set(): + break + + # 注册全部失败检查 + if reg_success == 0: + self._scheduler_stats["consecutive_failures"] += 1 + self._scheduler_stats["total_failed"] += 1 + log.warning(f"第 {round_num} 轮注册全部失败 (连续失败: {self._scheduler_stats['consecutive_failures']})") + + if self._scheduler_stats["consecutive_failures"] >= cfg.SCHEDULER_MAX_CONSECUTIVE_FAILURES: + log.error(f"连续失败 {self._scheduler_stats['consecutive_failures']} 轮,调度器自动暂停") + if chat_id: + try: + await self.app.bot.send_message( + chat_id, + f"🚨 调度器自动暂停\n\n" + f"连续 {self._scheduler_stats['consecutive_failures']} 轮注册全部失败\n" + f"请检查配置后使用 /schedule on 重新启动\n\n" + f"可能原因:\n" + f"• IBAN 已用完\n" + f"• 邮件 API 异常\n" + f"• 域名全部拉黑", + parse_mode="HTML" + ) + except Exception: + pass + break + continue # 跳过 run_all,直接下一轮 + else: + self._scheduler_stats["consecutive_failures"] = 0 + + # ===== Phase 1.5: Verify (验证 account_id) ===== + log.info(f"[Phase 1.5] 验证注册账号 account_id...") + + if chat_id: + try: + await self.app.bot.send_message( + chat_id, + f"🔍 第 {round_num} 轮 - Phase 1.5\n\n" + f"📝 注册完成: ✅ {reg_success} / ❌ {reg_fail}\n" + f"🔍 正在验证 account_id...", + parse_mode="HTML" + ) + except Exception: + pass + + self.current_team = f"调度器-第{round_num}轮-验证" + + try: + await self._validate_and_cleanup_accounts(chat_id, force_all=True) + log.success("verify_all 完成") + except Exception as e: + log.error(f"调度器验证阶段异常: {e}") + + # 检查中断 + if self._scheduler_stop_event.is_set(): + break + + # ===== Phase 2: Ingestion (run_all) ===== + log.info(f"[Phase 2] 执行 run_all 入库处理...") + + if chat_id: + try: + await self.app.bot.send_message( + chat_id, + f"🚀 第 {round_num} 轮 - Phase 2\n\n" + f"📝 注册: ✅ {reg_success} / ❌ {reg_fail}\n" + f"🔍 验证完成\n" + f"📥 开始 run_all 入库处理...", + parse_mode="HTML" + ) + except Exception: + pass + + self.current_team = f"调度器-第{round_num}轮-run_all" + + ingested = 0 + try: + ingested = await self._scheduler_run_all() + except Exception as e: + log.error(f"调度器 run_all 阶段异常: {e}") + + self._scheduler_stats["total_ingested"] += ingested + self._scheduler_stats["total_rounds"] += 1 + + # 检查中断 + if self._scheduler_stop_event.is_set(): + break + + # ===== Graceful Boundary: 任务完成后检查时间 ===== + now = datetime.now() + if not (cfg.SCHEDULER_START_HOUR <= now.hour < cfg.SCHEDULER_END_HOUR): + log.info(f"run_all 完成后已超出时间窗口 ({now.strftime('%H:%M:%S')}),调度结束") + if chat_id: + try: + await self.app.bot.send_message( + chat_id, + f"⏰ 第 {round_num} 轮完成后已超出时间窗口\n\n" + f"当前: {now.strftime('%H:%M:%S')}\n" + f"窗口: {cfg.SCHEDULER_START_HOUR:02d}:00 - {cfg.SCHEDULER_END_HOUR:02d}:00\n" + f"本日调度结束", + parse_mode="HTML" + ) + except Exception: + pass + break + + # ===== Phase 3: Cooldown ===== + cooldown = cfg.SCHEDULER_COOLDOWN_MINUTES * 60 + log.info(f"[Phase 3] 冷却 {cfg.SCHEDULER_COOLDOWN_MINUTES} 分钟...") + + if chat_id: + try: + next_time = now + timedelta(seconds=cooldown) + await self.app.bot.send_message( + chat_id, + f"⏳ 第 {round_num} 轮完成\n\n" + f"✅ 注册: {reg_success} | 📥 入库: {ingested}\n" + f"冷却 {cfg.SCHEDULER_COOLDOWN_MINUTES} 分钟...\n" + f"下轮预计: {next_time.strftime('%H:%M:%S')}", + parse_mode="HTML" + ) + except Exception: + pass + + self.current_team = f"调度器-冷却中" + + # 可中断的等待 + try: + await asyncio.wait_for( + self._scheduler_stop_event.wait(), + timeout=cooldown + ) + # 如果 wait 返回,说明收到了停止信号 + break + except asyncio.TimeoutError: + # 正常超时,继续下一轮 + pass + + except Exception as e: + log.error(f"调度器异常退出: {e}") + finally: + self._scheduler_active = False + self.current_task = None + self.current_team = None + log.info("调度器已停止") + + # 发送日报 + await self._send_daily_report() + + async def _scheduler_run_registration(self, chat_id: int, count: int, output_type: str) -> tuple: + """调度器: 执行注册阶段,返回 (success_count, fail_count)""" + from auto_gpt_team import run_single_registration_auto, cleanup_chrome_processes, get_register_mode + from config import CONCURRENT_ENABLED, CONCURRENT_WORKERS + import json + import threading + + results = [] + success_count = 0 + fail_count = 0 + results_lock = threading.Lock() + + current_mode = get_register_mode() + workers = CONCURRENT_WORKERS if CONCURRENT_ENABLED else 1 + workers = min(workers, count) + + task_queue = list(range(count)) + queue_lock = threading.Lock() + + def worker_task(worker_id: int): + nonlocal success_count, fail_count + + while True: + try: + import run + if run._shutdown_requested: + break + except Exception: + pass + + with queue_lock: + if not task_queue: + break + task_idx = task_queue.pop(0) + + try: + result = run_single_registration_auto( + progress_callback=None, + step_callback=None + ) + + with results_lock: + if result.get("stopped"): + break + elif result.get("success"): + success_count += 1 + results.append({ + "account": result["account"], + "password": result["password"], + "token": result["token"], + "account_id": result.get("account_id", "") + }) + else: + fail_count += 1 + except Exception as e: + with results_lock: + fail_count += 1 + log.error(f"Scheduler Worker {worker_id}: 注册异常: {e}") + + cleanup_chrome_processes() + + # 使用线程池并发执行 + import concurrent.futures + loop = asyncio.get_event_loop() + with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor: + async_futures = [loop.run_in_executor(executor, worker_task, i) for i in range(workers)] + await asyncio.gather(*async_futures, return_exceptions=True) + + # 保存结果到 team.json + if results and output_type == "team": + try: + team_file = TEAM_JSON_FILE + existing = [] + if team_file.exists(): + with open(team_file, "r", encoding="utf-8") as f: + existing = json.load(f) + + existing.extend(results) + + with open(team_file, "w", encoding="utf-8") as f: + json.dump(existing, f, ensure_ascii=False, indent=2) + + reload_config() + log.success(f"调度器: {success_count} 个账号已写入 team.json (总计 {len(existing)})") + except Exception as e: + log.error(f"调度器: 保存 team.json 失败: {e}") + + return (success_count, fail_count) + + async def _scheduler_run_all(self) -> int: + """调度器: 执行 run_all,返回成功入库数""" + from run import run_all_teams + from team_service import preload_all_account_ids + from utils import load_team_tracker, save_team_tracker, add_team_owners_to_tracker + from config import DEFAULT_PASSWORD + + loop = asyncio.get_event_loop() + + def _task(): + preload_all_account_ids() + _tracker = load_team_tracker() + add_team_owners_to_tracker(_tracker, DEFAULT_PASSWORD) + save_team_tracker(_tracker) + return run_all_teams() + + results = await loop.run_in_executor(self.executor, _task) + + # 自动清理 team.json 和 team_tracker.json + await self._auto_clean_after_run_all() + + # 统计成功入库数 + ingested = sum(1 for r in (results or []) if r.get("status") in ("success", "completed")) + return ingested + + async def _send_daily_report(self): + """发送调度器日报""" + stats = self._scheduler_stats + if stats["total_rounds"] == 0 and stats["total_registered"] == 0: + return # 没有任何数据,不发送 + + elapsed = "" + if stats["start_time"]: + delta = datetime.now() - stats["start_time"] + hours, remainder = divmod(int(delta.total_seconds()), 3600) + minutes, _ = divmod(remainder, 60) + elapsed = f"{hours}h{minutes}m" + + text = ( + f"📊 调度器日报\n\n" + f"⏱️ 运行时长: {elapsed}\n" + f"🔄 总轮次: {stats['total_rounds']}\n\n" + f"累计结果:\n" + f" 📝 注册成功: {stats['total_registered']} 个\n" + f" 📥 入库成功: {stats['total_ingested']} 个\n" + f" ❌ 失败轮次: {stats['total_failed']}\n" + ) + + if stats["consecutive_failures"] >= 1: + text += f"\n⚠️ 最终连续失败: {stats['consecutive_failures']} 轮" + + for chat_id in TELEGRAM_ADMIN_CHAT_IDS: + try: + await self.app.bot.send_message( + chat_id, + text, + parse_mode="HTML" + ) + except Exception: + pass + async def main(): """主函数"""